AnP/Python/Controllers/AIController.py

66 lines
2.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Callable
from Abstracts.ModelAbstract import ModelAbstract
from Abstracts.ControllerAbstract import ControllerAbstract
from Models.RequestModel import RequestModel
from Models.PseudoLoRAModel import PseudoLoRAModel
class AIController(ControllerAbstract, ModelAbstract):
def __test_execution(self:Self, end:Callable[[], None], request:RequestModel) -> None:
self.anp.ai_interpreters.request(
"anp_responses",
None,
request.get("message", "Hola, Gemma. ¿Me puedes ayudar a instalar una impresora Canon?"),
# lambda id, response: print((id, response.response, request.get("client_id"))),
lambda id, response: self.anp.web_socket_servers.send("anp", "ai", "test", {
"id" : id,
"response" : response.response
}, request.get("client_id")),
# [
# "# Títulos\n" + "".join(
# "\n - " + title for title in [
# "Información, gestión e instalación de Cividas",
# "Información, gestión e instalación de Impresoras/Fotocopiadoras/Multifuncionales Canon"
# ]
# ),
# ]
)
end()
def test(self:Self, request:RequestModel) -> None:
self.anp.queues.add("anp", self.__test_execution, request)
request.set_response({
"ok" : True,
"code" : 200,
"message" : "ok"
})
def __message_execution(self:Self, end:Callable[[], None], request:RequestModel) -> None:
session:str|None = None
session, _ = self.anp.ai_interpreters.request(
"anp_responses",
session,
request.get("message"),
lambda id, response: self.anp.web_socket_servers.send("anp", "ai", "message", {
"id" : id,
"response" : response.response,
"ok" : response.ok,
"done" : response.done,
"data_id" : request.get("message_id")
}, request.get("client_id"))
)
end()
def message(self:Self, request:RequestModel) -> None:
self.anp.queues.add("anp", self.__message_execution, request)
request.set_response({
"ok" : True,
"code" : 200,
"message" : "ok"
})