From 178f678b690e76b99182174916b075b038e73ee9 Mon Sep 17 00:00:00 2001 From: KyMAN <0kyman0@gmail.com> Date: Thu, 4 Jun 2026 07:38:36 +0200 Subject: [PATCH] #wip(py): Queues Manager done. --- JSON/AnP.settings.json | 9 ++ Python/Application/AnP.py | 5 + Python/Controllers/AIController.py | 39 +++---- Python/Interfaces/Application/AnPInterface.py | 2 + .../Managers/QueusManagerInterface.py | 31 +++++ Python/Managers/AIInterpretersManager.py | 41 +------ Python/Managers/ControllersManager.py | 1 - Python/Managers/PseudoLoRAsManager.py | 9 -- Python/Managers/QueusManager.py | 107 ++++++++++++++++++ Python/Models/AIPoolRequestsModel.py | 92 --------------- Python/Models/QueuesModel.py | 79 +++++++++++++ Python/Utils/Checks.py | 4 + 12 files changed, 261 insertions(+), 158 deletions(-) create mode 100644 Python/Interfaces/Managers/QueusManagerInterface.py create mode 100644 Python/Managers/QueusManager.py delete mode 100644 Python/Models/AIPoolRequestsModel.py create mode 100644 Python/Models/QueuesModel.py diff --git a/JSON/AnP.settings.json b/JSON/AnP.settings.json index f52a1af..dd59637 100644 --- a/JSON/AnP.settings.json +++ b/JSON/AnP.settings.json @@ -42,6 +42,15 @@ "AnP_ModelsManager_start" : null, "AnP_ModelsManager_end" : null, + "AnP_UniqueKeysManager_start" : null, + "AnP_UniqueKeysManager_end" : null, + + "AnP_QueuesManager_start" : null, + "default_queues" : { + "anp" : 1 + }, + "AnP_QueuesManager_end" : null, + "AnP_ControllersManager_start" : null, "default_controllers" : { "ai" : "AIController" diff --git a/Python/Application/AnP.py b/Python/Application/AnP.py index 4e52217..df76247 100644 --- a/Python/Application/AnP.py +++ b/Python/Application/AnP.py @@ -11,6 +11,7 @@ from Managers.PrintTypesManager import PrintTypesManager from Managers.TerminalManager import TerminalManager from Managers.ModelsManager import ModelsManager from Managers.UniqueKeysManager import UniqueKeysManager +from Managers.QueusManager import QueuesManager from Managers.SessionsManager import SessionsManager from Managers.ControllersManager import ControllersManager from Managers.DispatchesManager import DispatchesManager @@ -47,6 +48,7 @@ class AnP: self.terminal:TerminalManager = TerminalManager(self) self.models:ModelsManager = ModelsManager(self) self.unique_keys:UniqueKeysManager = UniqueKeysManager(self) + self.queues:QueuesManager = QueuesManager(self) self.sessions:SessionsManager = SessionsManager(self) self.controllers:ControllersManager = ControllersManager(self) self.dispatches:DispatchesManager = DispatchesManager(self) @@ -65,6 +67,7 @@ class AnP: self.terminal.update() self.models.update() self.unique_keys.update() + self.queues.update() self.sessions.update() self.controllers.update() self.dispatches.update() @@ -82,6 +85,7 @@ class AnP: self.terminal.reset() self.models.reset() self.unique_keys.reset() + self.queues.reset() self.sessions.reset() self.controllers.reset() self.dispatches.reset() @@ -93,6 +97,7 @@ class AnP: def close(self:Self) -> None: self.__working = False + self.queues.close() self.ai_interpreters.close() self.web_socket_servers.close() self.http_servers.close() diff --git a/Python/Controllers/AIController.py b/Python/Controllers/AIController.py index 177a188..ddbaa7a 100644 --- a/Python/Controllers/AIController.py +++ b/Python/Controllers/AIController.py @@ -9,29 +9,26 @@ from Models.PseudoLoRAModel import PseudoLoRAModel class AIController(ControllerAbstract, ModelAbstract): - # def __init__(self:Self, anp:AnPInterface) -> None: - # self.anp: AnPInterface = anp - - # def __temp(self:Self) - - # def __get_data(self:Self, request:RequestModel, callback:Callable[..., Any|None]) -> None: - # self.anp.pseudoloras.get("anp_titles") - - def __filter_loras_callback() - - def __filter_loras(self:Self, message:str, loras:list[PseudoLoRAModel], keys:list[str]) -> None: - - has_keys:bool = len(keys) > 0 - - self.anp.ai_interpreters.request("anp_titles", None, "# Títulos\n\n" + "\n".join( - "- " + lora.title + "" for lora in loras if not has_keys or lora.key in keys - ) + "\n\n# Mensaje\n\n" + message, lambda id, response:None, []) - - def get_pseudoloras(self:Self, request:RequestModel) -> None: - pass + def __test_execution(self:Self, end:Callable[[], None], request:RequestModel) -> None: + print("PASA") + self.anp.ai_interpreters.request( + "anp_titles", + None, + request.get("message", "Hola, Gemma. ¿Me puedes ayudar a instalar una impresora Canon?"), + lambda id, response: print((id, response.response)), + [ + "Seleccionar títulos exactos relacionados con la consulta:" + "".join( + "\n - " + title for title in [ + "Información, gestión e instalación de Cividas", + "Información, gestión e instalación de Impresoras/Fotocopiadoras/Multifuncionales Canon" + ] + ), + ] + ) + end() def test(self:Self, request:RequestModel) -> None: - self.anp.ai_interpreters.request("anp_titles", None, request.get("message", "Hola"), lambda id, response: print((id, response.response))) + self.anp.queues.add("anp", self.__test_execution, request) request.set_response({ "ok" : True, "code" : 200, diff --git a/Python/Interfaces/Application/AnPInterface.py b/Python/Interfaces/Application/AnPInterface.py index 6114d07..0d18688 100644 --- a/Python/Interfaces/Application/AnPInterface.py +++ b/Python/Interfaces/Application/AnPInterface.py @@ -9,6 +9,7 @@ from Interfaces.Managers.PrintTypesManagerInterface import PrintTypesManagerInte from Interfaces.Managers.TerminalManagerInterface import TerminalManagerInterface from Interfaces.Managers.ModelsManagerInterface import ModelsManagerInterface from Interfaces.Managers.UniqueKeysManagerInterface import UniqueKeysManagerInterface +from Interfaces.Managers.QueusManagerInterface import QueusManagerInterface from Interfaces.Managers.SessionsManagerInterface import SessionsManagerInterface from Interfaces.Managers.ControllersManagerInterface import ControllersManagerInterface from Interfaces.Managers.DispatchesManagerInterface import DispatchesManagerInterface @@ -28,6 +29,7 @@ class AnPInterface(ABC): self.terminal:TerminalManagerInterface = None self.models:ModelsManagerInterface = None self.unique_keys:UniqueKeysManagerInterface = None + self.queues:QueusManagerInterface = None self.sessions:SessionsManagerInterface = None self.controllers:ControllersManagerInterface = None self.dispatches:DispatchesManagerInterface = None diff --git a/Python/Interfaces/Managers/QueusManagerInterface.py b/Python/Interfaces/Managers/QueusManagerInterface.py new file mode 100644 index 0000000..f011a6e --- /dev/null +++ b/Python/Interfaces/Managers/QueusManagerInterface.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Callable, Optional, Sequence +from abc import ABC, abstractmethod + +class QueusManagerInterface(ABC): + + @abstractmethod + def update(self:Self) -> None:pass + + @abstractmethod + def reset(self:Self) -> None:pass + + @abstractmethod + def close(self:Self) -> None:pass + + @abstractmethod + def create(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass + + @abstractmethod + def add(self:Self, key:str, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> int|None:pass + + @abstractmethod + def next(self:Self) -> None:pass + + @abstractmethod + def cancel(self:Self, keys:Optional[str|Sequence[str]] = False, _is:Optional[int|Sequence[int]] = None) -> bool:pass + + @abstractmethod + def remove(self:Self, keys:Optional[str|Sequence[str]] = None) -> bool:pass \ No newline at end of file diff --git a/Python/Managers/AIInterpretersManager.py b/Python/Managers/AIInterpretersManager.py index 57400f0..3595a02 100644 --- a/Python/Managers/AIInterpretersManager.py +++ b/Python/Managers/AIInterpretersManager.py @@ -5,7 +5,6 @@ from typing import Any, Callable, Self, Optional from Interfaces.Application.AnPInterface import AnPInterface from Abstracts.AIInterpretersAbstract import AIInterpretersAbstract from Models.AIResponseModel import AIResponseModel -from Models.AIPoolRequestsModel import AIPoolRequestsModel from Utils.Common import Common class AIInterpretersManager: @@ -14,7 +13,6 @@ class AIInterpretersManager: self.anp:AnPInterface = anp self.__interpreters:dict[str, AIInterpretersAbstract] = {} - self.__pool_requests:dict[str, AIPoolRequestsModel] = {} self.update() @@ -63,7 +61,6 @@ class AIInterpretersManager: if key in self.__interpreters: self.remove(key) self.__interpreters[key] = interpreter - self.__pool_requests[interpreter.pool] = AIPoolRequestsModel() def remove(self:Self, keys:Optional[str|list[str]] = None) -> None: @@ -71,46 +68,20 @@ class AIInterpretersManager: for key in Common.get_keys(keys) if keys else list(self.__interpreters.keys()): if key in self.__interpreters: - - pool:str = self.__interpreters[key].pool - has_interpreters:bool = False - interpreter:AIInterpretersAbstract - - self.__pool_requests.get(pool).remove(self.__interpreters[key].key) self.__interpreters[key].close() del self.__interpreters[key] - for interpreter in self.__interpreters.values(): - if interpreter.pool == pool: - has_interpreters = True - break - - if not has_interpreters and pool in self.__pool_requests: - del self.__pool_requests[pool] - def request(self:Self, key:str, session:int|None, message:str, callback:Callable[[int, AIResponseModel], None], orders:list[str] = [] - ) -> int|None: - - i:int|None = None - + ) -> tuple[int|None, AIResponseModel|None]: + + response:AIResponseModel|None = None + if key in self.__interpreters: + session, response = self.__interpreters[key].request(session, message, callback, orders) - pool:str = self.__interpreters[key].pool - - i = self.__pool_requests[pool].add(self.__interpreters[key], session, message, callback, orders) - self.__pool_requests[pool].execute() - - return i - - def cancel_request(self:Self, key:str, id:int) -> None: - if key in self.__interpreters: - - pool:str = self.__interpreters[key].pool - - if id in self.__pool_requests[pool].pool: - del self.__pool_requests[pool].pool[id] \ No newline at end of file + return session, response \ No newline at end of file diff --git a/Python/Managers/ControllersManager.py b/Python/Managers/ControllersManager.py index 9061383..f07cd34 100644 --- a/Python/Managers/ControllersManager.py +++ b/Python/Managers/ControllersManager.py @@ -58,7 +58,6 @@ class ControllersManager: None) def execute(self:Self, key:str, method:str, request:RequestModel) -> bool: - print([self.__controllers, key, method]) if key in self.__controllers and hasattr(self.__controllers[key], method): getattr(self.__controllers[key], method)(request) return True diff --git a/Python/Managers/PseudoLoRAsManager.py b/Python/Managers/PseudoLoRAsManager.py index 9f8e4b4..69565ff 100644 --- a/Python/Managers/PseudoLoRAsManager.py +++ b/Python/Managers/PseudoLoRAsManager.py @@ -89,15 +89,6 @@ class PseudoLoRAsManager: for lora in self.__loras: lora.clean_cache() - def get_asynchronous(self:Self, - each_callback:Callable[[list[PseudoLoRAModel], list[PseudoLoRAModel], Callable[[list[PseudoLoRAModel]], None]], bool], - end_callback:Callable[[list[PseudoLoRAModel]], None], - keys:list[str] = [], - loras:Optional[list[PseudoLoRAModel]] = None, - results:list[PseudoLoRAModel] = [] - ) -> None: - Common.execute(each_callback, ) - def get(self:Self, callback:Callable[[list[PseudoLoRAModel]], bool], keys:list[str] = []) -> list[tuple[str, str]]: next:list[PseudoLoRAModel] = [] diff --git a/Python/Managers/QueusManager.py b/Python/Managers/QueusManager.py new file mode 100644 index 0000000..d00cfc9 --- /dev/null +++ b/Python/Managers/QueusManager.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Callable, Optional, Sequence +from Interfaces.Application.AnPInterface import AnPInterface +from Models.QueuesModel import QueuesModel +from Utils.Common import Common + +class QueuesManager: + + def __init__(self:Self, anp:AnPInterface) -> None: + self.anp:AnPInterface = anp + self.__queues:dict[str, QueuesModel] = {} + + self.update() + + def update(self:Self) -> None: + key:str + + for key in ("default_queues_files", "queues_files", "default_queues", "queues"): + self.create(self.anp.settings.get(key), True) + + def reset(self:Self) -> None: + self.__queues = {} + self.update() + + def close(self:Self) -> None: + self.remove() + + def create(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + subinputs:dict[str, Any|None] + + for subinputs in Common.load_json(inputs, True): + + key:str + queue:Any|None + + for key, queue in subinputs.items(): + if overwrite or key not in self.__queues: + self.__queues[key] = QueuesModel(key, queue) + + def add(self:Self, key:str, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> int|None: + if key in self.__queues: + return self.__queues[key].add(callback, *arguments) + return None + + def next(self:Self) -> None: + + queue:QueuesModel + + for queue in self.__queues.values(): + queue.next() + + def cancel(self:Self, keys:Optional[str|Sequence[str]] = False, _is:Optional[int|Sequence[int]] = None) -> bool: + if keys is None: + + queue:QueuesModel + + for queue in self.__queues.values(): + if _is is None: + queue.cancel_all() + else: + + i:int + + for i in Common.get_keys(_is): + queue.cancel(i) + + return True + + done:bool = False + key:str + + for key in Common.get_keys(keys): + if key in self.__queues: + if _is is None: + self.__queues[key].cancel_all() + done = True + else: + + i:int + + for i in Common.get_keys(_is): + self.__queues[key].cancel(i) + done = True + + return done + + def remove(self:Self, keys:Optional[str|Sequence[str]] = None) -> bool: + if keys is None: + + for queue in self.__queues.values(): + queue.cancel_all() + self.__queues = {} + + return True + + done:bool = False + key:str + + for key in Common.get_keys(keys): + if key in self.__queues: + del self.__queues[key] + done = True + + return done diff --git a/Python/Models/AIPoolRequestsModel.py b/Python/Models/AIPoolRequestsModel.py deleted file mode 100644 index ee503b9..0000000 --- a/Python/Models/AIPoolRequestsModel.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -from typing import Callable, Self -from threading import Thread, Lock -from Models.AIResponseModel import AIResponseModel -from Abstracts.AIInterpretersAbstract import AIInterpretersAbstract -from Utils.Common import Common - -class AIPoolRequestsItemsModel: - def __init__(self:Self, - key:str, - interpreter:AIInterpretersAbstract, - session:int|None, - message:str, - callback:Callable[[int, AIResponseModel], None], - orders:list[str] = [] - ) -> None: - self.key:str = key - self.interpreter:AIInterpretersAbstract = interpreter - self.session:int|None = session - self.message:str = message - self.callback:Callable[[int, AIResponseModel], None] = callback - self.orders:list[str] = orders - -class AIPoolRequestsModel: - - def __init__(self:Self) -> None: - self.pool:dict[int, AIPoolRequestsItemsModel] = {} - self.iterations:int = 0 - self.maximum_iterations:int = 1 - self.i:int = 0 - self.j:int = 0 - self.__lock:Lock = Lock() - - def add(self:Self, - interpreter:AIInterpretersAbstract, - session:int|None, - message:str, - callback:Callable[[int, AIResponseModel], None], - orders:list[str] = [] - ) -> int: - - id:int - - with self.__lock: - self.i += 1 - id = self.i - self.pool[self.i] = AIPoolRequestsItemsModel( - interpreter.key, interpreter, session, message, callback, orders - ) - - self.execute() - - return id - - def __next(self:Self, callback:Callable[[int, AIResponseModel], None], session:int, response:AIResponseModel) -> None: - Common.execute(callback, session, response) - if response.done or not response.ok: - with self.__lock: - self.iterations -= 1 - self.execute() - - def __execute(self:Self) -> None: - - item:AIPoolRequestsItemsModel|None = None - - with self.__lock: - if len(self.pool) and self.iterations != self.maximum_iterations and self.i != self.j: - self.iterations += 1 - self.j = min(self.pool.keys()) - item:AIPoolRequestsItemsModel = self.pool[self.j] - del self.pool[self.j] - - item and item.interpreter.request( - item.session, item.message, item.orders, lambda session, response: self.__next(item.callback, session, response) - ) - - def execute(self:Self) -> None: - Thread(target = self.__execute).start() - - def cancel(self:Self, ids:int|list[int]) -> None: - with self.__lock: - for id in Common.get_keys(ids): - if id in self.pool: - del self.pool[id] - - def remove(self:Self, key:str) -> None: - with self.__lock: - for id, item in list(self.pool.items()): - if item.key == key: - del self.pool[id] \ No newline at end of file diff --git a/Python/Models/QueuesModel.py b/Python/Models/QueuesModel.py new file mode 100644 index 0000000..864c0da --- /dev/null +++ b/Python/Models/QueuesModel.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Callable, Sequence, Optional +from threading import Thread, Lock +from Utils.Checks import Check +from Utils.Common import Common + +class QueueItemModel: + def __init__(self:Self, i:int, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> None: + self.i:int = i + self.callback:Callable[[Callable[[], None]], None] = callback + self.arguments:list[Any|None] = arguments + self.executing:bool = False + self.thread:Thread|None = None + +class QueuesModel: + + def __init__(self:Self, key:str, inputs:Optional[int|dict[str, Any|None]|Sequence[Any|None]] = None) -> None: + + if Check.is_integer(inputs): + inputs = {"maximum" : inputs} + + self.key:str = key + self.items:dict[int, QueueItemModel] = {} + self.i:int = 0 + self.p:int = 0 + self.maximum:int = Common.get_value(("maximum", "maximum_concurrent", "maximum_simultaneous"), inputs, 1) + self.current:int = 0 + + def add(self:Self, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> int|None: + + i:int|None = None + + if Check.is_function(callback): + with Lock(): + self.items[i := self.i] = QueueItemModel(self.i, callback, *arguments) + self.i += 1 + + self.next() + + return i + + def cancel(self:Self, i:int) -> bool: + with Lock(): + if i in self.items and not self.items[i].executing: + del self.items[i] + return True + return False + + def cancel_all(self:Self) -> None: + with Lock(): + for i in list(self.items.keys()): + if not self.items[i].executing: + del self.items[i] + + def __end(self:Self, p:int) -> None: + with Lock(): + self.current -= 1 + del self.items[p] + self.next() + + def next(self:Self) -> None: + while self.p < self.i and self.current < self.maximum: + + item:QueueItemModel + p:int|None = None + + with Lock(): + p = min((item.i for item in self.items.values() if not item.executing), default=None) + if p is None: + return + item = self.items[p] + item.executing = True + self.p = p + self.current += 1 + + item.thread = Thread(target = lambda:item.callback(lambda *results:self.__end(p, *results), *item.arguments)) + item.thread.start() \ No newline at end of file diff --git a/Python/Utils/Checks.py b/Python/Utils/Checks.py index cf53880..ff8ac08 100644 --- a/Python/Utils/Checks.py +++ b/Python/Utils/Checks.py @@ -27,6 +27,10 @@ class Check: def is_boolean(item:Any|None) -> bool: return isinstance(item, bool) + @staticmethod + def is_integer(item:Any|None) -> bool: + return isinstance(item, int) + @staticmethod def is_function(item:Any|None) -> bool: return callable(item)