#wip(py): Queues Manager done.

This commit is contained in:
KyMAN 2026-06-04 07:38:36 +02:00
parent 3412ff05b5
commit 178f678b69
12 changed files with 261 additions and 158 deletions

View File

@ -42,6 +42,15 @@
"AnP_ModelsManager_start" : null, "AnP_ModelsManager_start" : null,
"AnP_ModelsManager_end" : null, "AnP_ModelsManager_end" : null,
"AnP_UniqueKeysManager_start" : null,
"AnP_UniqueKeysManager_end" : null,
"AnP_QueuesManager_start" : null,
"default_queues" : {
"anp" : 1
},
"AnP_QueuesManager_end" : null,
"AnP_ControllersManager_start" : null, "AnP_ControllersManager_start" : null,
"default_controllers" : { "default_controllers" : {
"ai" : "AIController" "ai" : "AIController"

View File

@ -11,6 +11,7 @@ from Managers.PrintTypesManager import PrintTypesManager
from Managers.TerminalManager import TerminalManager from Managers.TerminalManager import TerminalManager
from Managers.ModelsManager import ModelsManager from Managers.ModelsManager import ModelsManager
from Managers.UniqueKeysManager import UniqueKeysManager from Managers.UniqueKeysManager import UniqueKeysManager
from Managers.QueusManager import QueuesManager
from Managers.SessionsManager import SessionsManager from Managers.SessionsManager import SessionsManager
from Managers.ControllersManager import ControllersManager from Managers.ControllersManager import ControllersManager
from Managers.DispatchesManager import DispatchesManager from Managers.DispatchesManager import DispatchesManager
@ -47,6 +48,7 @@ class AnP:
self.terminal:TerminalManager = TerminalManager(self) self.terminal:TerminalManager = TerminalManager(self)
self.models:ModelsManager = ModelsManager(self) self.models:ModelsManager = ModelsManager(self)
self.unique_keys:UniqueKeysManager = UniqueKeysManager(self) self.unique_keys:UniqueKeysManager = UniqueKeysManager(self)
self.queues:QueuesManager = QueuesManager(self)
self.sessions:SessionsManager = SessionsManager(self) self.sessions:SessionsManager = SessionsManager(self)
self.controllers:ControllersManager = ControllersManager(self) self.controllers:ControllersManager = ControllersManager(self)
self.dispatches:DispatchesManager = DispatchesManager(self) self.dispatches:DispatchesManager = DispatchesManager(self)
@ -65,6 +67,7 @@ class AnP:
self.terminal.update() self.terminal.update()
self.models.update() self.models.update()
self.unique_keys.update() self.unique_keys.update()
self.queues.update()
self.sessions.update() self.sessions.update()
self.controllers.update() self.controllers.update()
self.dispatches.update() self.dispatches.update()
@ -82,6 +85,7 @@ class AnP:
self.terminal.reset() self.terminal.reset()
self.models.reset() self.models.reset()
self.unique_keys.reset() self.unique_keys.reset()
self.queues.reset()
self.sessions.reset() self.sessions.reset()
self.controllers.reset() self.controllers.reset()
self.dispatches.reset() self.dispatches.reset()
@ -93,6 +97,7 @@ class AnP:
def close(self:Self) -> None: def close(self:Self) -> None:
self.__working = False self.__working = False
self.queues.close()
self.ai_interpreters.close() self.ai_interpreters.close()
self.web_socket_servers.close() self.web_socket_servers.close()
self.http_servers.close() self.http_servers.close()

View File

@ -9,29 +9,26 @@ from Models.PseudoLoRAModel import PseudoLoRAModel
class AIController(ControllerAbstract, ModelAbstract): class AIController(ControllerAbstract, ModelAbstract):
# def __init__(self:Self, anp:AnPInterface) -> None: def __test_execution(self:Self, end:Callable[[], None], request:RequestModel) -> None:
# self.anp: AnPInterface = anp print("PASA")
self.anp.ai_interpreters.request(
# def __temp(self:Self) "anp_titles",
None,
# def __get_data(self:Self, request:RequestModel, callback:Callable[..., Any|None]) -> None: request.get("message", "Hola, Gemma. ¿Me puedes ayudar a instalar una impresora Canon?"),
# self.anp.pseudoloras.get("anp_titles") lambda id, response: print((id, response.response)),
[
def __filter_loras_callback() "Seleccionar títulos exactos relacionados con la consulta:" + "".join(
"\n - " + title for title in [
def __filter_loras(self:Self, message:str, loras:list[PseudoLoRAModel], keys:list[str]) -> None: "Información, gestión e instalación de Cividas",
"Información, gestión e instalación de Impresoras/Fotocopiadoras/Multifuncionales Canon"
has_keys:bool = len(keys) > 0 ]
),
self.anp.ai_interpreters.request("anp_titles", None, "# Títulos\n\n" + "\n".join( ]
"- " + lora.title + "" for lora in loras if not has_keys or lora.key in keys )
) + "\n\n# Mensaje\n\n" + message, lambda id, response:None, []) end()
def get_pseudoloras(self:Self, request:RequestModel) -> None:
pass
def test(self:Self, request:RequestModel) -> None: def test(self:Self, request:RequestModel) -> None:
self.anp.ai_interpreters.request("anp_titles", None, request.get("message", "Hola"), lambda id, response: print((id, response.response))) self.anp.queues.add("anp", self.__test_execution, request)
request.set_response({ request.set_response({
"ok" : True, "ok" : True,
"code" : 200, "code" : 200,

View File

@ -9,6 +9,7 @@ from Interfaces.Managers.PrintTypesManagerInterface import PrintTypesManagerInte
from Interfaces.Managers.TerminalManagerInterface import TerminalManagerInterface from Interfaces.Managers.TerminalManagerInterface import TerminalManagerInterface
from Interfaces.Managers.ModelsManagerInterface import ModelsManagerInterface from Interfaces.Managers.ModelsManagerInterface import ModelsManagerInterface
from Interfaces.Managers.UniqueKeysManagerInterface import UniqueKeysManagerInterface from Interfaces.Managers.UniqueKeysManagerInterface import UniqueKeysManagerInterface
from Interfaces.Managers.QueusManagerInterface import QueusManagerInterface
from Interfaces.Managers.SessionsManagerInterface import SessionsManagerInterface from Interfaces.Managers.SessionsManagerInterface import SessionsManagerInterface
from Interfaces.Managers.ControllersManagerInterface import ControllersManagerInterface from Interfaces.Managers.ControllersManagerInterface import ControllersManagerInterface
from Interfaces.Managers.DispatchesManagerInterface import DispatchesManagerInterface from Interfaces.Managers.DispatchesManagerInterface import DispatchesManagerInterface
@ -28,6 +29,7 @@ class AnPInterface(ABC):
self.terminal:TerminalManagerInterface = None self.terminal:TerminalManagerInterface = None
self.models:ModelsManagerInterface = None self.models:ModelsManagerInterface = None
self.unique_keys:UniqueKeysManagerInterface = None self.unique_keys:UniqueKeysManagerInterface = None
self.queues:QueusManagerInterface = None
self.sessions:SessionsManagerInterface = None self.sessions:SessionsManagerInterface = None
self.controllers:ControllersManagerInterface = None self.controllers:ControllersManagerInterface = None
self.dispatches:DispatchesManagerInterface = None self.dispatches:DispatchesManagerInterface = None

View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Callable, Optional, Sequence
from abc import ABC, abstractmethod
class QueusManagerInterface(ABC):
@abstractmethod
def update(self:Self) -> None:pass
@abstractmethod
def reset(self:Self) -> None:pass
@abstractmethod
def close(self:Self) -> None:pass
@abstractmethod
def create(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass
@abstractmethod
def add(self:Self, key:str, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> int|None:pass
@abstractmethod
def next(self:Self) -> None:pass
@abstractmethod
def cancel(self:Self, keys:Optional[str|Sequence[str]] = False, _is:Optional[int|Sequence[int]] = None) -> bool:pass
@abstractmethod
def remove(self:Self, keys:Optional[str|Sequence[str]] = None) -> bool:pass

View File

@ -5,7 +5,6 @@ from typing import Any, Callable, Self, Optional
from Interfaces.Application.AnPInterface import AnPInterface from Interfaces.Application.AnPInterface import AnPInterface
from Abstracts.AIInterpretersAbstract import AIInterpretersAbstract from Abstracts.AIInterpretersAbstract import AIInterpretersAbstract
from Models.AIResponseModel import AIResponseModel from Models.AIResponseModel import AIResponseModel
from Models.AIPoolRequestsModel import AIPoolRequestsModel
from Utils.Common import Common from Utils.Common import Common
class AIInterpretersManager: class AIInterpretersManager:
@ -14,7 +13,6 @@ class AIInterpretersManager:
self.anp:AnPInterface = anp self.anp:AnPInterface = anp
self.__interpreters:dict[str, AIInterpretersAbstract] = {} self.__interpreters:dict[str, AIInterpretersAbstract] = {}
self.__pool_requests:dict[str, AIPoolRequestsModel] = {}
self.update() self.update()
@ -63,7 +61,6 @@ class AIInterpretersManager:
if key in self.__interpreters: if key in self.__interpreters:
self.remove(key) self.remove(key)
self.__interpreters[key] = interpreter self.__interpreters[key] = interpreter
self.__pool_requests[interpreter.pool] = AIPoolRequestsModel()
def remove(self:Self, keys:Optional[str|list[str]] = None) -> None: def remove(self:Self, keys:Optional[str|list[str]] = None) -> None:
@ -71,46 +68,20 @@ class AIInterpretersManager:
for key in Common.get_keys(keys) if keys else list(self.__interpreters.keys()): for key in Common.get_keys(keys) if keys else list(self.__interpreters.keys()):
if key in self.__interpreters: if key in self.__interpreters:
pool:str = self.__interpreters[key].pool
has_interpreters:bool = False
interpreter:AIInterpretersAbstract
self.__pool_requests.get(pool).remove(self.__interpreters[key].key)
self.__interpreters[key].close() self.__interpreters[key].close()
del self.__interpreters[key] del self.__interpreters[key]
for interpreter in self.__interpreters.values():
if interpreter.pool == pool:
has_interpreters = True
break
if not has_interpreters and pool in self.__pool_requests:
del self.__pool_requests[pool]
def request(self:Self, def request(self:Self,
key:str, key:str,
session:int|None, session:int|None,
message:str, message:str,
callback:Callable[[int, AIResponseModel], None], callback:Callable[[int, AIResponseModel], None],
orders:list[str] = [] orders:list[str] = []
) -> int|None: ) -> tuple[int|None, AIResponseModel|None]:
i:int|None = None response:AIResponseModel|None = None
if key in self.__interpreters: if key in self.__interpreters:
session, response = self.__interpreters[key].request(session, message, callback, orders)
pool:str = self.__interpreters[key].pool return session, response
i = self.__pool_requests[pool].add(self.__interpreters[key], session, message, callback, orders)
self.__pool_requests[pool].execute()
return i
def cancel_request(self:Self, key:str, id:int) -> None:
if key in self.__interpreters:
pool:str = self.__interpreters[key].pool
if id in self.__pool_requests[pool].pool:
del self.__pool_requests[pool].pool[id]

View File

@ -58,7 +58,6 @@ class ControllersManager:
None) None)
def execute(self:Self, key:str, method:str, request:RequestModel) -> bool: def execute(self:Self, key:str, method:str, request:RequestModel) -> bool:
print([self.__controllers, key, method])
if key in self.__controllers and hasattr(self.__controllers[key], method): if key in self.__controllers and hasattr(self.__controllers[key], method):
getattr(self.__controllers[key], method)(request) getattr(self.__controllers[key], method)(request)
return True return True

View File

@ -89,15 +89,6 @@ class PseudoLoRAsManager:
for lora in self.__loras: for lora in self.__loras:
lora.clean_cache() lora.clean_cache()
def get_asynchronous(self:Self,
each_callback:Callable[[list[PseudoLoRAModel], list[PseudoLoRAModel], Callable[[list[PseudoLoRAModel]], None]], bool],
end_callback:Callable[[list[PseudoLoRAModel]], None],
keys:list[str] = [],
loras:Optional[list[PseudoLoRAModel]] = None,
results:list[PseudoLoRAModel] = []
) -> None:
Common.execute(each_callback, )
def get(self:Self, callback:Callable[[list[PseudoLoRAModel]], bool], keys:list[str] = []) -> list[tuple[str, str]]: def get(self:Self, callback:Callable[[list[PseudoLoRAModel]], bool], keys:list[str] = []) -> list[tuple[str, str]]:
next:list[PseudoLoRAModel] = [] next:list[PseudoLoRAModel] = []

View File

@ -0,0 +1,107 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Callable, Optional, Sequence
from Interfaces.Application.AnPInterface import AnPInterface
from Models.QueuesModel import QueuesModel
from Utils.Common import Common
class QueuesManager:
def __init__(self:Self, anp:AnPInterface) -> None:
self.anp:AnPInterface = anp
self.__queues:dict[str, QueuesModel] = {}
self.update()
def update(self:Self) -> None:
key:str
for key in ("default_queues_files", "queues_files", "default_queues", "queues"):
self.create(self.anp.settings.get(key), True)
def reset(self:Self) -> None:
self.__queues = {}
self.update()
def close(self:Self) -> None:
self.remove()
def create(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
subinputs:dict[str, Any|None]
for subinputs in Common.load_json(inputs, True):
key:str
queue:Any|None
for key, queue in subinputs.items():
if overwrite or key not in self.__queues:
self.__queues[key] = QueuesModel(key, queue)
def add(self:Self, key:str, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> int|None:
if key in self.__queues:
return self.__queues[key].add(callback, *arguments)
return None
def next(self:Self) -> None:
queue:QueuesModel
for queue in self.__queues.values():
queue.next()
def cancel(self:Self, keys:Optional[str|Sequence[str]] = False, _is:Optional[int|Sequence[int]] = None) -> bool:
if keys is None:
queue:QueuesModel
for queue in self.__queues.values():
if _is is None:
queue.cancel_all()
else:
i:int
for i in Common.get_keys(_is):
queue.cancel(i)
return True
done:bool = False
key:str
for key in Common.get_keys(keys):
if key in self.__queues:
if _is is None:
self.__queues[key].cancel_all()
done = True
else:
i:int
for i in Common.get_keys(_is):
self.__queues[key].cancel(i)
done = True
return done
def remove(self:Self, keys:Optional[str|Sequence[str]] = None) -> bool:
if keys is None:
for queue in self.__queues.values():
queue.cancel_all()
self.__queues = {}
return True
done:bool = False
key:str
for key in Common.get_keys(keys):
if key in self.__queues:
del self.__queues[key]
done = True
return done

View File

@ -1,92 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Callable, Self
from threading import Thread, Lock
from Models.AIResponseModel import AIResponseModel
from Abstracts.AIInterpretersAbstract import AIInterpretersAbstract
from Utils.Common import Common
class AIPoolRequestsItemsModel:
def __init__(self:Self,
key:str,
interpreter:AIInterpretersAbstract,
session:int|None,
message:str,
callback:Callable[[int, AIResponseModel], None],
orders:list[str] = []
) -> None:
self.key:str = key
self.interpreter:AIInterpretersAbstract = interpreter
self.session:int|None = session
self.message:str = message
self.callback:Callable[[int, AIResponseModel], None] = callback
self.orders:list[str] = orders
class AIPoolRequestsModel:
def __init__(self:Self) -> None:
self.pool:dict[int, AIPoolRequestsItemsModel] = {}
self.iterations:int = 0
self.maximum_iterations:int = 1
self.i:int = 0
self.j:int = 0
self.__lock:Lock = Lock()
def add(self:Self,
interpreter:AIInterpretersAbstract,
session:int|None,
message:str,
callback:Callable[[int, AIResponseModel], None],
orders:list[str] = []
) -> int:
id:int
with self.__lock:
self.i += 1
id = self.i
self.pool[self.i] = AIPoolRequestsItemsModel(
interpreter.key, interpreter, session, message, callback, orders
)
self.execute()
return id
def __next(self:Self, callback:Callable[[int, AIResponseModel], None], session:int, response:AIResponseModel) -> None:
Common.execute(callback, session, response)
if response.done or not response.ok:
with self.__lock:
self.iterations -= 1
self.execute()
def __execute(self:Self) -> None:
item:AIPoolRequestsItemsModel|None = None
with self.__lock:
if len(self.pool) and self.iterations != self.maximum_iterations and self.i != self.j:
self.iterations += 1
self.j = min(self.pool.keys())
item:AIPoolRequestsItemsModel = self.pool[self.j]
del self.pool[self.j]
item and item.interpreter.request(
item.session, item.message, item.orders, lambda session, response: self.__next(item.callback, session, response)
)
def execute(self:Self) -> None:
Thread(target = self.__execute).start()
def cancel(self:Self, ids:int|list[int]) -> None:
with self.__lock:
for id in Common.get_keys(ids):
if id in self.pool:
del self.pool[id]
def remove(self:Self, key:str) -> None:
with self.__lock:
for id, item in list(self.pool.items()):
if item.key == key:
del self.pool[id]

View File

@ -0,0 +1,79 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Callable, Sequence, Optional
from threading import Thread, Lock
from Utils.Checks import Check
from Utils.Common import Common
class QueueItemModel:
def __init__(self:Self, i:int, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> None:
self.i:int = i
self.callback:Callable[[Callable[[], None]], None] = callback
self.arguments:list[Any|None] = arguments
self.executing:bool = False
self.thread:Thread|None = None
class QueuesModel:
def __init__(self:Self, key:str, inputs:Optional[int|dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
if Check.is_integer(inputs):
inputs = {"maximum" : inputs}
self.key:str = key
self.items:dict[int, QueueItemModel] = {}
self.i:int = 0
self.p:int = 0
self.maximum:int = Common.get_value(("maximum", "maximum_concurrent", "maximum_simultaneous"), inputs, 1)
self.current:int = 0
def add(self:Self, callback:Callable[[Callable[[], None]], None], *arguments:list[Any|None]) -> int|None:
i:int|None = None
if Check.is_function(callback):
with Lock():
self.items[i := self.i] = QueueItemModel(self.i, callback, *arguments)
self.i += 1
self.next()
return i
def cancel(self:Self, i:int) -> bool:
with Lock():
if i in self.items and not self.items[i].executing:
del self.items[i]
return True
return False
def cancel_all(self:Self) -> None:
with Lock():
for i in list(self.items.keys()):
if not self.items[i].executing:
del self.items[i]
def __end(self:Self, p:int) -> None:
with Lock():
self.current -= 1
del self.items[p]
self.next()
def next(self:Self) -> None:
while self.p < self.i and self.current < self.maximum:
item:QueueItemModel
p:int|None = None
with Lock():
p = min((item.i for item in self.items.values() if not item.executing), default=None)
if p is None:
return
item = self.items[p]
item.executing = True
self.p = p
self.current += 1
item.thread = Thread(target = lambda:item.callback(lambda *results:self.__end(p, *results), *item.arguments))
item.thread.start()

View File

@ -27,6 +27,10 @@ class Check:
def is_boolean(item:Any|None) -> bool: def is_boolean(item:Any|None) -> bool:
return isinstance(item, bool) return isinstance(item, bool)
@staticmethod
def is_integer(item:Any|None) -> bool:
return isinstance(item, int)
@staticmethod @staticmethod
def is_function(item:Any|None) -> bool: def is_function(item:Any|None) -> bool:
return callable(item) return callable(item)