Source code for impscan.db.db_utils

import sqlite3
from sys import stderr

from ..assets import _dir_path as store_path

# Names exported by ``from <this module> import *``.
__all__ = ["PackageDB", "CondaPackageDB"]  # TODO: "PyPIPackageDB"


class PackageDB:
    """Base handle for an SQLite package catalogue stored as a single file.

    This class only knows where the database file lives and how to open a
    connection to it. The schema and the queries are backend specific, so a
    concrete subclass must implement :meth:`create` and
    :meth:`retrieve_package`; the versions defined here raise
    ``NotImplementedError``.
    """

    filename = "package_catalogue.db"  # Default value
    directory = store_path

    def __init__(
        self,
        dir=directory,
        filename=filename,
        create=True,
        no_touch=False,
    ):
        """Record the database location and optionally initialise it.

        Args:
            dir: Directory holding the database file. It must support the
                ``/`` operator with a file name (presumably a
                ``pathlib.Path`` -- confirm against ``assets._dir_path``).
            filename: File name of the database inside ``dir``.
            create: If true, call :meth:`create` straight away.
            no_touch: Forwarded unchanged to :meth:`create`.
        """
        self.directory = dir
        self.filename = filename
        if create:
            self.create(no_touch=no_touch)

    @property
    def path(self):
        """Location of the database file: ``directory / filename``."""
        return self.directory / self.filename

    def exists(self):
        """Return whether something exists at :attr:`path`."""
        return self.path.exists()

    def connect(self):
        """Open and return a new ``sqlite3`` connection to :attr:`path`.

        The caller owns the connection and is responsible for closing it.
        """
        return sqlite3.connect(self.path)

    def create(self, no_touch=False):
        """Initialise the database; must be overridden by subclasses.

        Defined here so that instantiating the base class with
        ``create=True`` fails with an explicit message instead of an
        ``AttributeError`` about a missing method.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement create()"
        )

    def retrieve_package(self, package_name, fetch_all=True):
        """Look up rows for ``package_name``; must be overridden by subclasses.

        :meth:`has_package` relies on this hook returning ``None`` when
        called with ``fetch_all=False`` and nothing matches.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement retrieve_package()"
        )

    # This would also be useful for imported name when supplied
    def has_package(self, package_name):
        """Return ``True`` if at least one row matches ``package_name``.

        Only a single row is requested (``fetch_all=False``) because the
        answer depends solely on whether anything matched.
        """
        peek = self.retrieve_package(package_name, fetch_all=False)
        return peek is not None

    def __repr__(self):
        return f"{type(self)} '{self.filename}' at {self.directory}"
class CondaPackageDB(PackageDB):
    """Package catalogue stored in a single ``conda_packages`` table.

    Every public method opens its own connection and closes it before
    returning, so no database handle outlives a call.
    """

    def create(self, no_touch=False):
        """Create the ``conda_packages`` table if it does not exist yet.

        Args:
            no_touch: If true, refuse to proceed when the database file is
                missing (connecting to a missing file would create it).

        Raises:
            FileNotFoundError: If ``no_touch`` is true and there is no file
                at ``self.path``.
        """
        if no_touch and not self.path.exists():
            # The message previously read ``self.db.path``; no ``db``
            # attribute exists, so this guard raised AttributeError instead
            # of the intended FileNotFoundError.
            raise FileNotFoundError(f"No PackageDB at {self.path}")
        self._execute(
            """
            CREATE TABLE IF NOT EXISTS conda_packages
            (packagename varchar(100),
            importedname varchar(100),
            channel tinytext,
            depends tinytext,
            filename tinytext,
            url tinytext,
            version varchar(100),
            rootpkgs text,
            Constraint pk_pid Primary key(channel, filename))
            """
        )

    def insert_entry(
        self,
        pkgname: str,  # "name" key
        impname: str,  # site-packages (imported) name
        channel: str,
        depends: str,
        fn: str,  # filename
        url: str,
        version: str,
        rootpkgs: str,
    ):
        """Insert one row into ``conda_packages`` and commit it.

        The arguments are stored in the table's column order:
        ``packagename, importedname, channel, depends, filename, url,
        version, rootpkgs``.

        Raises:
            Exception: Whatever connecting or inserting raised (for example
                a row that violates the ``(channel, filename)`` primary
                key). The offending values are printed to stderr first and
                the original exception is then re-raised unchanged.
        """
        row = (pkgname, impname, channel, depends, fn, url, version, rootpkgs)
        try:
            self._execute(
                "INSERT INTO conda_packages VALUES (?,?,?,?,?,?,?,?)", row
            )
        except Exception:
            # Dump the values that failed so the bad record can be traced,
            # then let the error propagate to the caller.
            print(
                f"{pkgname=} {impname=} {channel=} {depends=}",
                f"{fn=} {url=} {version=} {rootpkgs=}",
                file=stderr,
            )
            raise

    def retrieve_filename(self, fn, fetch_all=False):
        """Return the row(s) whose ``filename`` column equals ``fn``.

        Args:
            fn: File name to match exactly.
            fetch_all: If true, return a list of every matching row;
                otherwise return the first matching row, or ``None`` when
                nothing matches.
        """
        query_sql = """
            SELECT * FROM conda_packages
            WHERE filename == ?
        """
        return self._select(query_sql, (fn,), fetch_all)

    def retrieve_package(self, package_name, fetch_all=True):
        """Return the row(s) whose ``packagename`` column equals ``package_name``.

        Args:
            package_name: Package name to match exactly.
            fetch_all: If true, return a list of every matching row;
                otherwise return the first matching row, or ``None`` when
                nothing matches.
        """
        query_sql = """
            SELECT * FROM conda_packages
            WHERE packagename == ?
        """
        return self._select(query_sql, (package_name,), fetch_all)

    def _execute(self, sql, params=()):
        """Run one write statement on a fresh connection, then close it."""
        conn = self.connect()
        try:
            # ``with conn`` commits on success and rolls back on error, but
            # does not close the connection -- hence the ``finally`` below.
            with conn:
                conn.cursor().execute(sql, params)
        finally:
            conn.close()

    def _select(self, sql, params, fetch_all):
        """Run one query on a fresh connection and return its row(s)."""
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            # Rows are materialised here, before the connection is closed.
            return cursor.fetchall() if fetch_all else cursor.fetchone()
        finally:
            conn.close()