diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2efc0d9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/Data +__pycache__ +*.[Ss]ecrets.* +*.[Ss]ecret.* \ No newline at end of file diff --git a/JSON/CXCV.commands.json b/JSON/CXCV.commands.json new file mode 100644 index 0000000..4bcc730 --- /dev/null +++ b/JSON/CXCV.commands.json @@ -0,0 +1,4 @@ +[{ + "names" : ["close", "exit", "quit", "bye", "shutdown"], + "action" : "cxcv.terminal.close" +}] \ No newline at end of file diff --git a/JSON/CXCV.settings.json b/JSON/CXCV.settings.json new file mode 100644 index 0000000..2b05106 --- /dev/null +++ b/JSON/CXCV.settings.json @@ -0,0 +1,8 @@ +{ + "default_settings_files" : ["/JSON/CXCV.settings.json"], + "secrets_settings_files" : ["/JSON/CXCV.secrets.json"], + "default_commands_files" : ["/JSON/CXCV.commands.json"], + "print_format" : "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss} [{line}]{file}({method}): {message}", + "exception_format" : " '{file}({method})[{line}]'{lines}\n\n{exception_message}", + "default_chunk_size" : 4096 +} \ No newline at end of file diff --git a/JSON/I18N/CXCV.i18n.english.json b/JSON/I18N/CXCV.i18n.english.json new file mode 100644 index 0000000..c713dda --- /dev/null +++ b/JSON/I18N/CXCV.i18n.english.json @@ -0,0 +1,3 @@ +{ + "english" : {} +} \ No newline at end of file diff --git a/JSON/I18N/CXCV.i18n.espanol.json b/JSON/I18N/CXCV.i18n.espanol.json new file mode 100644 index 0000000..f464355 --- /dev/null +++ b/JSON/I18N/CXCV.i18n.espanol.json @@ -0,0 +1,3 @@ +{ + "espanol" : {} +} \ No newline at end of file diff --git a/JSON/I18N/CXCV.i18n.galego.json b/JSON/I18N/CXCV.i18n.galego.json new file mode 100644 index 0000000..39f43be --- /dev/null +++ b/JSON/I18N/CXCV.i18n.galego.json @@ -0,0 +1,3 @@ +{ + "galego" : {} +} \ No newline at end of file diff --git a/Python/Abstracts/ControllerAbstract.py b/Python/Abstracts/ControllerAbstract.py new file mode 100644 index 0000000..70d13d8 --- /dev/null +++ b/Python/Abstracts/ControllerAbstract.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Optional, Any +from Interfaces.Application.CXCVInterface import CXCVInterface +from Utils.Utils import Utils + +class ControllerAbstract: + + def __init__(self:Self, + cxcv:CXCVInterface, + key:str, + inputs:Optional[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]] = None + ) -> None: + self.cxcv:CXCVInterface = cxcv + self.key:str = key + self.via:str = Utils.get_value("via", inputs, "sqlite") \ No newline at end of file diff --git a/Python/Abstracts/DatabasesAbstract copy.py b/Python/Abstracts/DatabasesAbstract copy.py new file mode 100644 index 0000000..6ff112b --- /dev/null +++ b/Python/Abstracts/DatabasesAbstract copy.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from Interfaces.Application.CXCVInterface import CXCVInterface +from Models.ResultsModel import ResultsModel +from typing import Optional, Self, Any +from abc import ABC, abstractmethod + +class DatabasesAbstract(ABC): + + def __init__(self:Self, + cxcv:CXCVInterface, + key:str, + inputs:dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...] + ) -> None: + self.cxcv:CXCVInterface = cxcv + self.key:str = key + self.type:str = "UNKNOWN" + + @abstractmethod + def connect(self:Self) -> bool:pass + + @abstractmethod + def close(self:Self) -> bool:pass + + @abstractmethod + def execute(self:Self, + sql:str, + parameters:Optional[dict[str, Any|None]] = None + ) -> ResultsModel:pass + + @abstractmethod + def execute_file(self:Self, + path:str, + parameters:Optional[dict[str, Any|None]] = None + ) -> ResultsModel:pass + + @abstractmethod + def get_id(self:Self, query:str, parameters:Optional[dict[str, Any|None]] = None) -> int|None:pass + + @abstractmethod + def get_last_id(self:Self) -> int|None:pass \ No newline at end of file diff --git a/Python/Abstracts/DatabasesAbstract.py b/Python/Abstracts/DatabasesAbstract.py new file mode 100644 index 0000000..6ff112b --- /dev/null +++ b/Python/Abstracts/DatabasesAbstract.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from Interfaces.Application.CXCVInterface import CXCVInterface +from Models.ResultsModel import ResultsModel +from typing import Optional, Self, Any +from abc import ABC, abstractmethod + +class DatabasesAbstract(ABC): + + def __init__(self:Self, + cxcv:CXCVInterface, + key:str, + inputs:dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...] + ) -> None: + self.cxcv:CXCVInterface = cxcv + self.key:str = key + self.type:str = "UNKNOWN" + + @abstractmethod + def connect(self:Self) -> bool:pass + + @abstractmethod + def close(self:Self) -> bool:pass + + @abstractmethod + def execute(self:Self, + sql:str, + parameters:Optional[dict[str, Any|None]] = None + ) -> ResultsModel:pass + + @abstractmethod + def execute_file(self:Self, + path:str, + parameters:Optional[dict[str, Any|None]] = None + ) -> ResultsModel:pass + + @abstractmethod + def get_id(self:Self, query:str, parameters:Optional[dict[str, Any|None]] = None) -> int|None:pass + + @abstractmethod + def get_last_id(self:Self) -> int|None:pass \ No newline at end of file diff --git a/Python/Abstracts/ProcedureAbstract.py b/Python/Abstracts/ProcedureAbstract.py new file mode 100644 index 0000000..11f9009 --- /dev/null +++ b/Python/Abstracts/ProcedureAbstract.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Optional, Any +from Interfaces.Application.CXCVInterface import CXCVInterface + +class ProcedureAbstract: + + def __init__(self:Self, + cxcv:CXCVInterface, + key:str, + via:str, + inputs:Optional[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]] = None + ) -> None: + self.cxcv:CXCVInterface = cxcv + self.key:str = key + self.via:str = via \ No newline at end of file diff --git a/Python/Abstracts/TableAbstract.py b/Python/Abstracts/TableAbstract.py new file mode 100644 index 0000000..e0ba183 --- /dev/null +++ b/Python/Abstracts/TableAbstract.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any +from abc import ABC, abstractmethod + +class TableAbstract(ABC): + + @abstractmethod + def get(self:Self) -> tuple[tuple[str]|None, tuple[tuple[Any|None, ...], ...]]:pass + + @abstractmethod + def get_tuples(self:Self) -> tuple[tuple[Any|None, ...], ...]:pass + + @abstractmethod + def get_dictionaries(self:Self) -> dict[str, Any|None]:pass \ No newline at end of file diff --git a/Python/Application/CXCV.py b/Python/Application/CXCV.py new file mode 100644 index 0000000..9aa995e --- /dev/null +++ b/Python/Application/CXCV.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from os.path import dirname as directory_name, abspath as path_absolute + +ROOT_PATH:str = directory_name(path_absolute(__file__)) + +from typing import Self, Any, Optional +from datetime import datetime +from re import Match as REMatch +from Drivers.FilesDriver import FilesDriver +from Managers.SettingsManager import SettingsManager +from Managers.I18NManager import I18NManager +from Managers.TerminalManager import TerminalManager +from Managers.ModelsManager import ModelsManager +from Managers.DatabasesManager import DatabasesManager +from Managers.ControllersManager import ControllersManager +from Managers.ProceduresManager import ProceduresManager +from Controllers.LogsController import LogsController +from Utils.Utils import Utils +from Utils.Patterns import RE + +class CXCV: + + def __init__(self:Self, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None + ) -> None: + + project_root_path:str = ROOT_PATH[:ROOT_PATH.index("/CXCV") + 6] + + self.__print_format:str = "[{yyyy}-{mm}-{dd} {hh}:{ii}:{ss}] [{type}] {message}" + self.__exception_format:str = "\nException in '{file}({method})[{line}]'{lines}\n\n{exception_message}" + self.working:bool = True + + self.files:FilesDriver = FilesDriver(self, ROOT_PATH) + self.settings:SettingsManager = SettingsManager(self, inputs) + + self.__print_format = self.settings.get("print_format", None, self.__print_format) + self.__exception_format = self.settings.get("exception_format", None, self.__exception_format) + + self.i18n:I18NManager = I18NManager(self) + self.terminal:TerminalManager = TerminalManager(self) + self.models:ModelsManager = ModelsManager(self) + self.databases:DatabasesManager = DatabasesManager(self) + self.controllers:ControllersManager = ControllersManager(self) + self.procedures:ProceduresManager = ProceduresManager(self) + + self.terminal.start() + + def close(self:Self) -> None: + self.working = False + + def print(self:Self, + _type:str, + string:str, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + i:int = 0 + ) -> None: + + date:datetime = datetime.now() + own:dict[str, Any|None] = { + "raw_type" : _type, + "type" : _type.upper()[:4], + "i18n" : Utils.get_strings(string)[0], + "message" : self.i18n.get(string, inputs), + **Utils.get_action_data(i + 1), + **Utils.get_dictionary(inputs) + } + + try: + own["file"] = own["file"][str(own["file"]).index("/CXCV/"):] + except Exception as exception: + pass + + while len(own["type"]) < 4: + own["type"] = " " + own["type"] if len(own["type"]) % 2 else own["type"] + " " + + for key in ("year", "month", "day", "hour", "minute", "second"): + + k:str = key[0] if key != "minute" else "i" + + own[k] = own[key] = getattr(date, key) + own[k + k] = ("00" + str(own[key] % 100))[-2:] + + own["yyyy"] = ("0000" + str(own["year"]))[-4:] + + if "end" in own: + own["message"] += own["end"] + + print(Utils.string_variables(self.__print_format, own)) + + if not self.controllers: + return + + controller:LogsController|None = self.controllers.get( + LogsController, "logs" + ) + + controller and own["type"] != "EXCE" and controller.set_log(own["i18n"], ( + 1 if own["type"] == "EXCE" else + 2 if own["type"] == "WARN" else + 3 if own["type"] == "ERRO" else + 0), inputs or {}, i) + + def exception(self, + exception:Exception, + message:Optional[str|list|tuple] = None, + inputs:Optional[dict[str, Any]|list|tuple] = None, + i:Optional[int] = 0 + ) -> None: + + lines:list[str]|None = Utils.get_trace(exception) + line_matches:REMatch[str]|None = RE.EXCEPTION.match(lines[-1]) + data:dict[str, Any|None] = { + **{key : value for subset in (inputs if isinstance(inputs, (list, tuple)) else (inputs,)) for key, value in (subset if isinstance(subset, dict) else {}).items()}, + **Utils.get_action_data(1), + "lines" : "".join("\n " + RE.BREAK_LINES.split(line.strip())[0] for line in lines), + "exception_message" : str(exception), + "method" : line_matches.group(3), + "line" : line_matches.group(2), + "file" : line_matches.group(1) + } + + data["end"] = Utils.string_variables(self.__exception_format, data) + + if message: + + self.print("exception", message, data, None, i + 1) + + if not self.controllers: + return + + controller:LogsController|None = self.controllers.get( + LogsController, "logs" + ) + + controller and controller.set_exception(exception, message, inputs or {}, None, i) \ No newline at end of file diff --git a/Python/Controllers/LogsController.py b/Python/Controllers/LogsController.py new file mode 100644 index 0000000..a539e2e --- /dev/null +++ b/Python/Controllers/LogsController.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional +from Abstracts.ControllerAbstract import ControllerAbstract +from Interfaces.Procedures.LogsProcedureInterface import LogsProcedureInterface +from Utils.Utils import Utils + +class LogsController(ControllerAbstract): + + def set_log(self:Self, + message:str, + error:int = 0, + parameters:dict[str, Any|None] = {}, + i:int = 0 + ) -> None: + + data:dict[str, Any|None] = Utils.get_action_data(i + 2) + + self.cxcv.procedures.get( + LogsProcedureInterface, (self.key, self.via) + ).set_log("CXCV", data["file"], data["method"], data["line"], message, error, parameters) + + def set_exception(self:Self, + exception:Exception, + message:str, + parameters:dict[str, Any|None] = {}, + status:Optional[str] = None, + i:int = 0 + ) -> None: + + trace:list[str] = Utils.get_trace(exception) + data:dict[str, Any|None] = Utils.get_action_data(i + 2) + + self.cxcv.procedures.get( + LogsProcedureInterface, (self.key, self.via) + ).set_exception("CXCV", data["file"], data["method"], data["line"], message, str(exception), "\n".join( + "- " + line for line in trace + ), parameters, status) \ No newline at end of file diff --git a/Python/Controllers/OrdersController.py b/Python/Controllers/OrdersController.py new file mode 100644 index 0000000..4a240dc --- /dev/null +++ b/Python/Controllers/OrdersController.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional +from Abstracts.ControllerAbstract import ControllerAbstract +from Models.FileModel import FileModel +from Models.HashModel import HashModel +from Interfaces.FilesInterface import FilesInterface +from Utils.Utils import Utils + +class OrdersController(ControllerAbstract): + + def execute(self:Self, + inputs:dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None] + ) -> None: + + file_name:str + _from:FilesInterface = FileModel.get_driver(self.cxcv, FileModel.get_inputs({ + "mode" : "rb" + }, Utils.get_value("from", inputs))) + _to:FilesInterface = FileModel.get_driver(self.cxcv, FileModel.get_inputs({ + "mode" : "wb" + }, Utils.get_value("to", inputs))) + from_path:str = Utils.get_value("from_path", inputs) + to_path:str = Utils.get_value("to_path", inputs) + tries:tuple[int, ...] = tuple(range(Utils.get_value("tries", inputs, 3))) + action:str = Utils.get_value("action", inputs, "copy") + remove_from:bool = action in ("move", "cut", "remove") + save_to:bool = action in ("copy", "move", "cut") + + for file_name in _from.list(): + + try_i:int + + for try_i in tries: + + _from.open(from_path + "/" + file_name) + _to.reset(to_path + "/" + file_name) + + if _from.is_file: + pass + elif _from.is_directory: + pass + else: + break \ No newline at end of file diff --git a/Python/Drivers/FilesDriver.py b/Python/Drivers/FilesDriver.py new file mode 100644 index 0000000..08194d9 --- /dev/null +++ b/Python/Drivers/FilesDriver.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Callable +from Interfaces.Application.CXCVInterface import CXCVInterface +from Utils.Patterns import RE +from Utils.Utils import Utils +from os.path import exists as path_exists, isfile as is_file, isdir as is_directory +from os import mkdir as make_directory, remove as remove_file +from os import listdir as list_directory, stat as get_stat, stat_result as StatResult +from shutil import rmtree as remove_directory + +class FilesDriver: + + def __init__(self:Self, cxcv:CXCVInterface, root_path:str) -> None: + self.cxcv:CXCVInterface = cxcv + + self.root_paths:tuple[str, ...] = ("", root_path) + self.slash:str = "/" if "/" in root_path else "\\" + + for _ in range(2): + self.root_paths += (RE.LAST_PATH_ITEM.sub(r'', self.root_paths[-1]),) + + def fix_path(self:Self, path:str) -> str: + return RE.SLASHES.sub(self.slash, path) + + def exists(self:Self, path:str, analyze_roots:bool = True) -> bool: + if analyze_roots: + return self.get_absolute_path(path) is not None + return path_exists(path) + + def is_file(self:Self, path:str, analyze_roots:bool = True) -> bool: + if analyze_roots and (path := self.get_absolute_path(path)) is None: + return False + return is_file(path) + + def is_directory(self:Self, path:str, analyze_roots:bool = True) -> bool: + if analyze_roots and (path := self.get_absolute_path(path)) is None: + return False + return is_directory(path) + + def get_absolute_path(self:Self, path:str) -> str|None: + + root:str + + for root in self.root_paths: + absolute_path:str = self.fix_path((root + self.slash if root else "") + path) + if self.exists(absolute_path, False): + return absolute_path + return None + + def load(self:Self, path:str, mode:str = "r") -> str|bytes|None: + try: + + absolute_path:str = self.get_absolute_path(path) + + if absolute_path is not None and is_file(absolute_path): + with open(absolute_path, mode) as file: + return file.read() + + except Exception as exception: + pass + return None + + def load_by_chunks(self:Self, + path:str, + callback:Callable[[bytes], None], + chunk_size:int = 4096 + ) -> None: + try: + + absolute_path:str = self.get_absolute_path(path) + + if absolute_path is not None and is_file(absolute_path): + with open(absolute_path, "rb") as file: + while True: + chunk:bytes = file.read(chunk_size) + if not chunk: + break + callback(chunk) + + except Exception as exception: + pass + + def save(self:Self, path:str, data:str|bytes, mode:str = "w") -> bool: + try: + self.prepare_path(self.get_directory_path(path)) + with open(self.fix_path(path), mode) as file: + file.write(data) + return True + except Exception as exception: + self.cxcv.exception(exception, "files_driver_save_exception", { + "path" : path, + "mode" : mode, + "length" : len(data) if data is not None else 0 + }) + return False + + def remove(self:Self, path:str) -> bool: + if self.exists(path := self.fix_path(path), False): + try: + if self.is_file(path, False): + remove_file(path) + elif self.is_directory(path, False): + remove_directory(path) + return True + except Exception as exception: + pass + return False + + def load_json(self:Self, + inputs:Any|None, + only_dictionaries:bool = True + ) -> tuple[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None], ...]: + + items:tuple[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None], ...] = () + + if isinstance(inputs, dict): + items += (inputs,) + elif isinstance(inputs, list) or isinstance(inputs, tuple): + if only_dictionaries: + + subinputs:Any|None + + for subinputs in inputs: + items += self.load_json(subinputs, only_dictionaries) + + else: + items += (inputs,) + elif isinstance(inputs, str): + + json:dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]|None = Utils.json_decode(inputs) + + items += ( + self.load_json(Utils.json_decode(self.load(inputs, "r")), only_dictionaries) if json is None else + self.load_json(json, only_dictionaries)) + + return items + + def get_directory_path(self:Self, path:str) -> str: + return RE.LAST_PATH_ITEM.sub(r'', self.fix_path(path)) + + def prepare_path(self:Self, path:str) -> bool: + try: + + parts:list[str] = RE.SLASHES.split(path) + directory:str = "/" if self.slash == "/" else "" + part:str + + for part in parts: + if part: + directory += parts[0] + self.slash + self.exists(directory, False) or make_directory(directory) + parts = parts[1:] + + return True + except Exception as exception: + print(exception) + return False + + def list(self:Self, path:str) -> list[str]: + + items:list[str] = [] + + try: + + absolute_path:str = self.get_absolute_path(path) + + if absolute_path is not None and is_directory(absolute_path): + items = list_directory(absolute_path) + + except Exception as exception: + pass + + return items + + def get_stat(self:Self, path:str) -> StatResult|None: + + stat:StatResult|None = None + + try: + + absolute_path:str = self.get_absolute_path(path) + + if absolute_path is not None and self.exists(absolute_path, False): + stat = get_stat(absolute_path) + + except Exception as exception: + pass + + return stat \ No newline at end of file diff --git a/Python/Drivers/LocalFileDriver.py b/Python/Drivers/LocalFileDriver.py new file mode 100644 index 0000000..abe7629 --- /dev/null +++ b/Python/Drivers/LocalFileDriver.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import IO, Callable, Self, Optional, Any +from os import stat_result as StatResult, listdir as list_directory, remove as remove_path, stat as get_stat +from os.path import exists as path_exists +from stat import S_ISDIR as is_directory, S_ISREG as is_file +from Interfaces.Application.CXCVInterface import CXCVInterface +from Interfaces.FilesInterface import FilesInterface +from Managers.EventsManager import EventsManager +from Models.HashModel import HashModel +from Utils.Utils import Utils + +class LocalFileDriver(FilesInterface): + + def __init__(self:Self, + cxcv:Any, + path:str, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None + ) -> None: + + self.cxcv:CXCVInterface = cxcv + self.path:str = path + self.chunk_size:int = self.cxcv.settings.get(("chunk_size", "default_chunk_size"), inputs, 4096) + self.__file:IO[Any]|None = None + self.mode:str = self.cxcv.settings.get(("mode", "file_mode", "default_file_mode"), inputs, "rb") + self.__closed:bool = False + self.on_load_chunk:EventsManager = EventsManager() + callback:Callable[[bytes], bool]|None = Utils.get_value("on_load_chunk", inputs) + self.__autoclose:bool = self.cxcv.settings.get("auto_close", inputs, True) + self.reusable:bool = self.cxcv.settings.get("reusable", inputs, False) + self.exists:bool = path_exists(self.path) + self.is_file:bool = False + self.is_directory:bool = False + self.size:int = 0 + self.hash:HashModel = None + stat:StatResult + + try: + stat = get_stat(self.path) + self.exists = True + self.is_file = is_file(stat.st_mode) + self.is_directory = is_directory(stat.st_mode) + self.size = stat.st_size + except Exception as exception: + pass + + callback and self.on_load_chunk.add(callback) + + self.cxcv.settings.get("auto_open", inputs, True) and self.open() + + def open(self:Self, new_path:Optional[str] = None) -> Self: + + if new_path or self.reusable: + if new_path: + self.path = new_path + if self.__file and not self.__closed: + self.__file.close() + self.__closed = False + elif self.__file is not None: + return self + + self.__file = open(self.path, mode = self.mode) + + return self + + def close(self:Self) -> Self: + + if not self.__closed: + self.__closed = True + self.hash = None + self.is_directory = False + self.is_file = False + self.exists = False + self.size = 0 + self.__file and self.__file.close() + + return self + + def load(self:Self, all:bool = False) -> bytes: + + data:bytes = self.__file.read(self.size if all else self.chunk_size) + + if data: + self.on_load_chunk.execute(data) + elif self.__autoclose: + self.close() + + def save(self:Self, chunk:Optional[bytes|bool] = None) -> None: + if chunk: + self.__file.write(chunk) + elif self.__autoclose: + self.close() + + def list(self:Self) -> list[str]: + return list_directory(self.path) if self.is_directory else [] + + def remove(self:Self) -> None: + remove_path(self.path) + + def reset(self:Self) -> Self: + + self.close() + self.open() + + return self \ No newline at end of file diff --git a/Python/Drivers/SFTPFileDriver.py b/Python/Drivers/SFTPFileDriver.py new file mode 100644 index 0000000..886013b --- /dev/null +++ b/Python/Drivers/SFTPFileDriver.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Callable, Self, Optional, Any +from paramiko import SFTPFile, SSHClient, SFTPClient, AutoAddPolicy +from os import stat_result as StatResult +from stat import S_ISDIR as is_directory, S_ISREG as is_file +from Interfaces.Application.CXCVInterface import CXCVInterface +from Interfaces.FilesInterface import FilesInterface +from Managers.EventsManager import EventsManager +from Models.HashModel import HashModel +from Utils.Utils import Utils + +class SFTPFileDriver(FilesInterface): + + def __init__(self:Self, + cxcv:Any, + path:str, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None + ) -> None: + + self.cxcv:CXCVInterface = cxcv + self.__client:SSHClient = SSHClient() + self.path:str = path + self.chunk_size:int = self.cxcv.settings.get(("chunk_size", "default_chunk_size"), inputs, 4096) + self.__file:SFTPFile|None = None + self.mode:str = self.cxcv.settings.get("file_mode", inputs, "rb") + self.__closed:bool = False + self.on_load_chunk:EventsManager = EventsManager() + callback:Callable[[bytes], bool]|None = Utils.get_value("on_load_chunk", inputs) + self.__autoclose:bool = self.cxcv.settings.get("auto_close", inputs, True) + self.reusable:bool = self.cxcv.settings.get("reusable", inputs, True) + self.exists:bool = False + self.is_file:bool = False + self.is_directory:bool = False + self.size:int = 0 + self.hash:HashModel = None + self.__sftp:SFTPClient + stat:StatResult + + self.__client.set_missing_host_key_policy(AutoAddPolicy()) + self.__client.connect( + hostname = Utils.get_value(("host", "default_host", "default_sftp_host"), inputs, "localhost"), + port = Utils.get_value(("port", "default_port", "default_sftp_port"), inputs, 22), + username = Utils.get_value(("user", "default_user", "default_sftp_user"), inputs), + password = Utils.get_value(("password", "default_password", "default_sftp_password"), inputs) + ) + + self.__sftp = self.__client.open_sftp() + + try: + stat = self.__sftp.stat(self.path) + self.exists = True + self.is_file = is_file(stat.st_mode) + self.is_directory = is_directory(stat.st_mode) + self.size = stat.st_size + except Exception as exception: + pass + + callback and self.on_load_chunk.add(callback) + + self.cxcv.settings.get("auto_open", inputs, True) and self.open() + + def open(self:Self, new_path:Optional[str] = None) -> Self: + + if new_path or self.reusable: + if new_path: + self.path = new_path + if self.__file and not self.__closed: + self.__file.close() + self.__closed = False + elif self.__file is not None: + return self + + self.__file = self.__sftp.file(self.path, mode = self.mode) + + return self + + def close(self:Self) -> Self: + + if not self.__closed: + + self.__closed = True + + self.hash = None + self.is_directory = False + self.is_file = False + self.exists = False + self.size = 0 + self.__file and self.__file.close() + self.__sftp.close() + self.__client.close() + + return self + + def load(self:Self, all:bool = False) -> bytes: + + data:bytes = self.__file.read(self.size if all else self.chunk_size) + + if data: + self.on_load_chunk.execute(data) + elif self.__autoclose: + self.close() + + def save(self:Self, chunk:Optional[bytes|bool] = None) -> None: + if chunk: + self.__file.write(chunk) + elif self.__autoclose: + self.close() + + def list(self:Self) -> list[str]: + return self.__sftp.listdir(self.path) if self.is_directory else [] + + def remove(self:Self) -> None: + self.__sftp.remove(self.path) + + def reset(self:Self) -> Self: + + self.close() + self.open() + + return self \ No newline at end of file diff --git a/Python/Drivers/SQLiteDriver.py b/Python/Drivers/SQLiteDriver.py new file mode 100644 index 0000000..c632a83 --- /dev/null +++ b/Python/Drivers/SQLiteDriver.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional +from Interfaces.Application.CXCVInterface import CXCVInterface +from Models.ResultsModel import ResultsModel +from Abstracts.DatabasesAbstract import DatabasesAbstract +from Utils.Patterns import RE +from Utils.Utils import Utils +from sqlite3 import Connection, Cursor, connect as sqlite_connect +from re import Match as REMatch + +class SQLiteDriver(DatabasesAbstract): + + def __init__(self:Self, + cxcv:CXCVInterface, + key:str, + inputs:dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...] + ) -> None: + super().__init__(cxcv, key, inputs) + self.__connection:Connection|None = None + self.__path:str = Utils.get_value("path", inputs) + build_file:str|None = Utils.get_value("builder_file", inputs) + self.__in_use:bool = False + + self.connect() + + build_file is None or self.execute_file(build_file) + + def connect(self:Self) -> bool: + if self.__connection is None: + try: + + directory:str = self.cxcv.files.get_directory_path(self.__path) + + self.cxcv.files.prepare_path(directory) + self.__connection = sqlite_connect(self.__path, check_same_thread = False) + + except Exception as exception: + self.cxcv.exception(exception, "database_driver_connection_exception") + return False + return True + + def close(self:Self) -> bool: + try: + if self.__connection: + self.__connection.execute("PRAGMA wal_checkpoint(FULL);") + self.__connection.close() + self.__connection = None + except Exception as exception: + self.cxcv.exception(exception, "database_driver_close_exception") + return False + return True + + @staticmethod + def build_match(matches:REMatch, parameters:dict[str, Any|None]) -> str: + + value:Any|None = parameters.get(matches.group(1), matches.group(0)) + + return ( + "null" if value is None else + "'" + str(value).replace("'", "''") + "'" if isinstance(value, str) else + str(value)) + + @classmethod + def prepare(cls:type[Self], + sql:str, + parameters:Optional[dict[str, Any|None]] = None, + builder:Optional[dict[str, Any|None]] = None + ) -> list[str]: + + queries:list[str] = [""] + has_parameters:bool = parameters is not None + + if builder: + sql = Utils.string_variables(sql, builder) + + while len(sql): + + comment:str|None + fragment:str|None + string:str|None + separator:str|None + matches:REMatch = RE.SQL_LITE.search(sql) + + comment, fragment, string, separator = matches.groups() + + if comment is None: + if separator is None: + queries[-1] += fragment or string or "" + else: + queries[-1] = queries[-1].strip() + if len(queries[-1]): + queries[-1] += ";" + queries.append("") + + sql = sql[matches.end():] + + return [( + RE.STRING_VARIABLE.sub( + lambda matches:cls.build_match(matches, parameters), + subsql + ) if has_parameters else + subsql) for subsql in queries if len(subsql)] + + def execute(self:Self, + query:str, + parameters:Optional[dict[str, Any|None]] = None, + builder:Optional[dict[str, Any|None]] = None + ) -> ResultsModel: + + while self.__in_use and self.cxcv.working: + self.cxcv.wait(.01, .1) + self.__in_use = True + + cursor:Cursor = self.__connection.cursor() + results:ResultsModel = ResultsModel() + + try: + + for subquery in self.prepare(query, parameters, builder): + + order:str = subquery[:7].lower() + + try: + cursor.execute(subquery) + except Exception as exception: + print(subquery) + self.cxcv.exception(exception, "database_driver_execute_subquery_exception", { + "subquery" : subquery + }) + continue + + if order == "insert ": + self.__in_use = False + results.ids.append(self.get_last_id()) + elif order in ("select ", "with "): + results.set(cursor.fetchall(), Type=tuple(column[0] for column in cursor.description)) + + self.__connection.commit() + cursor.close() + + except Exception as exception: + self.cxcv.exception(exception, "database_driver_execute_exception", { + "query" : query, + "parameters" : parameters + }) + + self.__in_use = False + + return results + + def execute_file(self:Self, + path:str, + parameters:Optional[dict[str, Any|None]] = None, + builder:Optional[dict[str, Any|None]] = None + ) -> ResultsModel: + return self.execute(self.cxcv.files.load(path, "r"), parameters, builder) + + def get(self:Self, + query:str, + parameters:Optional[dict[str, Any|None]] = None, + builder:Optional[dict[str, Any|None]] = None + ) -> Any|None: + return self.execute(query, parameters, builder).get(0) + + def get_id(self:Self, + query:str, + parameters:Optional[dict[str, Any|None]] = None, + builder:Optional[dict[str, Any|None]] = None + ) -> int|None: + + value:Any|None = self.get(query, parameters, builder) + + return int(value) if value is not None else None + + def get_last_id(self:Self) -> int|None: + return self.get_id("select last_insert_rowid();") \ No newline at end of file diff --git a/Python/Interfaces/Abstracts/ControllerAbstractInterface.py b/Python/Interfaces/Abstracts/ControllerAbstractInterface.py new file mode 100644 index 0000000..1067b11 --- /dev/null +++ b/Python/Interfaces/Abstracts/ControllerAbstractInterface.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Optional, Any +from abc import ABC + +class ControllerAbstractInterface(ABC): + + def __init__(self:Self, + cxcv:Any, + key:str, + inputs:Optional[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]] = None + ) -> None: + self.key:str = None + self.via:str = None \ No newline at end of file diff --git a/Python/Interfaces/Abstracts/DatabasesAbstractInterface.py b/Python/Interfaces/Abstracts/DatabasesAbstractInterface.py new file mode 100644 index 0000000..38a4e60 --- /dev/null +++ b/Python/Interfaces/Abstracts/DatabasesAbstractInterface.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from Models.ResultsModel import ResultsModel +from typing import Optional, Self, Any +from abc import ABC, abstractmethod + +class DatabasesAbstractInterface(ABC): + + def __init__(self:Self, + cxcv:Any, + key:str, + inputs:dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...] + ) -> None: + self.key:str = key + self.type:str = "UNKNOWN" + + @abstractmethod + def connect(self:Self) -> bool:pass + + @abstractmethod + def close(self:Self) -> bool:pass + + @abstractmethod + def execute(self:Self, + sql:str, + parameters:Optional[dict[str, Any|None]] = None + ) -> ResultsModel:pass + + @abstractmethod + def execute_file(self:Self, + path:str, + parameters:Optional[dict[str, Any|None]] = None + ) -> ResultsModel:pass + + @abstractmethod + def get_id(self:Self, query:str, parameters:Optional[dict[str, Any|None]] = None) -> int|None:pass + + @abstractmethod + def get_last_id(self:Self) -> int|None:pass \ No newline at end of file diff --git a/Python/Interfaces/Abstracts/ProcedureAbstractInterface.py b/Python/Interfaces/Abstracts/ProcedureAbstractInterface.py new file mode 100644 index 0000000..ba01968 --- /dev/null +++ b/Python/Interfaces/Abstracts/ProcedureAbstractInterface.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Optional, Any +from abc import ABC + +class ProcedureAbstractInterface(ABC): + + def __init__(self:Self, + cxcv:Any, + key:str, + via:str, + inputs:Optional[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]] = None + ) -> None: + self.key:str = key + self.via:str = via \ No newline at end of file diff --git a/Python/Interfaces/Application/CXCVInterface.py b/Python/Interfaces/Application/CXCVInterface.py new file mode 100644 index 0000000..5e03a02 --- /dev/null +++ b/Python/Interfaces/Application/CXCVInterface.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional +from abc import ABC, abstractmethod +from Interfaces.Drivers.FilesDriverInterface import FilesDriverInterface +from Interfaces.Managers.SettingsManagerInterface import SettingsManagerInterface +from Interfaces.Managers.I18NManagerInterface import I18NManagerInterface +from Interfaces.Managers.TerminalManagerInterface import TerminalManagerInterface +from Interfaces.Managers.ModelsManagerInterface import ModelsManagerInterface +from Interfaces.Managers.DatabasesManagerInterface import DatabasesManagerInterface +from Interfaces.Managers.ControllersManagerInterface import ControllersManagerInterface +from Interfaces.Managers.ProceduresManagerInterface import ProceduresManagerInterface + +class CXCVInterface(ABC): + + def __init__(self:Self, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None + ) -> None: + self.working:bool = None + self.files:FilesDriverInterface = None + self.settings:SettingsManagerInterface = None + self.i18n:I18NManagerInterface = None + self.terminal:TerminalManagerInterface = None + self.models:ModelsManagerInterface = None + self.databases:DatabasesManagerInterface = None + self.controllers:ControllersManagerInterface = None + self.procedures:ProceduresManagerInterface = None + + @abstractmethod + def close(self:Self) -> None:pass + + @abstractmethod + def print(self:Self, + _type:str, + message:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + i:int = 0 + ) -> None:pass + + @abstractmethod + def exception(self:Self, + exception:Exception, + message:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + i:int = 0 + ) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Drivers/FilesDriverInterface.py b/Python/Interfaces/Drivers/FilesDriverInterface.py new file mode 100644 index 0000000..def4ac1 --- /dev/null +++ b/Python/Interfaces/Drivers/FilesDriverInterface.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Callable +from abc import ABC, abstractmethod + +class FilesDriverInterface(ABC): + + @abstractmethod + def fix_path(self:Self, path:str) -> str:pass + + @abstractmethod + def exists(self:Self, path:str, analyze_roots:bool = True) -> bool:pass + + @abstractmethod + def is_file(self:Self, path:str, analyze_roots:bool = True) -> bool:pass + + @abstractmethod + def is_directory(self:Self, path:str, analyze_roots:bool = True) -> bool:pass + + @abstractmethod + def get_absolute_path(self:Self, path:str) -> str|None:pass + + @abstractmethod + def load(self:Self, path:str, mode:str = "r") -> bool:pass + + @abstractmethod + def load_by_chunks(self:Self, + path:str, + callback:Callable[[bytes], None], + chunk_size:int = 4096 + ) -> None:pass + + @abstractmethod + def save(self:Self, path:str, data:str|bytes, mode:str = "w") -> bool:pass + + @abstractmethod + def load_json(self:Self, + inputs:Any|None, + only_dictionaries:bool = True + ) -> tuple[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None], ...]:pass + + @abstractmethod + def get_directory_path(self:Self, path:str) -> str:pass + + @abstractmethod + def prepare_path(self:Self, path:str) -> bool:pass \ No newline at end of file diff --git a/Python/Interfaces/FilesInterface.py b/Python/Interfaces/FilesInterface.py new file mode 100644 index 0000000..7327d24 --- /dev/null +++ b/Python/Interfaces/FilesInterface.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Optional, Self +from abc import ABC, abstractmethod +from Models.HashModel import HashModel + +class FilesInterface(ABC): + + def __init__(self:Self, + cxcv:Any, + path:str, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None + ) -> None: + self.is_file:bool = None + self.is_directory:bool = None + self.exists:bool = None + self.size:int = None + self.path:str = None + self.chunk_size:int = None + self.reusable:bool = None + self.hash:HashModel = None + + @abstractmethod + def open(self:Self, new_path:Optional[str] = None) -> Self:pass + + @abstractmethod + def close(self:Self) -> Self:pass + + @abstractmethod + def load(self:Self, all:bool = False) -> bytes:pass + + @abstractmethod + def save(self:Self, chunk:Optional[bytes|bool] = None) -> None:pass + + @abstractmethod + def list(self:Self) -> list[str]:pass + + @abstractmethod + def remove(self:Self) -> None:pass + + @abstractmethod + def reset(self:Self) -> Self:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/ControllersManagerInterface.py b/Python/Interfaces/Managers/ControllersManagerInterface.py new file mode 100644 index 0000000..ba6a9cf --- /dev/null +++ b/Python/Interfaces/Managers/ControllersManagerInterface.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, TypeVar +from abc import ABC, abstractmethod +from Interfaces.Abstracts.ControllerAbstractInterface import ControllerAbstractInterface + +T = TypeVar("T") + +class ControllersManagerInterface(ABC): + + @abstractmethod + def get(self:Self, Type:type[T], scrapper:str|T) -> T|None:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/DatabasesManagerInterface.py b/Python/Interfaces/Managers/DatabasesManagerInterface.py new file mode 100644 index 0000000..720a84f --- /dev/null +++ b/Python/Interfaces/Managers/DatabasesManagerInterface.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any +from abc import ABC, abstractmethod +from Interfaces.Abstracts.DatabasesAbstractInterface import DatabasesAbstractInterface + +class DatabasesManagerInterface(ABC): + + @abstractmethod + def get(self:Self, database:str|DatabasesAbstractInterface) -> DatabasesAbstractInterface|None:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/I18NManagerInterface.py b/Python/Interfaces/Managers/I18NManagerInterface.py new file mode 100644 index 0000000..78c5627 --- /dev/null +++ b/Python/Interfaces/Managers/I18NManagerInterface.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Optional, Self +from abc import ABC, abstractmethod + +class I18NManagerInterface(ABC): + + @abstractmethod + def get(self:Self, + texts:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + default:str = "" + ) -> str:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/ModelsManagerInterface.py b/Python/Interfaces/Managers/ModelsManagerInterface.py new file mode 100644 index 0000000..cd65c0d --- /dev/null +++ b/Python/Interfaces/Managers/ModelsManagerInterface.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, TypeVar +from abc import ABC, abstractmethod + +T = TypeVar('T') + +class ModelsManagerInterface(ABC): + + @abstractmethod + def get(self:Self, Type:type[T], key:str) -> type[T]|None:pass + + @abstractmethod + def add(self:Self, + Type:type[T], + inputs:dict[str, Any]|list[Any|None]|tuple[Any|None, ...]|str, + overwrite:bool = False + ) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/ProceduresManagerInterface.py b/Python/Interfaces/Managers/ProceduresManagerInterface.py new file mode 100644 index 0000000..3d6e5b5 --- /dev/null +++ b/Python/Interfaces/Managers/ProceduresManagerInterface.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, TypeVar +from abc import ABC, abstractmethod + +T = TypeVar("T") + +class ProceduresManagerInterface(ABC): + + @abstractmethod + def get(self:Self, Type:type[T], procedure:tuple[str, str]|T) -> T|None:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/SettingsManagerInterface.py b/Python/Interfaces/Managers/SettingsManagerInterface.py new file mode 100644 index 0000000..03b4c1b --- /dev/null +++ b/Python/Interfaces/Managers/SettingsManagerInterface.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Optional, Self +from abc import ABC, abstractmethod + +class SettingsManagerInterface(ABC): + + @abstractmethod + def get(self:Self, + keys:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + default:Any|None = None + ) -> Any|None:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass + + @abstractmethod + def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/TerminalManagerInterface.py b/Python/Interfaces/Managers/TerminalManagerInterface.py new file mode 100644 index 0000000..6892d4d --- /dev/null +++ b/Python/Interfaces/Managers/TerminalManagerInterface.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any +from abc import ABC, abstractmethod + +class TerminalManagerInterface(ABC): + + @abstractmethod + def get_i_command(self, names:str|list[str]|tuple[str, ...]) -> int|None:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass + + + def close(self:Self, parameters:dict[str, Any|None], *arguments:list[Any|None]) -> None:pass \ No newline at end of file diff --git a/Python/Interfaces/Procedures/LogsProcedureInterface.py b/Python/Interfaces/Procedures/LogsProcedureInterface.py new file mode 100644 index 0000000..c2ba651 --- /dev/null +++ b/Python/Interfaces/Procedures/LogsProcedureInterface.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Optional, Self, Any +from abc import ABC, abstractmethod + +class LogsProcedureInterface(ABC): + + @abstractmethod + def set_log(self:Self, + application:str, + file:str, + method:str, + line:int, + message:str, + error:int = 0, + parameters:dict[str, Any|None] = {} + ) -> None:pass + + @abstractmethod + def set_exception(self:Self, + application:str, + file:str, + method:str, + line:int, + message:str, + exception:str, + trace:str, + parameters:dict[str, Any|None] = {}, + status:Optional[str] = None + ) -> None:pass \ No newline at end of file diff --git a/Python/Managers/ControllersManager.py b/Python/Managers/ControllersManager.py new file mode 100644 index 0000000..475728b --- /dev/null +++ b/Python/Managers/ControllersManager.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, TypeVar +from Interfaces.Application.CXCVInterface import CXCVInterface +from Abstracts.ControllerAbstract import ControllerAbstract +from Utils.Utils import Utils + +T = TypeVar("T") + +class ControllersManager: + + def __init__(self:Self, cxcv:CXCVInterface) -> None: + + key:str + + self.cxcv:CXCVInterface = cxcv + self.__controllers:dict[str, ControllerAbstract] = {} + + for key in ("default_controllers_models", "controllers_models"): + self.cxcv.models.add(ControllerAbstract, self.cxcv.settings.get(key), True) + + for key in ( + "default_controllers_files", "default_controllers", + "controllers_files", "controllers" + ): + self.add(self.cxcv.settings.get(key), True) + + def get(self:Self, Type:type[T], controller:str|T) -> T|None: + return ( + controller if isinstance(controller, Type) else + self.__controllers[controller] if ( + controller in self.__controllers and + isinstance(self.__controllers[controller], Type) + ) else + None) + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + subinputs:dict[str, ControllerAbstract] + + for subinputs in self.cxcv.files.load_json(inputs): + + key:str + subinputs:dict[str, Any|None] + + for key, subinputs in subinputs.items(): + if overwrite or key not in self.__controllers: + + ControllerModel:type[ControllerAbstract]|None = self.cxcv.models.get( + ControllerAbstract, + Utils.get_value("model", subinputs, key) + ) + + if ControllerModel is not None: + self.__controllers[key] = ControllerModel(self.cxcv, key, subinputs) \ No newline at end of file diff --git a/Python/Managers/DatabasesManager.py b/Python/Managers/DatabasesManager.py new file mode 100644 index 0000000..a5772c6 --- /dev/null +++ b/Python/Managers/DatabasesManager.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any +from Interfaces.Application.CXCVInterface import CXCVInterface +from Abstracts.DatabasesAbstract import DatabasesAbstract +from Utils.Utils import Utils + +class DatabasesManager: + + def __init__(self:Self, cxcv:CXCVInterface) -> None: + + key:str + + self.cxcv:CXCVInterface = cxcv + self.__databases:dict[str, DatabasesAbstract] = {} + + for key in ("default_databases_models", "databases_models"): + self.cxcv.models.add(DatabasesAbstract, self.cxcv.settings.get(key), True) + + for key in ( + "default_databases_files", "default_databases", + "databases_files", "databases" + ): + self.add(self.cxcv.settings.get(key), True) + + def get(self:Self, database:str|DatabasesAbstract) -> DatabasesAbstract|None: + return ( + database if isinstance(database, DatabasesAbstract) else + self.__databases[database] if database in self.__databases else + None) + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + subinputs:dict[str, DatabasesAbstract] + + for subinputs in self.cxcv.files.load_json(inputs): + + key:str + subinputs:dict[str, Any|None] + + for key, subinputs in subinputs.items(): + if overwrite or key not in self.__databases: + + DatabaseModel:type[DatabasesAbstract]|None = self.cxcv.models.get( + DatabasesAbstract, + Utils.get_value("model", subinputs, key) + ) + + if DatabaseModel is not None: + self.__databases[key] = DatabaseModel(self.cxcv, key, subinputs) \ No newline at end of file diff --git a/Python/Managers/EventsManager.py b/Python/Managers/EventsManager.py new file mode 100644 index 0000000..f86b892 --- /dev/null +++ b/Python/Managers/EventsManager.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Callable, Any + +class EventsManager: + + def __init__(self:Self) -> None: + self.__events:list[Callable[[Any|None], Any|None]] = [] + + def execute(self:Self, *arguments:tuple[Any|None, ...]) -> tuple[Any|None]: + + results:tuple[Any|None] = () + + for event in self.__events: + if event is not None: + results += (event(*arguments),) + + return results + + def add(self:Self, + event:Callable[[Any|None], Any|None] + ) -> int|None: + + i:int|None = None + + if callable(event): + + l:int = len(self.__events) + + i = 0 + while i < l and self.__events[i] is not None: + i += 1 + + if i < l: + self.__events[i] = event + else: + self.__events.append(event) + + return i + + def remove(self:Self, i:int) -> bool: + if 0 <= i < len(self.__events) and self.__events[i] is not None: + self.__events[i] = None + return True + return False \ No newline at end of file diff --git a/Python/Managers/I18NManager.py b/Python/Managers/I18NManager.py new file mode 100644 index 0000000..9f98fa6 --- /dev/null +++ b/Python/Managers/I18NManager.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Optional, Self +from Interfaces.Application.CXCVInterface import CXCVInterface +from Utils.Utils import Utils + +class I18NManager: + + def __init__(self:Self, cxcv:CXCVInterface) -> None: + + key:str + + self.cxcv:CXCVInterface = cxcv + self.__sentences:dict[str, dict[str, str|list[str]|tuple[str, ...]]] = {} + self.__language:str = "english" + + for key in ("default_i18n_files", "default_i18n", "i18n_files", "i18n"): + self.add(self.cxcv.settings.get(key), True) + + def __get(self:Self, + texts:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + default:str = "" + ) -> Any|None: + + keys:list[str] = Utils.get_keys(texts) + + if len(keys): + + language:str + done:tuple[str, ...] = () + + for language in (self.__language, *tuple(self.__sentences.keys())): + if language in done: + continue + done += (language,) + + if language in self.__sentences: + for key in keys: + if key in self.__sentences[language]: + return self.__sentences[language][key] + + if len(texts := Utils.get_strings(texts)): + return texts[0] + return default + + def get(self:Self, + texts:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + default:str = "" + ) -> str: + + text:str|list[str]|tuple[str, ...] = self.__get(texts, inputs, default) + + return Utils.string_variables(( + text if isinstance(text, str) else + " ".join(text) if isinstance(text, (list, tuple)) else + str(text)), inputs) + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + dictionary:dict[str, Any|None] + + for dictionary in self.cxcv.files.load_json(inputs): + + language:str + sentences:Any|None + + for language, sentences in dictionary.items(): + if not isinstance(sentences, dict): + continue + + if language not in self.__sentences: + self.__sentences[language] = {} + + key:str + value:Any|None + + for key, value in sentences.items(): + if overwrite or key not in self.__sentences[language]: + self.__sentences[language][key] = value \ No newline at end of file diff --git a/Python/Managers/ModelsManager.py b/Python/Managers/ModelsManager.py new file mode 100644 index 0000000..53177cb --- /dev/null +++ b/Python/Managers/ModelsManager.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, TypeVar +from inspect import isclass +from Interfaces.Application.CXCVInterface import CXCVInterface + +T = TypeVar('T') + +class ModelsManager: + + def __init__(self:Self, cxcv:CXCVInterface) -> None: + self.cxcv:CXCVInterface = cxcv + self.__models:dict[str, Any] = {} + + def get(self:Self, Type:type[T], model:str|type[T]) -> type[T]|None: + return ( + model if isclass(model) and issubclass(model, Type) else + self.__models[model] if ( + model in self.__models and + issubclass(self.__models[model], Type) + ) else + None) + + def add(self:Self, + Type:type[T], + inputs:dict[str, Any]|list[Any|None]|tuple[Any|None, ...]|str, + overwrite:bool = False + ) -> None: + + subinputs:dict[str, Any] + + for subinputs in self.cxcv.files.load_json(inputs): + + key:str + Model:type[T] + + for key, Model in subinputs.items(): + if issubclass(Model, Type) and ( + overwrite or key not in self.__models + ): + self.__models[key] = Model \ No newline at end of file diff --git a/Python/Managers/ProceduresManager.py b/Python/Managers/ProceduresManager.py new file mode 100644 index 0000000..bac4d73 --- /dev/null +++ b/Python/Managers/ProceduresManager.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, TypeVar +from Interfaces.Application.CXCVInterface import CXCVInterface +from Abstracts.ProcedureAbstract import ProcedureAbstract +from Utils.Utils import Utils + +T = TypeVar("T") + +class ProceduresManager: + + def __init__(self:Self, cxcv:CXCVInterface) -> None: + + key:str + self.cxcv:CXCVInterface = cxcv + self.__procedures:dict[str, dict[str, ProcedureAbstract]] = {} + + for key in ("default_procedures_models", "procedures_models"): + self.cxcv.models.add(ProcedureAbstract, self.cxcv.settings.get(key), True) + + for key in ( + "default_procedures_files", "default_procedures", + "procedures_files", "procedures" + ): + self.add(self.cxcv.settings.get(key), True) + + def get(self:Self, Type:type[T], procedure:tuple[str, str]|T) -> T|None: + if isinstance(procedure, tuple): + + key:str + via:str + + key, via = procedure + + return self.__procedures[key][via] if ( + key in self.__procedures and + via in self.__procedures[key] and + isinstance(self.__procedures[key][via], Type) + ) else None + return procedure if isinstance(procedure, T) else None + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + subinputs:dict[str, dict[str, ProcedureAbstract]] + + for subinputs in self.cxcv.files.load_json(inputs): + + via:str + vias:dict[str, ProcedureAbstract|str] + + for key, vias in subinputs.items(): + + key:str + subinputs:dict[str, Any|None] + + if key not in self.__procedures: + self.__procedures[key] = {} + + for via, subinputs in vias.items(): + if overwrite or via not in self.__procedures[key]: + + ProcedureModel:type[ProcedureAbstract] = self.cxcv.models.get( + ProcedureAbstract, + Utils.get_value("model", subinputs, key) + ) + + if ProcedureModel is not None: + self.__procedures[key][via] = ProcedureModel(self.cxcv, key, via, subinputs) \ No newline at end of file diff --git a/Python/Managers/SettingsManager.py b/Python/Managers/SettingsManager.py new file mode 100644 index 0000000..42e35b2 --- /dev/null +++ b/Python/Managers/SettingsManager.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Optional, Self +from Interfaces.Application.CXCVInterface import CXCVInterface +from Utils.Utils import Utils + +class SettingsManager: + + __DEFAULT_SETTINGS:dict[str, Any|None] = { + "sql_builder_file" : "/SQLite/CXCV.lite.sql", + "database_file_path" : "/Data/CXCV.sqlite.db", + "default_settings_files" : ["/JSON/CXCV.settings.json"], + "default_secrets_files" : ["/JSON/CXCV.secrets.json"] + } + + def __init__(self:Self, + cxcv:CXCVInterface, + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None + ) -> None: + + key:str + + self.cxcv:CXCVInterface = cxcv + self.__inputs:dict[str, Any|None] = Utils.get_dictionary(inputs) + self.__settings:dict[str, Any|None] = {} + self.__secrets:dict[str, Any|None] = {} + + for key in ("default_settings_files", "default_settings", "settings_files", "settings"): + self.add(self.get(key), True) + for key in ("default_secrets_files", "default_secrets", "secrets_files", "secrets"): + self.add_secrets(self.get(key), True) + + def get(self:Self, + keys:str|list[str]|tuple[str, ...], + inputs:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + default:Any|None = None + ) -> Any|None: + return Utils.get_value(keys, ( + inputs, self.__inputs, self.__secrets, self.__settings, self.__DEFAULT_SETTINGS + ), default) + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + dictionary:dict[str, Any|None] + + for dictionary in self.cxcv.files.load_json(inputs): + + key:str + value:Any|None + + for key, value in dictionary.items(): + if overwrite or key not in self.__settings: + self.__settings[key] = value + + def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + dictionary:dict[str, Any|None] + + for dictionary in self.cxcv.files.load_json(inputs): + + key:str + value:Any|None + + for key, value in dictionary.items(): + if overwrite or key not in self.__secrets: + self.__secrets[key] = value \ No newline at end of file diff --git a/Python/Managers/TerminalManager.py b/Python/Managers/TerminalManager.py new file mode 100644 index 0000000..eb6581c --- /dev/null +++ b/Python/Managers/TerminalManager.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any +from threading import Thread +from re import Match as REMatch +from Interfaces.Application.CXCVInterface import CXCVInterface +from Models.CommandsModel import CommandsModel +from Utils.Patterns import RE +from Utils.Utils import Utils + +class TerminalManager: + + def __init__(self:Self, cxcv:CXCVInterface) -> None: + + key:str + + self.cxcv:CXCVInterface = cxcv + self.__started:bool = False + self.__commands:tuple[CommandsModel, ...] = () + self.__thread:Thread = Thread(target=self.__execute) + + def start(self:Self) -> None: + if self.__started: + return + self.__started = True + + for key in ("default_commands_files", "default_commands", "commands_files", "commands"): + self.add(self.cxcv.settings.get(key), True) + + self.__thread.start() + + def __execute(self:Self) -> None: + while self.cxcv.working: + + order:str = input().strip() + + if not order: + continue + + matches:REMatch = RE.COMMAND.match(order) + + if not matches: + continue + + name:str = matches.group(1) + arguments:list[Any|None] = [] + parameters:dict[str, Any|None] = {} + arguments_block:str|None = (matches.group(2) or "").strip() + done:bool = False + + if arguments_block: + + block:tuple[str|None, ...] + + for block in RE.get_all(RE.COMMAND_ARGUMENT, arguments_block): + + key:str = Utils.get_from(block, (1, 5, 6, 7)) + value:str|None = Utils.get_from(block, (2, 3, 4)) + + if value is not None: + parameters[key] = value.strip() + else: + arguments += [key,] + + for command in self.__commands: + if (done := command.execute(name, parameters, *arguments)): + break + + done or print(f"Command '{name}' not found.") + + def get_i_command(self, names:str|list[str]|tuple[str, ...]) -> int|None: + for i, command in enumerate(self.__commands): + if command.check(names): + return i + return None + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + if isinstance(inputs, CommandsModel): + + i:int|None = self.get_i_command(inputs) + + if i is not None: + overwrite and self.__commands[i].update(inputs) + else: + self.__commands += (inputs,) + + elif isinstance(inputs, dict): + if "names" in inputs and isinstance(inputs["names"], (str, list, tuple)): + + names:str|list[str]|tuple[str, ...] = inputs["names"] + action:callable[[dict[str, Any|None], list[Any|None]], None]|None = inputs["action"] if "action" in inputs else None + i:int|None = self.get_i_command(names) + + if i is not None: + overwrite and self.__commands[i].update(names, action) + else: + self.__commands += (CommandsModel(self.cxcv, names, action),) + + else: + for command in self.cxcv.files.load_json(inputs, True): + if isinstance(command, (CommandsModel, dict)): + self.add(command, overwrite) + + def close(self:Self, parameters:dict[str, Any|None], *arguments:list[Any|None]) -> None: + self.cxcv.close() \ No newline at end of file diff --git a/Python/Models/CommandsModel.py b/Python/Models/CommandsModel.py new file mode 100644 index 0000000..55b2049 --- /dev/null +++ b/Python/Models/CommandsModel.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Callable, Any, Optional +from Interfaces.Application.CXCVInterface import CXCVInterface +from Utils.Utils import Utils + +class CommandsModel: + + def __set_action(self:Self, + action:Callable[[dict[str, Any|None], list[Any|None]], None]|str|None + ) -> None: + if callable(action): + self.__action = action + elif isinstance(action, str): + + parts:list[str] = action.split(".") + item:Any|None = ( + self.cxcv if parts[0] == "cxcv" else + None) + + if item: + for part in parts[1:]: + if hasattr(item, part): + item = getattr(item, part) + else: + item = None + break + else: + item = eval(action) + + if callable(item): + self.__action = item + + def __init__(self:Self, + cxcv:CXCVInterface, + names:str|list[str]|tuple[str, ...], + action:Callable[[dict[str, Any|None], list[Any|None]], None]|str|None = None + ) -> None: + self.cxcv:CXCVInterface = cxcv + self.__names:list[str] = Utils.get_keys(names) + self.__action:Callable[[dict[str, Any|None], list[Any|None]], None]|None = None + + self.__set_action(action) + + def get_names(self:Self) -> list[str]: + return [*self.__names] + + def get_action(self:Self) -> Callable[[dict[str, Any|None], list[Any|None]], None]|None: + return self.__action + + def check(self:Self, names:str|list[str]|tuple[str, ...]) -> bool: + return Utils.get_keys(names)[0] == self.__names[0] + + def update(self:Self, + names:str|list[str]|tuple[str, ...]|Self|None = None, + action:Optional[Callable[[dict[str, Any|None], list[Any|None]], None]] = None + ) -> bool: + + if isinstance(names, CommandsModel): + action = names.get_action() + names = names.get_names() + + names = Utils.get_keys(names) + + if names[0] == self.__names[0]: + for name in names[1:]: + if name not in self.__names: + self.__names += [name,] + if action is not None and action != self.__action: + self.__set_action(action) + return True + return False + + def execute(self:Self, + name:str, + parameters:dict[str, Any|None], + *arguments:list[Any|None] + ) -> bool: + if name in self.__names and self.__action is not None: + self.__action(parameters, *arguments) + return True + return False \ No newline at end of file diff --git a/Python/Models/FileModel.py b/Python/Models/FileModel.py new file mode 100644 index 0000000..06b2f64 --- /dev/null +++ b/Python/Models/FileModel.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Self +from Interfaces.Application.CXCVInterface import CXCVInterface +from Models.HashModel import HashModel +from Drivers.LocalFileDriver import LocalFileDriver +from Drivers.SFTPFileDriver import SFTPFileDriver +from Interfaces.FilesInterface import FilesInterface +from Utils.Utils import Utils +from Utils.Patterns import RE + +class FileModel: + + def __init__(self:Self, cxcv:CXCVInterface, path:str, chunk_size:int = 4096) -> None: + self.cxcv:CXCVInterface = cxcv + self.path:str = path + self.size:int = 0 + self.hash:HashModel = HashModel(self.cxcv, chunk_size) + + def update(self:Self, chunk:bytes) -> None: + self.size += len(chunk) + self.hash.update(chunk) + + def check_integrity(self:Self, other:type[Self]) -> bool: + + other:FileModel = other + + return ( + self.size == other.size and + self.hash.md5 == other.hash.md5 and + self.hash.sha1 == other.hash.sha1 and + self.hash.sha256 == other.hash.sha256 and + self.hash.blake2b == other.hash.blake2b and + self.hash.crc32 == other.hash.crc32 + ) + + @staticmethod + def get_inputs(*inputs:list[str|dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]]) -> dict[str, Any|None]: + return Utils.get_dictionary([( + {"path" : subinputs} if isinstance(subinputs, str) else + subinputs) for subinputs in inputs]) + + @staticmethod + def get_driver( + cxcv:CXCVInterface, + inputs:str|dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...] + ) -> FilesInterface: + + if isinstance(inputs, str): + + user:str|None = None + password:str|None = None + host:str|None = None + port:int|None = None + path:str = "" + + user, password, host, port, path = RE.FILE_PATH.match(inputs).groups() + + inputs = { + "user": user, + "password": password, + "host": host, + "port": int(port) if port else None, + "path": path + } + + return ( + SFTPFileDriver if "host" in inputs else + LocalFileDriver)(cxcv, Utils.get_value("path", inputs, "."), inputs) \ No newline at end of file diff --git a/Python/Models/HashModel.py b/Python/Models/HashModel.py new file mode 100644 index 0000000..2b04eda --- /dev/null +++ b/Python/Models/HashModel.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from hashlib import md5, sha1, sha256, blake2b +from zlib import crc32 +from typing import Self, Protocol +from Interfaces.Application.CXCVInterface import CXCVInterface + +class HashAlgorithm(Protocol): + def update(self:Self, data:bytes) -> None: ... + def hexdigest(self:Self) -> str: ... + +class CRC32HashAlgorithm(HashAlgorithm): + + def __init__(self:Self) -> None: + self.__value:int = 0 + + def update(self:Self, data:bytes) -> None: + self.__value = crc32(data, self.__value) + + def hexdigest(self:Self) -> str: + return f"{self.__value & 0xFFFFFFFF:08x}" + +class HashModel(): + + def __init__(self:Self, cxcv:CXCVInterface, data:bytes = b"", chunk_size:int = 4096) -> None: + + self.cxcv:CXCVInterface = cxcv + self.__chunk_size:int = chunk_size + self.md5:str = "" + self.sha1:str = "" + self.sha256:str = "" + self.blake2b:str = "" + self.crc32:str = "" + self.__hashes:list[HashAlgorithm] = ["md5", "sha1", "sha256", "blake2b", "crc32"] + self.__algorithms:list[HashAlgorithm] = [ + method() for method in (md5, sha1, sha256, blake2b, CRC32HashAlgorithm) + ] + + len(data) and self.update(data) + + def __update_chunk(self:Self, chunk:bytes) -> None: + for hash_algorithm in self.__algorithms: + hash_algorithm.update(chunk) + + def update(self:Self, data:bytes|str) -> None: + + done:bool = False + + if isinstance(data, bytes): + + while data: + + chunk:bytes = data[:self.__chunk_size] + + data = data[self.__chunk_size:] + self.__update_chunk(chunk) + + done = True + + elif isinstance(data, str): + self.cxcv.files.load_by_chunks(data, self.__update_chunk, self.__chunk_size) + done = True + + if done: + + name:str + + for name in self.__hashes: + setattr(self, name, getattr(self.__algorithms[self.__hashes.index(name)], "hexdigest")()) \ No newline at end of file diff --git a/Python/Models/ResultsModel.py b/Python/Models/ResultsModel.py new file mode 100644 index 0000000..4b38e43 --- /dev/null +++ b/Python/Models/ResultsModel.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Optional, Self, Any +from Abstracts.TableAbstract import TableAbstract + +class TableModel(TableAbstract): + + def __init__(self:Self, + data:tuple[tuple[Any|None, ...], ...], + map:Optional[tuple[str, ...]] = None + ) -> None: + self.data:tuple[tuple[Any|None, ...], ...] = data + self.map:tuple[str, ...]|None = map + + def get(self:Self) -> tuple[tuple[str]|None, tuple[tuple[Any|None, ...], ...]]: + return self.map, self.data + + def get_tuples(self:Self) -> tuple[tuple[Any|None, ...], ...]: + return self.data + + def get_dictionaries(self:Self) -> list[dict[str, Any|None]]: + + result:list[dict[str, Any|None]] = [] + + if len(self.data): + + names:tuple[str, ...] = self.map or tuple() + l:int = len(self.data[0]) + data:tuple[Any|None, ...] + + while len(names) < l: + names += (f"column_{len(names)}",) + + for data in self.data: + + i:int + key:str + row:dict[str, Any|None] = {} + + for i, key in enumerate(names): + row[key] = data[i] if i < len(data) else None + + result.append(row) + + return result + + def get(self:Self, column:int|str, tuple:int = 0) -> Any|None: + + if isinstance(column, str): + + if self.map is not None and column in self.map: + column = self.map.index(column) + else: + return None + + if 0 <= tuple < len(self.data) and 0 <= column < len(self.data[tuple]): + return self.data[tuple][column] + + return None + +class ResultsModel: + + def __init__(self:Self) -> None: + self.tables:list[TableModel] = [] + self.variables:dict[str, Any|None] = {} + self.ids:list[int] = [] + + def set(self:Self, + data:tuple[tuple[Any|None, ...], ...], + variables:dict[str, Any|None]|None = None, + Type:Optional[type[TableAbstract]|tuple[str, ...]] = None + ) -> None: + self.tables.append( + TableModel(data, Type) if Type is None or isinstance(Type, tuple) else + Type(data)) + if variables is not None: + self.variables.update(variables) + + def get(self:Self, column:int|str, tuple:int = 0, table:int = 0) -> Any|None: + return ( + self.tables[table].get(column, tuple) if 0 <= table < len(self.tables) else + None) \ No newline at end of file diff --git a/Python/Procedures/SQLite/LogsSQLiteProcedure.py b/Python/Procedures/SQLite/LogsSQLiteProcedure.py new file mode 100644 index 0000000..f776052 --- /dev/null +++ b/Python/Procedures/SQLite/LogsSQLiteProcedure.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Optional, Self, Any +from hashlib import sha256 +from Interfaces.Application.CXCVInterface import CXCVInterface +from Abstracts.ProcedureAbstract import ProcedureAbstract +from Interfaces.Procedures.LogsProcedureInterface import LogsProcedureInterface +from Drivers.SQLiteDriver import SQLiteDriver +from Utils.Utils import Utils + +class LogsSQLiteProcedure(ProcedureAbstract, LogsProcedureInterface): + + def __init__(self:Self, + cxcv:CXCVInterface, + key:str, + via:str, + inputs:Optional[dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]] = None + ) -> None: + super().__init__(cxcv, key, via, inputs) + + self.__database:SQLiteDriver = self.cxcv.databases.get("logs") + + def __set_item(self:Self, table:str, column:str, data:str, hashed:bool = False) -> int: + + parameters:dict[str, Any|None] = { + "hash": sha256(data.encode("utf-8")).hexdigest() if hashed else "", + "data" : data + } + + return self.__database.get_id(( + "select id from " + table + " where date_out is null and " + ( + "hash = {hash}" if hashed else + column + " = {data}") + ";" + ), parameters) or self.__database.execute(( + "insert into " + table + "(hash, " + column + ") values({hash}, {data});" if hashed else + "insert into " + table + "(" + column + ") values({data});"), parameters).ids[0] + + def __get_action_data(self:Self, application:str, file:str, method:str) -> dict[str, Any|None]: + + parameters:dict[str, Any|None] = { + "application": self.__set_item("Applications", "name", application), + "language" : self.__set_item("Languages", "name", "Python"), + "file": self.__set_item("Files", "path", file), + "method": method + } + + return self.__database.get_id(( + "select id from Methods where date_out is null and application = {application} and language = {language} and file = {file} and name = {method};" + ), parameters) or self.__database.execute(( + "insert into Methods(application, language, file, name) values({application}, {language}, {file}, {method});" + ), parameters).ids[0] + + def set_log(self:Self, + application:str, + file:str, + method:str, + line:int, + message:str, + error:int = 0, + parameters:dict[str, Any|None] = {} + ) -> None: + self.__database.execute(( + "insert into Logs(method, message, parameters, line, error) values({method}, {message}, {parameters}, {line}, {error});" + ), { + "method" : self.__get_action_data(application, file, method), + "line" : line, + "message": self.__set_item("Messages", "message", message, True), + "parameters": self.__set_item("Parameters", "data", Utils.json_encode(parameters), True), + "error" : error + }) + + def set_exception(self:Self, + application:str, + file:str, + method:str, + line:int, + message:str, + exception:str, + trace:str, + parameters:dict[str, Any|None] = {}, + status:Optional[str] = None + ) -> None: + self.__database.execute(( + "insert into Exceptions(method, message, exception, parameters, trace, line, status) values({method}, {message}, {exception}, {parameters}, {trace}, {line}, {status});" + ), { + "method" : self.__get_action_data(application, file, method), + "line" : line, + "message": self.__set_item("Messages", "message", message, True), + "exception" : self.__set_item("Messages", "message", exception, True), + "parameters": self.__set_item("Parameters", "data", Utils.json_encode(parameters), True), + "trace": self.__set_item("Traces", "trace", Utils.json_encode(trace), True), + "status" : status + }) \ No newline at end of file diff --git a/Python/Utils/Patterns.py b/Python/Utils/Patterns.py new file mode 100644 index 0000000..d8b8b00 --- /dev/null +++ b/Python/Utils/Patterns.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from re import Pattern as REPattern, compile as re_compile, IGNORECASE as RE_IGNORECASE, Match as REMatch + +class RE: + KEY:REPattern = re_compile(r'^[a-z_][a-z0-9_]*$', RE_IGNORECASE) + SLASHES:REPattern = re_compile(r'[\\/]+') + LAST_PATH_ITEM:REPattern = re_compile(r'[\\/][^\\/]+[\\/]?$') + STRING_VARIABLE:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}', RE_IGNORECASE) + COMMAND:REPattern = re_compile(r'^([^ \s]+)(?:[ \t]+(.*))?$') + COMMAND_ARGUMENT:REPattern = re_compile(r'([^\s"\'=]+)(?:=(?:"((?:[^\\"]+|\\.)*)"|\'((?:[^\\\']+|\\.)*)\'|([^\s]+))?)?|"((?:[^\\"]+|\\.)*)"|\'((?:[^\\\']+|\\.)*)\'') + TO_KEY:REPattern = re_compile(r'[^a-z0-9_]+', RE_IGNORECASE) + EXCEPTION:REPattern = re_compile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$') + BREAK_LINES:REPattern = re_compile(r'\r\n|[\r\n]') + SQL_LITE:REPattern = re_compile(r'(\-{2}[^\r\n]+|\/\*(?:(?!(?:\*\/))(?:.|[\r\n]+))+(?:\*\/)?)|([^;"\']+)|("(?:(?:[^"]+|""|[\r\n]+)*)"|\'(?:(?:[^\']+|\'{2}|[\r\n]+)*)\')|(;)') + FILE_PATH:REPattern = re_compile(r'^(?:([^\@\:]+)?(?:\:([^\@]*))?\@)?(?:([^\:]+)(?:\:([0-9]+))?\:)?(.+)$') + + @staticmethod + def get_all( + regex:REPattern, + string:str + ) -> list[tuple[str|None, ...]]: + + blocks:list[tuple[str|None, ...]] = [] + matches:REMatch|None + + while (matches := regex.search(string)) is not None: + blocks.append(matches.groups()) + string = string[matches.end():] + + return blocks \ No newline at end of file diff --git a/Python/Utils/Utils.py b/Python/Utils/Utils.py new file mode 100644 index 0000000..866fdfb --- /dev/null +++ b/Python/Utils/Utils.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Self, Optional +from inspect import stack as get_stack, FrameInfo +from traceback import format_stack as trace_format_stack, extract_tb as extract_traceback +from json import loads as json_decode, dumps as json_encode +from re import Match as REMatch +from Utils.Patterns import RE + +class Utils: + + @classmethod + def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]: + + keys:list[str] = [] + item:Any|None + + for item in items: + if isinstance(item, (list, tuple)): + keys += cls.get_keys(*item) + elif isinstance(item, str) and RE.KEY.match(item): + keys += [item] + + return keys + + @classmethod + def get_dictionaries(cls:type[Self], *items:list[Any|None]) -> list[dict[str, Any|None]]: + + dictionaries:list[dict[str, Any|None]] = [] + item:Any|None + + for item in items: + if isinstance(item, (list, tuple)): + dictionaries += cls.get_dictionaries(*item) + elif isinstance(item, dict): + dictionaries += [item] + + return dictionaries + + @classmethod + def get_dictionary(cls:type[Self], *items:list[Any|None]) -> dict[str, Any|None]: + + dictionary:dict[str, Any|None] = {} + + for item in items: + if isinstance(item, (list, tuple)): + dictionary = {**cls.get_dictionary(*item), **dictionary} + elif isinstance(item, dict): + dictionary = {**item, **dictionary} + + return dictionary + + @classmethod + def get_value(cls:type[Self], + keys:str|list[str]|tuple[str, ...], + inputs:dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...], + default:Optional[Any] = None + ) -> Any|None: + if len(keys := cls.get_keys(keys)): + + subinputs:dict[str, Any|None] + + for subinputs in cls.get_dictionaries(inputs): + for key in keys: + if key in subinputs: + return subinputs[key] + return default + + @classmethod + def get_strings(cls:type[Self], *items:list[Any|None]) -> list[str]: + + strings:list[str] = [] + item:Any|None + + for item in items: + if isinstance(item, (list, tuple)): + strings += cls.get_strings(*item) + elif isinstance(item, str): + strings += [item] + + return strings + + @classmethod + def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]: + + keys:list[str] = [] + item:Any|None + + for item in items: + if isinstance(item, (list, tuple)): + keys += cls.get_keys(*item) + elif isinstance(item, str) and RE.KEY.match(item): + keys += [item] + + return keys + + @classmethod + def string_variables(cls:type[Self], + string:str, + variables:Optional[dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]] = None, + default:str|None = None + ) -> str: + + variables = cls.get_dictionary(variables) + + def callback(matches:REMatch) -> str: + + key:str = matches.group(1) + + if key in variables: + return str(variables[key]) + return matches.group(0) if default is None else default + + return RE.STRING_VARIABLE.sub(callback, string) + + @staticmethod + def json_decode(data:str) -> dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]|None: + try: + return json_decode(data) + except Exception as exception: + pass + return None + + @staticmethod + def json_encode(data:dict[str, Any|None]|tuple[Any|None, ...]|list[Any|None]|None) -> str|None: + try: + return json_encode(data) + except Exception as exception: + pass + return None + + @staticmethod + def get_from( + item:list[Any|None]|tuple[Any|None, ...], + indexes:int|list[int]|tuple[int, ...], + default:Optional[Any] = None + ) -> Any|None: + + l:int = len(item) + i:int + + for i in indexes if isinstance(indexes, (list, tuple)) else (indexes,): + if i >= l: + break + if item[i] is not None: + return item[i] + return default + + @staticmethod + def get_action_data(i:int = 0) -> dict[str, str|int]: + + stack:FrameInfo = get_stack()[1 + i] + + return { + "file" : stack.filename, + "method" : stack.function, + "line" : stack.lineno + } + + @staticmethod + def get_trace(exception:Optional[Exception] = None) -> list[str]: + return trace_format_stack()[:-2] + ( + extract_traceback(exception.__traceback__).format() if exception else + []) \ No newline at end of file diff --git a/Python/cxcv.py b/Python/cxcv.py new file mode 100644 index 0000000..ff361d9 --- /dev/null +++ b/Python/cxcv.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from Application.CXCV import CXCV +from typing import Any +from Controllers.LogsController import LogsController +from Procedures.SQLite.LogsSQLiteProcedure import LogsSQLiteProcedure +from Drivers.SQLiteDriver import SQLiteDriver + +inputs:dict[str, Any|None] = { + "default_databases_models" : { + "sqlite" : SQLiteDriver + }, + "default_controllers_models" : { + "logs_controller" : LogsController, + }, + "default_procedures_models" : { + "logs_procedure_sqlite" : LogsSQLiteProcedure, + }, +} + +try: + + from secrets import secrets as custom_secrets + + for key, value in dict(custom_secrets).items(): + if key not in inputs or isinstance(inputs[key], dict): + inputs[key] = value + elif isinstance(value, dict): + for subkey, subvalue in value.items(): + inputs[key][subkey] = subvalue + +except ImportError: + pass + +cxcv:CXCV = CXCV() \ No newline at end of file diff --git a/README.md b/README.md index f109b8e..af7e407 100644 --- a/README.md +++ b/README.md @@ -4,22 +4,95 @@ Proyecto para gestionar un control completo sobre movimientos y copiados de fich > **NOTA**: Para gestionar control de los Logs se está valorando si hacer uso de Logs en texto plano; o crear una pequeña SQLite para garantizar control de búsqueda eficiente; o ambos. +```mermaid +flowchart TD + +F{{"@FOREACH archivo IN origen"}} +A[Archivo] +C[Caché] +D[Destino] +H([Hash]) +s[Sí] +n[No] + +l{"Es cargado en"} +b{Crea} +g{"Lo guarda en"} +c{Comprueba} +d{Da} +e{"Elimina y coge siguiente"} +o{"¿Ok?"} +r{Reintenta} + +F --> d +d --> A +A --> l +l --> C +C --> b +b --> H +C --> g +g --> D +D --> c +c --> H +H --> o +o --> s +s --> e +e --> A +o --> n +n --> r +r --> A + +``` + +# Objetivos + +* [ ] Organizar el árbol de directorios del proyecto y fijar elementos. +* [ ] Generar la base Python del mismo. +* [ ] Generar los Drivers de comunicaciones. +* [ ] Gestionar la operativa. +* [ ] Gestionar los argumentos de entrada por comandos. +* [ ] Generar la I18N. +* [ ] Generar las configuraciones y Testear los Secrets. +* [ ] Crear el SQLite de creación. +* [ ] Generar manual de instrucciones. +* [ ] Hacer testeo en entorno real. + # JAM Este proyecto está orientado a ser como una JAM para tener máxima motivación. Las normas y demás se expondrán cuando tenga permiso propio a empezar con dicho proyecto, mientras tanto, simplemente explicaré a continuación su motivación y los objetivos personales/profesionales que he de cumplir para poder llevar a cabo dicho proyecto. > **IMPORTANTE**: Este proyecto tiene un objetivo final para una gestión, la cual es de caracter profesional/privada, motivo por la cual, no se mostrará ni se usará como ejemplo, así como tampoco se indicarán los nombres reales y ni se usará el entorno real final, pero sí se mencionarán objetivos del mismo en un entorno simulado controlado totalmente ageno y no referenciable. +## Día 1 + Para poder realizar esta JAM, primero he de acabar los siguientes objetivos: -- [ ] Crear los generadores de Tests de: - - [ ] Estructura de la Ordenanza Municipal de Normalización Lingüística del Concello de Ferrol. - - [ ] Tests acordes al contenido. Por simplicidad, se unificará todo en uno. - - *Aplicar referencias.* - - [ ] Estructura de la Ley 3/1983 de Normalización Lingüística. - - [ ] Generar los Tests acordes al Capítulo 5 del Título Preliminar. -- [ ] Organizar el tema 25 y catalogar la información cara GLPi, Nagios y Zabbix. -- [ ] Repasar la Ley 39/2015 y Constitución. -- [ ] Piscina de 19:00 a 20:00. +* [-] Crear los generadores de Tests de: + * [X] Estructura de la Ordenanza Municipal de Normalización Lingüística del Concello de Ferrol. + * [-] Tests acordes al contenido. Por simplicidad, se unificará todo en uno. + * *Aplicar referencias.* + * [-] Estructura de la Ley 3/1983 de Normalización Lingüística. + * [ ] Generar los Tests acordes al Capítulo 5 del Título Preliminar. +* [ ] Organizar el tema 25 y catalogar la información cara GLPi, Nagios y Zabbix. +* [-] Repasar la Ley 39/2015 y Constitución. +* [ ] Piscina de 19:00 a 20:00. + - *No pude pues se me pilló un poco el lomo.* + +> **IMPORTANTE**: Todo ha de cumplirse hoy. Si no es el caso, otro día se marcarán otros objetivos. + +> *No conseguí completar los objetivos.* + +## Día 2 + +Para poder realizar esta JAM, primero he de acabar los siguientes objetivos: + +* [X] Crear los generadores de Tests de: + * [X] Terminar los Tests acordes al contenido. Por simplicidad, se unificará todo en uno. + * *Aplicar referencias.* + * [X] Estructura de la Ley 3/1983 de Normalización Lingüística. + * [X] Generar los Tests acordes al Capítulo 5 del Título Preliminar. +* [-] Organizar el tema 25 y catalogar la información cara GLPi, Nagios y Zabbix. +* [X] Repasar el tema de la Unión Europea y Constitución. +* [X] Hacer pilates y estiramientos de 19:00 a 20:00 y luego salir a andar un poco. > **IMPORTANTE**: Todo ha de cumplirse hoy. Si no es el caso, otro día se marcarán otros objetivos. \ No newline at end of file diff --git a/SQLite/Logs.lite.sql b/SQLite/Logs.lite.sql new file mode 100644 index 0000000..e16fdee --- /dev/null +++ b/SQLite/Logs.lite.sql @@ -0,0 +1,167 @@ +pragma foreign_keys = on; + +-- Level 0. +create table if not exists Applications( + id integer primary key autoincrement, + 'name' varchar(64) not null unique, + date_in datetime default current_timestamp, + date_out datetime null, + unique('name') +); + +create table if not exists Languages( + id integer primary key autoincrement, + 'name' varchar(64) not null unique, + date_in datetime default current_timestamp, + date_out datetime null, + unique('name') +); + +create table if not exists Files( + id integer primary key autoincrement, + 'path' varchar(256) not null unique, + date_in datetime default current_timestamp, + date_out datetime null, + unique('path') +); + +create table if not exists Traces( + id integer primary key autoincrement, + 'hash' varchar(256) not null, + trace text not null, + date_in datetime default current_timestamp, + date_out datetime null, + unique('hash') +); + +create table if not exists Messages( + id integer primary key autoincrement, + 'hash' varchar(256) not null, + 'message' text not null, + date_in datetime default current_timestamp, + date_out datetime null, + unique('hash') +); + +create table if not exists Parameters( + id integer primary key autoincrement, + 'hash' varchar(256) not null, + 'data' text not null, + date_in datetime default current_timestamp, + date_out datetime null, + unique('hash') +); + +-- Level 1. +create table if not exists Methods( + id integer primary key autoincrement, + 'application' integer not null, + 'language' integer not null, + 'file' integer null, + 'name' varchar(64) not null, + date_in datetime default current_timestamp, + date_out datetime null, + foreign key ('application') references Applications(id) on delete restrict on update restrict, + foreign key ('language') references Languages(id) on delete restrict on update restrict, + foreign key ('file') references Files(id) on delete restrict on update restrict +); + +-- Level 2. +create table if not exists Logs( + id integer primary key autoincrement, + 'method' integer not null, + 'message' integer not null, + parameters integer not null, + 'line' integer not null, + error integer not null, + date_in datetime default current_timestamp, + date_out datetime null, + foreign key ('method') references Methods(id) on delete restrict on update restrict, + foreign key ('message') references Messages(id) on delete restrict on update restrict, + foreign key (parameters) references Parameters(id) on delete restrict on update restrict +); + +create table if not exists Exceptions( + id integer primary key autoincrement, + 'method' integer not null, + trace integer not null, + 'message' integer not null, + exception integer not null, + parameters integer not null, + 'line' integer not null, + 'status' varchar(16) null, + date_in datetime default current_timestamp, + date_out datetime null, + foreign key ('method') references Methods(id) on delete restrict on update restrict, + foreign key ('trace') references Traces(id) on delete restrict on update restrict, + foreign key ('message') references Messages(id) on delete restrict on update restrict, + foreign key ('exception') references Messages(id) on delete restrict on update restrict, + foreign key (parameters) references Parameters(id) on delete restrict on update restrict +); + +drop view if exists MethodsView; +create view MethodsView as select + methods.id as id, + languages.id as language_id, + applications.id as application_id, + files.id as file_id, + applications.name as 'application', + languages.name as 'language', + files.path as 'file', + methods.name as 'name' +from Methods methods +join Applications applications on methods.application = applications.id +join Languages languages on methods.language = languages.id +join Files files on methods.file = files.id +where + methods.date_out is null and + applications.date_out is null and + languages.date_out is null and + files.date_out is null; + +drop view if exists LogsView; +create view LogsView as select + logs.id as id, + methods.application as 'application', + methods.language as 'language', + methods.file as 'file', + methods.name as method, + logs.line as 'line', + messages.message as 'message', + parameters.data as parameters, + logs.error as error, + logs.date_in as date_in +from Logs logs +join MethodsView methods on logs.method = methods.id +join Messages messages on logs.message = messages.id +join Parameters parameters on logs.parameters = parameters.id +where + logs.date_out is null and + messages.date_out is null and + parameters.date_out is null; + +drop view if exists ExceptionsView; +create view ExceptionsView as select + exceptions.id as id, + methods.application as 'application', + methods.language as 'language', + methods.file as 'file', + methods.name as method, + exceptions.line as 'line', + messages.message as 'message', + traces.trace as 'trace', + exceptions_message.message as 'exception', + parameters.data as parameters, + exceptions.date_in as date_in +from Exceptions exceptions +join MethodsView methods on exceptions.method = methods.id +join Messages messages on exceptions.message = messages.id +join Messages exceptions_message on exceptions.exception = exceptions_message.id +join Traces traces on exceptions.trace = traces.id +join Parameters parameters on exceptions.parameters = parameters.id +where + exceptions.date_out is null and + messages.date_out is null and + exceptions_message.date_out is null and + traces.date_out is null and + parameters.date_out is null; \ No newline at end of file diff --git a/Tools/run.sh b/Tools/run.sh new file mode 100755 index 0000000..d392d2f --- /dev/null +++ b/Tools/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash +cd `dirname $(readlink -f "$0")`/../Python +python3 "cxcv.py" \ No newline at end of file diff --git a/version b/version index 8a9ecc2..7bcd0e3 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.0.1 \ No newline at end of file +0.0.2 \ No newline at end of file