CXCV/Python/Drivers/SQLiteDriver.py

178 lines
5.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from re import Match as REMatch
from sqlite3 import Connection, Cursor, connect as sqlite_connect
from threading import Lock
from typing import Self, Any, Optional

from Interfaces.Application.CXCVInterface import CXCVInterface
from Models.ResultsModel import ResultsModel
from Abstracts.DatabasesAbstract import DatabasesAbstract
from Utils.Patterns import RE
from Utils.Utils import Utils
class SQLiteDriver(DatabasesAbstract):
    """Database driver that runs SQL scripts over one shared sqlite3 connection.

    Scripts are split into individual statements by ``prepare`` and executed
    one by one. Parameter values are substituted textually into the SQL
    (strings are quoted with doubled single quotes), not bound through the
    sqlite3 placeholder mechanism.
    """

    def __init__(self:Self,
        cxcv:CXCVInterface,
        key:str,
        inputs:dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]
    ) -> None:
        """Open the database and optionally run a builder script.

        Args:
            cxcv: Application object; used here for file helpers, waiting and
                exception reporting.
            key: Forwarded unchanged to the parent constructor.
            inputs: Settings container. ``"path"`` (database file) and
                ``"builder_file"`` (SQL script executed right after
                connecting) are read through ``Utils.get_value``.
                NOTE(review): presumably ``Utils.get_value`` yields ``None``
                for a missing key -- confirm against ``Utils``.
        """
        super().__init__(cxcv, key, inputs)
        self.__connection:Connection|None = None
        self.__path:str = Utils.get_value("path", inputs)
        build_file:str|None = Utils.get_value("builder_file", inputs)
        # Serialises ``execute`` calls; the connection is opened with
        # ``check_same_thread = False`` and may be used from several threads.
        self.__lock = Lock()
        self.connect()
        if build_file is not None:
            self.execute_file(build_file)

    def connect(self:Self) -> bool:
        """Open the connection if it is not open yet.

        The directory of the database path is prepared first through
        ``cxcv.files``.

        Returns:
            ``True`` when a connection is available, ``False`` when opening
            failed (the exception is reported through ``cxcv.exception``).
        """
        if self.__connection is not None:
            return True
        try:
            directory:str = self.cxcv.files.get_directory_path(self.__path)
            self.cxcv.files.prepare_path(directory)
            self.__connection = sqlite_connect(self.__path, check_same_thread = False)
        except Exception as exception:
            self.cxcv.exception(exception, "database_driver_connection_exception")
            return False
        return True

    def close(self:Self) -> bool:
        """Checkpoint the write-ahead log and close the connection.

        The connection is closed even when the checkpoint fails, and the
        stored reference is always dropped so that a later ``connect`` opens
        a fresh connection instead of reusing a dead one.

        Returns:
            ``True`` on success or when there was nothing to close,
            ``False`` when the checkpoint or the close raised (the exception
            is reported through ``cxcv.exception``).
        """
        connection:Connection|None = self.__connection
        if connection is None:
            return True
        try:
            try:
                connection.execute("PRAGMA wal_checkpoint(FULL);")
            finally:
                # Never leak the handle because the checkpoint failed.
                connection.close()
        except Exception as exception:
            self.cxcv.exception(exception, "database_driver_close_exception")
            return False
        finally:
            self.__connection = None
        return True

    @staticmethod
    def build_match(matches:REMatch, parameters:dict[str, Any|None]) -> str:
        """Render the SQL literal for one matched placeholder.

        Args:
            matches: Regex match whose group 1 is the parameter name and
                whose group 0 is the whole placeholder text.
            parameters: Values by parameter name.

        Returns:
            ``null`` for ``None``, a single-quoted literal with embedded
            quotes doubled for strings, ``str(value)`` for anything else.
            A placeholder whose name is not in ``parameters`` is returned
            unchanged.
        """
        name:str|None = matches.group(1)
        if name not in parameters:
            # Unknown placeholder: keep the original text verbatim instead of
            # turning it into a quoted string literal.
            return matches.group(0)
        value:Any|None = parameters[name]
        if value is None:
            return "null"
        if isinstance(value, str):
            return "'" + str(value).replace("'", "''") + "'"
        return str(value)

    @classmethod
    def prepare(cls:type[Self],
        sql:str,
        parameters:Optional[dict[str, Any|None]] = None,
        builder:Optional[dict[str, Any|None]] = None
    ) -> list[str]:
        """Split a SQL script into statements and substitute variables.

        Args:
            sql: Script text, possibly holding several statements.
            parameters: Values substituted (as SQL literals, see
                ``build_match``) for ``RE.STRING_VARIABLE`` matches. ``None``
                disables this substitution entirely.
            builder: When non-empty, applied first to the whole script via
                ``Utils.string_variables``.

        Returns:
            The non-empty statements in order. Every statement that was
            followed by a separator is stripped and terminated with ``;``;
            trailing text without a separator is kept as is.
        """
        if builder:
            sql = Utils.string_variables(sql, builder)
        queries:list[str] = [""]
        # NOTE(review): assumes RE.SQL_LITE always finds a non-empty match in
        # the remaining text and exposes exactly four groups (comment,
        # fragment, string, separator) -- confirm against Utils.Patterns.
        while len(sql):
            comment:str|None
            fragment:str|None
            string:str|None
            separator:str|None
            matches:REMatch = RE.SQL_LITE.search(sql)
            comment, fragment, string, separator = matches.groups()
            if comment is None:
                if separator is None:
                    queries[-1] += fragment or string or ""
                else:
                    queries[-1] = queries[-1].strip()
                    if len(queries[-1]):
                        # Close the current statement and start a new one.
                        queries[-1] += ";"
                        queries.append("")
            # Comment tokens are dropped; either way consume the match.
            sql = sql[matches.end():]
        statements:list[str] = [subsql for subsql in queries if len(subsql)]
        if parameters is None:
            return statements
        return [
            RE.STRING_VARIABLE.sub(
                lambda found:cls.build_match(found, parameters),
                subsql
            ) for subsql in statements]

    @staticmethod
    def _statement_keyword(subquery:str) -> str:
        """Return the lower-cased first word of a statement ('' if blank)."""
        tokens:list[str] = subquery.split(None, 1)
        return tokens[0].lower() if tokens else ""

    def execute(self:Self,
        query:str,
        parameters:Optional[dict[str, Any|None]] = None,
        builder:Optional[dict[str, Any|None]] = None
    ) -> ResultsModel:
        """Run every statement of a script and collect the results.

        A statement that fails is reported through ``cxcv.exception`` and
        skipped; the remaining statements still run. After the loop the
        connection is committed.

        Args:
            query: SQL script (one or more statements).
            parameters: See ``prepare``.
            builder: See ``prepare``.

        Returns:
            A ``ResultsModel``. The row id of every ``insert`` statement is
            appended to its ``ids``; the rows of ``select``/``with``
            statements are stored through its ``set`` method together with
            the column names.
        """
        # Poll for the lock so waiting stops as soon as the application is no
        # longer working (in that case the call proceeds without the lock).
        acquired:bool = self.__lock.acquire(blocking = False)
        while not acquired and self.cxcv.working:
            self.cxcv.wait(.01, .1)
            acquired = self.__lock.acquire(blocking = False)
        results:ResultsModel = ResultsModel()
        try:
            cursor:Cursor = self.__connection.cursor()
            try:
                for subquery in self.prepare(query, parameters, builder):
                    try:
                        cursor.execute(subquery)
                    except Exception as exception:
                        self.cxcv.exception(exception, "database_driver_execute_subquery_exception", {
                            "subquery" : subquery
                        })
                        continue
                    keyword:str = self._statement_keyword(subquery)
                    if keyword == "insert":
                        # Read the id from the cursor itself; no nested
                        # query, so the lock is held for the whole script.
                        results.ids.append(cursor.lastrowid)
                    elif keyword in ("select", "with") and cursor.description is not None:
                        # ``description`` is None when the statement yields
                        # no rows (e.g. a CTE feeding an INSERT).
                        results.set(cursor.fetchall(), Type=tuple(column[0] for column in cursor.description))
                self.__connection.commit()
            finally:
                cursor.close()
        except Exception as exception:
            self.cxcv.exception(exception, "database_driver_execute_exception", {
                "query" : query,
                "parameters" : parameters
            })
        finally:
            if acquired:
                self.__lock.release()
        return results

    def execute_file(self:Self,
        path:str,
        parameters:Optional[dict[str, Any|None]] = None,
        builder:Optional[dict[str, Any|None]] = None
    ) -> ResultsModel:
        """Load a SQL script through ``cxcv.files.load`` and execute it.

        Args:
            path: Script location, passed to ``cxcv.files.load`` with mode
                ``"r"``.
            parameters: See ``prepare``.
            builder: See ``prepare``.

        Returns:
            The ``ResultsModel`` produced by ``execute``.
        """
        return self.execute(self.cxcv.files.load(path, "r"), parameters, builder)

    def get(self:Self,
        query:str,
        parameters:Optional[dict[str, Any|None]] = None,
        builder:Optional[dict[str, Any|None]] = None
    ) -> Any|None:
        """Execute a query and return ``ResultsModel.get(0)`` of its result.

        NOTE(review): what ``get(0)`` selects (first row, first value, ...)
        is defined by ``ResultsModel`` -- confirm there.
        """
        return self.execute(query, parameters, builder).get(0)

    def get_id(self:Self,
        query:str,
        parameters:Optional[dict[str, Any|None]] = None,
        builder:Optional[dict[str, Any|None]] = None
    ) -> int|None:
        """Execute a query and return its ``get`` value converted to ``int``.

        Returns:
            The integer value, or ``None`` when ``get`` returned ``None``.
        """
        value:Any|None = self.get(query, parameters, builder)
        if value is None:
            return None
        return int(value)

    def get_last_id(self:Self) -> int|None:
        """Return the connection's last inserted row id via ``last_insert_rowid()``."""
        return self.get_id("select last_insert_rowid();")