#wip(py): ODBC Driver manage.

This commit is contained in:
mbruzon 2026-03-18 19:11:15 +01:00
parent 9906a16562
commit 4f9e9acfe9

View File

@ -12,6 +12,8 @@ from threading import Thread
from os.path import exists as path_exists, dirname as directory_name, abspath as absolute_path from os.path import exists as path_exists, dirname as directory_name, abspath as absolute_path
from json import loads as json_decode, dumps as json_encode from json import loads as json_decode, dumps as json_encode
from mimetypes import guess_type as get_mime_by_extension from mimetypes import guess_type as get_mime_by_extension
from pyodbc import connect as odbc_connect, Connection as ODBCConnection, Cursor as ODBCCursor
from time import time as timestamp, sleep
class NucelarMonitor: class NucelarMonitor:
@ -82,6 +84,7 @@ class NucelarMonitor:
RE_SLASHES:REPattern = re_compile(r'[\\\/]+') RE_SLASHES:REPattern = re_compile(r'[\\\/]+')
RE_TO_REGULAR_EXPRESSION:REPattern = re_compile(r'[\(\)\{\}\/\\\.\-\+\*\^\$\?\|\!\<\>\r\n\t]') RE_TO_REGULAR_EXPRESSION:REPattern = re_compile(r'[\(\)\{\}\/\\\.\-\+\*\^\$\?\|\!\<\>\r\n\t]')
RE_ROUTE_KEY:REPattern = re_compile(r'\\\{([a-z_][a-z0-9_]*)\\\}', RE_IGNORE_CASE) RE_ROUTE_KEY:REPattern = re_compile(r'\\\{([a-z_][a-z0-9_]*)\\\}', RE_IGNORE_CASE)
RE_ODBC_STRING_VARIABLE:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}|@([a-z0-9_]+)', RE_IGNORE_CASE)
class Request: class Request:
@ -237,6 +240,17 @@ class NucelarMonitor:
self.__commands:list[list[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]] = [ self.__commands:list[list[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]] = [
[["close", "exit", "quit", "bye"], self.__close_command] [["close", "exit", "quit", "bye"], self.__close_command]
] ]
self.__odbc_connection:ODBCConnection|None = None
self.__odbc_string_connection:str = self.string_variables(self.get((
"odbc_string_connection", "sql_string_connection", "string_connection"
), None, "DRIVER={driver};SERVER={host},{port};UID={user};PWD={password};DATABASE={database}"), {
"driver" : self.get("sql_driver", None, "{ODBC Driver 17 for SQL Server}"),
"host" : self.get(("sql_host", "odbc_host"), None, "localhost"),
"port" : self.get(("sql_port", "odbc_port"), None, 1433),
"user" : self.get(("sql_user", "odbc_user"), None, "sa"),
"password" : self.get(("sql_password", "odbc_password"), None, "password"),
"database" : self.get(("sql_database", "odbc_database"), None, "NucelarMonitor")
})
for _ in range(2): for _ in range(2):
self.__root_paths.append(self.RE_LAST_DIRECTORY.sub(r'\1', self.__root_paths[-1])) self.__root_paths.append(self.RE_LAST_DIRECTORY.sub(r'\1', self.__root_paths[-1]))
@ -612,6 +626,89 @@ class NucelarMonitor:
"route" : route, "route" : route,
"response_length" : len(response.body) "response_length" : len(response.body)
}) })
def __odbc_connection_autoclose(self:Self) -> None:
pass
def format_odbc_query(self:Self,
query:str,
parameters:Optional[dict[str, Any|None]|Sequence[Any|None]] = None
) -> tuple[str, list[str]]:
variables:list[str] = []
def callback(matches:REMatch) -> str:
key:str = matches.group(1)
if key:
return (
"'" + str(parameters[key]).replace("'","''") + "'" if isinstance(parameters[key], str) else
str(parameters[key]) if key in parameters else matches.group(0))
key = matches.group(2)
variables.append(key)
return matches.group(0)
query = self.RE_ODBC_STRING_VARIABLE.sub(callback, query)
return (
"".join("declare @" + variable + " varchar(max)\n" for variable in variables) +
query +
"\nselect " + ", ".join("@" + variable for variable in variables)
) if len(variables) else query, variables
def odbc_query(self:Self,
query:str,
parameters:Optional[dict[str, Any|None]|Sequence[Any|None]] = None
) -> dict[str, dict[str, Any|None]|list[list[list[Any|None]]]|list[list[str]]]:
response:dict[str, dict[str, Any|None]|list[list[list[Any|None]]]|list[list[str]]] = {
"tables" : [],
"columns" : [],
"variables" : {}
}
cursor:ODBCCursor|None = None
variables:list[str] = []
try:
if self.__odbc_connection is None:
self.__odbc_connection = odbc_connect(
self.__odbc_string_connection,
autocommit = True
)
cursor = self.__odbc_connection.cursor()
query, variables = self.format_odbc_query(query, parameters)
cursor.execute(query)
while True:
if cursor.description is not None:
response["columns"].append([column[0] for column in cursor.description])
response["tables"].append([tuple(row) for row in cursor.fetchall()])
if not cursor.nextset():
break
except Exception as exception:
self.exception(exception, "odbc_connection_exception", {
"string_connection" : self.__odbc_string_connection
})
finally:
if cursor is not None:
cursor.close()
if len(variables) and len(response["tables"]):
for i, variable in enumerate(variables):
response["variables"][variable] = response["tables"][-1][0][i]
response["tables"] = response["tables"][:-1]
response["columns"] = response["columns"][:-1]
return response
def debian(self:Self, request:Request, response:Response) -> None: def debian(self:Self, request:Request, response:Response) -> None: