#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Self, Any, Optional
from sqlite3 import Connection, Cursor, connect as sqlite_connect
from re import Match as REMatch

from Interfaces.Application.CXCVInterface import CXCVInterface
from Models.ResultsModel import ResultsModel
from Abstracts.DatabasesAbstract import DatabasesAbstract
from Utils.Patterns import RE
from Utils.Utils import Utils


class SQLiteDriver(DatabasesAbstract):
    """Database driver backed by a single shared ``sqlite3`` connection.

    The connection is opened with ``check_same_thread=False`` and access to
    it is serialized through a simple busy flag (``__in_use``) that callers
    poll via ``cxcv.wait``.
    """

    def __init__(
        self: Self,
        cxcv: CXCVInterface,
        key: str,
        inputs: dict[str, Any | None] | list[Any | None] | tuple[Any | None, ...],
    ) -> None:
        """Open the database and optionally run a schema/builder script.

        Args:
            cxcv: Application interface; used here for file helpers, waiting
                and exception reporting.
            key: Driver key, forwarded to the base class.
            inputs: Settings container read through ``Utils.get_value``.
                ``"path"`` is the database file; ``"builder_file"`` (optional)
                is a SQL file executed right after connecting.
        """
        super().__init__(cxcv, key, inputs)

        self.__connection: Connection | None = None
        self.__path: str = Utils.get_value("path", inputs)
        self.__in_use: bool = False

        build_file: str | None = Utils.get_value("builder_file", inputs)

        self.connect()
        if build_file is not None:
            self.execute_file(build_file)

    def connect(self: Self) -> bool:
        """Open the connection if it is not open yet.

        Returns:
            ``True`` when a connection is available after the call, ``False``
            when opening it raised (the exception is handed to
            ``cxcv.exception``).
        """
        if self.__connection is not None:
            return True

        try:
            # Make sure the containing directory exists before SQLite tries
            # to create/open the file.
            directory: str = self.cxcv.files.get_directory_path(self.__path)
            self.cxcv.files.prepare_path(directory)
            self.__connection = sqlite_connect(self.__path, check_same_thread=False)
        except Exception as exception:
            self.cxcv.exception(exception, "database_driver_connection_exception")
            return False
        return True

    def close(self: Self) -> bool:
        """Checkpoint the WAL and close the connection, if one is open.

        Returns:
            ``True`` on success (or when there was nothing to close),
            ``False`` when the checkpoint or the close raised.
        """
        if not self.__connection:
            return True

        try:
            self.__connection.execute("PRAGMA wal_checkpoint(FULL);")
            self.__connection.close()
            self.__connection = None
        except Exception as exception:
            self.cxcv.exception(exception, "database_driver_close_exception")
            return False
        return True

    @staticmethod
    def build_match(matches: REMatch, parameters: dict[str, Any | None]) -> str:
        """Render one placeholder match as a SQL literal.

        Args:
            matches: Regex match whose group 1 is the parameter name and
                whose group 0 is the whole placeholder.
            parameters: Values to substitute, keyed by parameter name.

        Returns:
            ``null`` for ``None``, a single-quoted string with embedded
            quotes doubled for ``str``, ``str(value)`` for anything else.
            A placeholder whose name is not in ``parameters`` is returned
            unchanged (it is no longer turned into a quoted string literal).
        """
        name = matches.group(1)
        if name not in parameters:
            # Unknown placeholder: leave the original text intact rather
            # than emitting it as a quoted literal.
            return matches.group(0)

        value: Any | None = parameters[name]
        if value is None:
            return "null"
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    @classmethod
    def prepare(
        cls: type[Self],
        sql: str,
        parameters: Optional[dict[str, Any | None]] = None,
        builder: Optional[dict[str, Any | None]] = None,
    ) -> list[str]:
        """Split a SQL script into individual statements.

        ``builder`` variables are expanded first with
        ``Utils.string_variables``. The text is then tokenized with
        ``RE.SQL_LITE``, whose four groups are read as
        ``(comment, fragment, string, separator)``: comments are dropped,
        fragments and strings are appended to the current statement, and a
        separator terminates it with ``;``. Finally, when ``parameters`` is
        given, ``RE.STRING_VARIABLE`` placeholders are replaced through
        :meth:`build_match`.

        Args:
            sql: One or more SQL statements.
            parameters: Values substituted as SQL literals, or ``None`` to
                skip substitution.
            builder: Variables expanded textually before splitting.

        Returns:
            The non-blank statements, stripped of surrounding whitespace.
        """
        if builder:
            sql = Utils.string_variables(sql, builder)

        queries: list[str] = [""]

        while sql:
            matches: REMatch | None = RE.SQL_LITE.search(sql)
            if matches is None or matches.end() == 0:
                # The tokenizer cannot make progress; keep the remaining
                # text instead of crashing on ``None`` or looping forever.
                queries[-1] += sql
                break

            comment, fragment, string, separator = matches.groups()
            if comment is None:
                if separator is None:
                    queries[-1] += fragment or string or ""
                else:
                    queries[-1] = queries[-1].strip()
                    if queries[-1]:
                        queries[-1] += ";"
                        queries.append("")
            sql = sql[matches.end():]

        # The last statement may lack a separator, so it was never stripped
        # above; normalize every statement and drop blank ones.
        statements: list[str] = [
            statement
            for statement in (query.strip() for query in queries)
            if statement
        ]

        if parameters is None:
            return statements
        return [
            RE.STRING_VARIABLE.sub(
                lambda match: cls.build_match(match, parameters), statement
            )
            for statement in statements
        ]

    @staticmethod
    def _statement_keyword(statement: str) -> str:
        """Return the first whitespace-delimited word of *statement*, lowercased."""
        words = statement.split(None, 1)
        return words[0].lower() if words else ""

    def execute(
        self: Self,
        query: str,
        parameters: Optional[dict[str, Any | None]] = None,
        builder: Optional[dict[str, Any | None]] = None,
    ) -> ResultsModel:
        """Run every statement in *query* and collect the results.

        A statement that fails is reported through ``cxcv.exception`` and
        skipped; the remaining statements still run. For ``insert``
        statements the new row id is appended to ``results.ids``; for
        ``select``/``with`` statements that produce rows, the rows and their
        column names are stored with ``results.set``. Changes are committed
        once after the whole batch.

        Args:
            query: One or more SQL statements (see :meth:`prepare`).
            parameters: Values substituted as SQL literals.
            builder: Variables expanded textually before splitting.

        Returns:
            A ``ResultsModel``; empty if the batch could not be run.
        """
        results: ResultsModel = ResultsModel()

        # Poll until the connection is free (or the application stops).
        # NOTE(review): this check-then-set is not atomic; two threads can
        # still pass the loop together. Kept as-is to preserve the
        # ``cxcv.working`` escape hatch.
        while self.__in_use and self.cxcv.working:
            self.cxcv.wait(.01, .1)
        self.__in_use = True

        try:
            # Inside the ``try`` so a missing connection is reported like any
            # other failure instead of leaving the busy flag set forever.
            cursor: Cursor = self.__connection.cursor()
            try:
                for subquery in self.prepare(query, parameters, builder):
                    try:
                        cursor.execute(subquery)
                    except Exception as exception:
                        self.cxcv.exception(exception, "database_driver_execute_subquery_exception", {
                            "subquery" : subquery
                        })
                        continue

                    keyword: str = self._statement_keyword(subquery)
                    if keyword == "insert":
                        # Read the id from the cursor rather than running a
                        # nested query, which released the busy flag mid-batch.
                        results.ids.append(cursor.lastrowid)
                    elif keyword in ("select", "with") and cursor.description is not None:
                        # ``description`` is None when a ``with`` statement
                        # wraps a write and therefore returns no rows.
                        results.set(
                            cursor.fetchall(),
                            Type=tuple(column[0] for column in cursor.description),
                        )
                self.__connection.commit()
            finally:
                cursor.close()
        except Exception as exception:
            self.cxcv.exception(exception, "database_driver_execute_exception", {
                "query" : query,
                "parameters" : parameters
            })
        finally:
            self.__in_use = False

        return results

    def execute_file(
        self: Self,
        path: str,
        parameters: Optional[dict[str, Any | None]] = None,
        builder: Optional[dict[str, Any | None]] = None,
    ) -> ResultsModel:
        """Load the SQL text at *path* via ``cxcv.files.load`` and execute it."""
        return self.execute(self.cxcv.files.load(path, "r"), parameters, builder)

    def get(
        self: Self,
        query: str,
        parameters: Optional[dict[str, Any | None]] = None,
        builder: Optional[dict[str, Any | None]] = None,
    ) -> Any | None:
        """Execute *query* and return ``results.get(0)`` of its result set."""
        return self.execute(query, parameters, builder).get(0)

    def get_id(
        self: Self,
        query: str,
        parameters: Optional[dict[str, Any | None]] = None,
        builder: Optional[dict[str, Any | None]] = None,
    ) -> int | None:
        """Execute *query* and return its value as ``int``, or ``None`` if absent."""
        value: Any | None = self.get(query, parameters, builder)
        return int(value) if value is not None else None

    def get_last_id(self: Self) -> int | None:
        """Return SQLite's ``last_insert_rowid()`` for this connection."""
        return self.get_id("select last_insert_rowid();")