#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Self, Callable

from Abstracts.ModelAbstract import ModelAbstract
from Abstracts.ControllerAbstract import ControllerAbstract
from Models.RequestModel import RequestModel
from Models.AIResponseModel import AIResponseModel
from Utils.Common import Common


class AIController(ControllerAbstract, ModelAbstract):
    """Controller that queues incoming AI messages and relays replies over the web socket.

    ``message`` is the request entry point; the actual work happens in
    ``__message_execution`` once the "anp" queue runs it.
    """

    def __analyse_loras(
        self: Self,
        iterations: dict[str, int],
        conversation: str,
        message: str,
        request: RequestModel,
        loras: list[str],
    ) -> list[tuple[str, str]]:
        """Query the "anp_titles" interpreter about the given lora titles.

        Args:
            iterations: Mutable counter shared with the caller; ``iterations["i"]``
                is incremented once per call.
            conversation: Conversation identifier forwarded to the interpreter.
            message: User message forwarded to the interpreter.
            request: Current request; its session stores "anp_loras_session".
            loras: Titles rendered into the system prompt as a list numbered
                from 0.

        Returns:
            The JSON-decoded interpreter response.
            NOTE(review): annotated as ``list[tuple[str, str]]``, but the real
            shape depends on what the interpreter answers -- confirm.
        """
        session: str | None = request.session.get("anp_loras_session")
        reply: AIResponseModel

        numbered_titles = "".join(
            f"\n{index}. {title}" for index, title in enumerate(loras)
        )
        system_prompt = self.anp.i18n.get("ai_controller_loras_system", {
            "items": len(loras),
            "list": numbered_titles,
        })

        session, reply = self.anp.ai_interpreters.request(
            "anp_titles",
            session,
            conversation,
            message,
            system_prompt,
        )
        decoded = Common.json_decode(reply.response)

        iterations["i"] += 1
        # The first call always writes the session (even when it is None);
        # later calls only overwrite it with a non-None value.
        if session is not None or iterations["i"] == 1:
            request.session.set("anp_loras_session", session)

        return decoded

    def __message_execution(
        self: Self,
        end: Callable[[], None],
        request: RequestModel,
    ) -> None:
        """Process one queued message: analyse loras, request the reply, relay it.

        Args:
            end: Callback invoked once processing is finished.
                NOTE(review): it is not invoked when an exception is raised
                before the end of this method -- confirm whether the queue
                copes with that.
            request: Request carrying "message_id", "client_id",
                "conversation" and "message", plus the session that stores
                "anp_ai_session".
        """
        session: str | None = request.session.get("anp_ai_session")
        message_id: str | None = request.get("message_id")
        client_id: str | None = request.get("client_id")
        conversation: str = request.get("conversation")
        message: str = request.get("message")
        loras: list[tuple[str, str]]
        iterations: dict[str, int] = {"i": 0}

        # Notify the client that the analysis phase has started.
        self.anp.web_socket_servers.send("anp", "ai", "status", {
            "message": message_id,
            "conversation": conversation,
            "status": "analyzing",
        }, client_id)

        loras = self.anp.pseudoloras.get(
            lambda loras: self.__analyse_loras(
                iterations, conversation, message, request, loras
            )
        )

        # With loras available the system prompt is built from their markdown
        # (entries whose markdown is None are skipped); otherwise an empty
        # list is passed, as the original code did.
        system = self.anp.i18n.get("ai_controller_response_system", {
            "loras": "".join(
                f"\n\n# {title}\n\n{markdown}"
                for title, markdown in loras
                if markdown is not None
            ),
        }) if loras else []

        session, _ = self.anp.ai_interpreters.request(
            "anp_responses",
            session,
            conversation,
            message,
            system,
            # The payload used to list "conversation" twice; Python keeps the
            # last value of a repeated key, so only the request's conversation
            # was ever sent. The dead `response.conversation` entry is removed
            # and the key stays in its original position.
            lambda id, response: self.anp.web_socket_servers.send("anp", "ai", "message", {
                "id": id,
                "conversation": conversation,
                "response": response.response,
                "ok": response.ok,
                "done": response.done,
                "message": message_id,
            }, client_id),
        )

        if session is not None:
            request.session.set("anp_ai_session", session)

        end()

    def message(self: Self, request: RequestModel) -> None:
        """Enqueue the request on the "anp" queue and answer immediately with an OK response."""
        self.anp.queues.add("anp", self.__message_execution, request)

        request.set_response({
            "ok": True,
            "code": 200,
            "message": "ok",
        })