diff --git a/Python/NucelarMonitor.py b/Python/NucelarMonitor.py index 8ef30ca..dbba3d2 100644 --- a/Python/NucelarMonitor.py +++ b/Python/NucelarMonitor.py @@ -12,6 +12,8 @@ from threading import Thread from os.path import exists as path_exists, dirname as directory_name, abspath as absolute_path from json import loads as json_decode, dumps as json_encode from mimetypes import guess_type as get_mime_by_extension +from pyodbc import connect as odbc_connect, Connection as ODBCConnection, Cursor as ODBCCursor +from time import time as timestamp, sleep class NucelarMonitor: @@ -82,6 +84,7 @@ class NucelarMonitor: RE_SLASHES:REPattern = re_compile(r'[\\\/]+') RE_TO_REGULAR_EXPRESSION:REPattern = re_compile(r'[\(\)\{\}\/\\\.\-\+\*\^\$\?\|\!\<\>\r\n\t]') RE_ROUTE_KEY:REPattern = re_compile(r'\\\{([a-z_][a-z0-9_]*)\\\}', RE_IGNORE_CASE) + RE_ODBC_STRING_VARIABLE:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}|@([a-z0-9_]+)', RE_IGNORE_CASE) class Request: @@ -237,6 +240,17 @@ class NucelarMonitor: self.__commands:list[list[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]] = [ [["close", "exit", "quit", "bye"], self.__close_command] ] + self.__odbc_connection:ODBCConnection|None = None + self.__odbc_string_connection:str = self.string_variables(self.get(( + "odbc_string_connection", "sql_string_connection", "string_connection" + ), None, "DRIVER={driver};SERVER={host},{port};UID={user};PWD={password};DATABASE={database}"), { + "driver" : self.get("sql_driver", None, "{ODBC Driver 17 for SQL Server}"), + "host" : self.get(("sql_host", "odbc_host"), None, "localhost"), + "port" : self.get(("sql_port", "odbc_port"), None, 1433), + "user" : self.get(("sql_user", "odbc_user"), None, "sa"), + "password" : self.get(("sql_password", "odbc_password"), None, "password"), + "database" : self.get(("sql_database", "odbc_database"), None, "NucelarMonitor") + }) for _ in range(2): self.__root_paths.append(self.RE_LAST_DIRECTORY.sub(r'\1', self.__root_paths[-1])) @@ -612,6 +626,89 @@ class NucelarMonitor: "route" : route, "response_length" : len(response.body) }) + + def __odbc_connection_autoclose(self:Self) -> None: + pass + + def format_odbc_query(self:Self, + query:str, + parameters:Optional[dict[str, Any|None]|Sequence[Any|None]] = None + ) -> tuple[str, list[str]]: + + variables:list[str] = [] + + def callback(matches:REMatch) -> str: + + key:str = matches.group(1) + + if key: + return ( + "'" + str(parameters[key]).replace("'","''") + "'" if isinstance(parameters[key], str) else + str(parameters[key]) if key in parameters else matches.group(0)) + + key = matches.group(2) + + variables.append(key) + + return matches.group(0) + + query = self.RE_ODBC_STRING_VARIABLE.sub(callback, query) + + return ( + "".join("declare @" + variable + " varchar(max)\n" for variable in variables) + + query + + "\nselect " + ", ".join("@" + variable for variable in variables) + ) if len(variables) else query, variables + + def odbc_query(self:Self, + query:str, + parameters:Optional[dict[str, Any|None]|Sequence[Any|None]] = None + ) -> dict[str, dict[str, Any|None]|list[list[list[Any|None]]]|list[list[str]]]: + + response:dict[str, dict[str, Any|None]|list[list[list[Any|None]]]|list[list[str]]] = { + "tables" : [], + "columns" : [], + "variables" : {} + } + cursor:ODBCCursor|None = None + variables:list[str] = [] + + try: + + if self.__odbc_connection is None: + self.__odbc_connection = odbc_connect( + self.__odbc_string_connection, + autocommit = True + ) + cursor = self.__odbc_connection.cursor() + query, variables = self.format_odbc_query(query, parameters) + + cursor.execute(query) + + while True: + + if cursor.description is not None: + response["columns"].append([column[0] for column in cursor.description]) + response["tables"].append([tuple(row) for row in cursor.fetchall()]) + + if not cursor.nextset(): + break + + except Exception as exception: + self.exception(exception, "odbc_connection_exception", { + "string_connection" : self.__odbc_string_connection + }) + finally: + if cursor is not None: + cursor.close() + + if len(variables) and len(response["tables"]): + for i, variable in enumerate(variables): + response["variables"][variable] = response["tables"][-1][0][i] + response["tables"] = response["tables"][:-1] + response["columns"] = response["columns"][:-1] + + return response def debian(self:Self, request:Request, response:Response) -> None: