diff --git a/.gitignore b/.gitignore index e3dd274..55c94fc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /ollama -/open-webui \ No newline at end of file +/open-webui +/websockets \ No newline at end of file diff --git a/AIChat.py b/AIChat.py index 13bf15e..7047f1f 100644 --- a/AIChat.py +++ b/AIChat.py @@ -1,53 +1,442 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from typing import Self, Any -from threading import Thread +from typing import Self, Any, Optional, Sequence from requests import post as Post, Response -from json import loads as json_decode +from os.path import dirname as directory_name, abspath as absolute_path, exists as path_exists +from io import FileIO +from re import compile as re_compile, Pattern as REPattern, Match as REMatch, IGNORECASE as RE_IGNORECASE +from threading import Thread +from json import loads as json_decode, dumps as json_encode +from http import server as http_server +from http.server import BaseHTTPRequestHandler, HTTPServer +from mimetypes import guess_type as read_mime_types +from websockets.sync.server import serve as web_socket_server_serve +from websockets import ServerConnection as WebSocketServer, ClientConnection as WebSocketClient + +class ClientRequestModel: + def __init__(self:Self, client:WebSocketClient, message:str) -> None: + self.client:WebSocketClient = client + self.message:str = message class AIChat: - PAYLOAD:dict[str, str|bool] = { - "model" : "gemma", - "stream" : True + ROOT_PATH:str = directory_name(absolute_path(__file__)) + SLASH:str = "/" if "/" in ROOT_PATH else "\\\\" + ROOTS_PATH:list[str] = ["", ROOT_PATH, ROOT_PATH + "/..", ROOT_PATH + "/../.."] + DEFAULT_SETTINGS:dict[str, Any|None] = { + "default_settings_files" : "/JSON/AIChat.settings.json", + "titles_model" : "gemma3:1b", + "titles_temperature" : 0.0, + "titles_prompt_file" : "/TXT/AIChat.titles-prompt.md", + "responses_model" : "gemma3", + "responses_temperature" : 7.0, + "response_with_titles_prompt_file" : "/TXT/AIChat.response-with-titles.md", + } + DEFAULT_SENTENCES:dict[str, dict[str, str|Sequence[str]]] = { + "espanol" : {} } - - def __init__(self:Self) -> None: - self.__working:bool = True - self.__thread:Thread = Thread(target=self.__listener) - self.__thread.start() - def send(self:Self, message:str) -> str: + RE_KEY:REPattern = re_compile(r"^[a-z_][a-z0-9_]*$", RE_IGNORECASE) + RE_STRING_VARIABLES:REPattern = re_compile(r"\{([a-z_][a-z0-9_]*)\}", RE_IGNORECASE) + RE_SLASHES:REPattern = re_compile(r"[\\\/]+") + RE_NEW_LINES:REPattern = re_compile(r"\r\n|\r|\n") + + def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None: + + self.__working:bool = True + self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs) + self.__settings:dict[str, Any|None] = {} + self.__secrets:dict[str, Any|None] = {} + self.__sentences:dict[str, dict[str, str|Sequence[str]]] = {language : { + key : value for key, value in sentences.items() + } for language, sentences in self.DEFAULT_SENTENCES.items()} + self.__default_language:str = self.get("default_language", inputs, "espanol") + self.__language:str = self.get("language", inputs, self.__default_language) + self.__host:str = self.get("host", inputs, "localhost") + self.__port:int = self.get("port", inputs, 11434) + self.__api_url:str = self.get("api_url", inputs, "http://{host}:{port}/api/generate") + self.__index:dict[str, str] = {} + self.__requests_order:list[int] = [] + self.__requests:dict[int, ClientRequestModel] = {} + self.__web_socket_server:WebSocketServer + self.__web_socket_host:str = self.get("web_socket_host", inputs, "localhost") + self.__web_socket_port:int = self.get("web_socket_port", inputs, 18001) + self.__web_socket_server_thread:Thread = Thread(target = self.__run_web_socket_server) + self.__titles_model:str = self.get("titles_model", inputs) + self.__titles_temperature:float = self.get("titles_temperature", inputs) + self.__titles_prompt:str = self.RE_NEW_LINES.sub("\\n", self.load_file(self.get("titles_prompt_file", inputs))) + self.__response_model:str = self.get("responses_model", inputs) + self.__response_temperature:float = self.get("responses_temperature", inputs) + self.__response_with_titles:str = self.RE_NEW_LINES.sub("\\n", self.load_file(self.get("response_with_titles_prompt_file", inputs))) + self.__http_server:HTTPServer + self.__http_server_thread:Thread = Thread(target = self.__run_http_server) + self.__http_host:str = self.get("http_host", inputs, "") + self.__http_port:int = self.get("http_port", inputs, 18000) + self.__http_directories:list[str] = self.get("http_indexes", inputs, ["/Public"]) + self.__http_indexes:list[str] = self.get("http_indexes", inputs, ["", "index.html", "index.htm", "index.md", "index.txt"]) + self.__terminal_thread:Thread = Thread(target = self.__terminal) + self.__web_sockets_clients:list[WebSocketClient] = [] + + self.__terminal_thread.start() + self.__http_server_thread.start() + self.__web_socket_server_thread.start() + + def close(self:Self) -> None: + + client:WebSocketClient + + self.__working = False + + for client in self.__web_sockets_clients: + try: + client.close() + except Exception as exception: + pass + self.__web_socket_server.shutdown() + self.__web_socket_server_thread.join() + + self.__http_server.shutdown() + + def get(self:Self, + keys:str|Sequence[str], + inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, + default:Optional[Any|None] = None + ) -> Any|None: + return self.get_value(keys, ( + inputs, self.__inputs, self.__secrets, self.__settings, self.DEFAULT_SETTINGS + ), default) + + def __get_sentence(self:Self, texts:str|Sequence[str], custom_language:Optional[str] = None) -> str|Sequence[str]: + + language:str + done:list[str] = [] + keys:list[str] = self.get_keys(texts) + + for language in [ + custom_language, self.__language, self.__default_language + ] + list(self.__sentences.keys()): + if language not in done and language in self.__sentences: + + key:str + + done.append(language) + + for key in keys: + if key in self.__sentences[language]: + return self.__sentences[language][key] + return self.get_texts(texts)[0] + + def i18n(self:Self, + texts:str|Sequence[str], + inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, + language:Optional[str] = None + ) -> str: + + text:str|Sequence[str] = self.__get_sentence(texts, language) + + return self.string_variables("".join(text) if isinstance(text, (list, tuple)) else str(text), inputs) + + def __terminal(self:Self) -> None: + while self.__working: + try: + + command:str = input() + + if command in ("exit", "close", "quit", "stop", "bye"): + self.close() + else: + print(self.i18n("unknown_command", {"command" : command})) + + except Exception as exception: + print(exception) + + def __run_web_socket_server(self:Self) -> None: + with web_socket_server_serve(self.__web_socket_handler, self.__web_socket_host, self.__web_socket_port) as self.__web_socket_server: + self.__web_socket_server.serve_forever() + + def __web_socket_handler(self:Self, client:WebSocketClient) -> None: + if client not in self.__web_sockets_clients: + self.__web_sockets_clients.append(client) + try: + for message in client: + try: + print(message) + except Exception as exception: + print(exception) + except Exception as exception: + print(exception) + self.__web_sockets_clients.remove(client) + print("Client disconnected") + + def __run_http_server(self:Self) -> None: + self.__http_server = HTTPServer((self.__http_host, self.__http_port), self.HTTPRequestHandler) + self.__http_server.aichat = self + self.__http_server.serve_forever() + + def get_http_indexes(self:Self) -> list[str]: + return [*self.__http_indexes] + + def get_http_directories(self:Self) -> list[str]: + return [*self.__http_directories] + + class HTTPRequestHandler(BaseHTTPRequestHandler): + + def do_GET(self:Self) -> None: + + aichat:AIChat = self.server.aichat + data:bytes|None = None + directory:str + + for directory in aichat.get_http_directories(): + + path:str = directory + self.path + index:str + + for index in aichat.get_http_indexes(): + if (data := aichat.load_file(path + ("/" + index if index else ""), "rb")) is not None: + break + + if data is None: + self.send_response(404) + self.end_headers() + else: + + mime:str = (read_mime_types(path) or ("application/octet-stream",))[0] + + self.send_response(200) + self.send_header("Content-Type", mime) + self.send_header("Content-Length", str(len(data))) + self.end_headers() + + self.wfile.write(data) + + def do_POST(self:Self) -> None: + + path:str = self.path + + if path == "/api/send": + pass + else: + self.send_response(404) + self.end_headers() + + def get_titles(self:Self, message:str) -> list[str]: try: response:Response - with Post('http://localhost:11434/api/chat', json = {**self.PAYLOAD, "prompt" : message}) as response: + with Post(self.string_variables(self.__api_url, { + "host" : self.__host, + "port" : self.__port + }), json = { + "model" : self.__titles_model, + "prompt" : self.string_variables(self.__titles_prompt, {"message" : message}), + "format" : "json", + "stream" : False, + "options" : { + "temperature" : self.__titles_temperature + } + }) as response: + if response.status_code == 200: + return response.json().get("titles") + except Exception as exception: + pass + return [] + + def send(self:Self, message:str, client:WebSocketClient) -> str: - line:bytes + titles:list[str] = self.get_titles(message) + + try: + + response:Response + + with Post(self.string_variables(self.__api_url, { + "host" : self.__host, + "port" : self.__port + }), json = { + "model" : self.__titles_model, + "prompt" : self.string_variables(self.__response_with_titles, { + "message" : message, + "guides" : "\n\n".join("## " + title + "\n\n" + self.load_file(self.__index[title]) for title in titles) + }) if len(titles) else message, + "stream" : True, + "options" : { + "temperature" : self.__titles_temperature + } + }, stream = True) as response: + + line:str for line in response.iter_lines(): if line: - - chunk:dict[str, Any|None] = json_decode(line) - - print(chunk.get("response", ""), end = "", flush = True) - - if chunk.get("done"): - break + client.send({"chunk" : json_decode(line)}) except Exception as exception: - print(f"An error occurred while sending the message: {exception}") + pass + return [] - def __listener(self:Self) -> None: - while self.__working: + @classmethod + def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]: - user_input:str = input('> ') + keys:list[str] = [] + item:Any|None - if user_input in ("close", "exit", "quit"): - self.__working = False - else: - self.send(user_input) + for item in items: + if isinstance(item, str): + item not in keys and cls.RE_KEY.match(item) and keys.append(item) + elif isinstance(item, list): -ai_chat = AIChat() \ No newline at end of file + key:str + + for key in cls.get_keys(*item): + key not in keys and keys.append(key) + + return keys + + @classmethod + def get_texts(cls:type[Self], *items:list[Any|None]) -> list[str]: + + texts:list[str] = [] + item:Any|None + + for item in items: + if isinstance(item, str): + texts.append(item) + elif isinstance(item, (list, tuple)): + + text:str + + for text in cls.get_texts(*item): + texts.append(text) + + return texts + + @classmethod + def get_dictionaries(cls:type[Self], *items:list[Any|None]) -> list[dict[str, Any|None]]: + + dictionaries:list[dict[str, Any|None]] = [] + item:Any|None + + for item in items: + if isinstance(item, dict): + dictionaries.append(item) + elif isinstance(item, (list, tuple)): + + dictionary:dict[str, Any|None] + + for dictionary in cls.get_dictionaries(*item): + dictionaries.append(dictionary) + + return dictionaries + + @classmethod + def get_dictionary(cls:type[Self], inputs:Any|None, overwrite:bool = False) -> dict[str, Any|None]: + + dictionary:dict[str, Any|None] = {} + + if isinstance(inputs, dict): + + key:str + value:Any|None + + for key, value in inputs.items(): + if overwrite or key not in dictionary: + dictionary[key] = value + + elif isinstance(inputs, (list, tuple)): + + subinputs:dict[str, Any|None] + + for subinputs in inputs: + + key:str + value:Any|None + + for key, value in cls.get_dictionaries(subinputs).items(): + if overwrite or key not in dictionary: + dictionary[key] = value + + return dictionary + + @classmethod + def get_value(cls:type[Self], + keys:str|Sequence[str], + inputs:dict[str, Any|None]|Sequence[Any|None], + default:Optional[Any|None] = None + ) -> Any|None: + if len(keys := cls.get_keys(keys)): + + subinputs:dict[str, Any|None] + + for subinputs in cls.get_dictionaries(inputs): + + key:str + + for key in keys: + if key in subinputs: + return subinputs[key] + return default + + @classmethod + def string_variables(cls:type[Self], + string:str, + inputs:dict[str, Any|None]|Sequence[Any|None], + default:Optional[str] = None + ) -> str: + + inputs = cls.get_dictionary(inputs) + + def replace(matches:REMatch) -> str: + + key:str = matches.group(1) + + return ( + str(inputs[key]) if key in inputs else + str(default) if default is not None else + matches.groups(0)) + + return cls.RE_STRING_VARIABLES.sub(replace, string) + + @classmethod + def fix_path(cls:type[Self], path:str) -> str: + path = cls.RE_SLASHES.sub(cls.SLASH, path) + # return "\\" + path if path[0] == "\\" else path + return path + + @classmethod + def get_absolute_path(cls:type[Self], path:str) -> str|None: + + root:str + + for root in cls.ROOTS_PATH: + + absolute_path:str = cls.fix_path((root + cls.SLASH if root else "") + path) + + if path_exists(absolute_path): + return absolute_path + return None + + @classmethod + def load_file(cls:type[Self], path:str, mode:str = "r") -> str|bytes|None: + try: + + file:FileIO + + if mode == "r": + for format in ("utf8", "cp1252", "cp850"): + try: + with open(cls.get_absolute_path(path), mode, encoding = format) as file: + return file.read() + except: + pass + elif mode == "rb": + with open(cls.get_absolute_path(path), mode) as file: + return file.read() + except Exception as exception: + # print(exception) + pass + return None + +aichat:AIChat = AIChat() \ No newline at end of file diff --git a/JSON/AIChat.settings.json b/JSON/AIChat.settings.json new file mode 100644 index 0000000..0bc8a67 --- /dev/null +++ b/JSON/AIChat.settings.json @@ -0,0 +1,6 @@ +{ + "autostart" : true, + "default_settings_files" : "/JSON/AIChat.settings.json", + "ai_model" : "gemma", + "ai_stream" : true +} \ No newline at end of file diff --git a/Public/ecma/AIChat.ecma.js b/Public/ecma/AIChat.ecma.js new file mode 100644 index 0000000..fb7d449 --- /dev/null +++ b/Public/ecma/AIChat.ecma.js @@ -0,0 +1,420 @@ +"use strict"; + +/** + * @class AIChat + * @constructor + * @param {?(Object.|Array.)} [inputs = nulls] + * @returns {void} + * @access public + * @static + */ +export const AIChat = (function(){ + + /** + * @callback aichat_init_callback + * @param {!boolean} ok + * @returns {boolean} + */ + + /** + * @callback aichat_preload_callback + * @param {?HTMLElement} item + * @param {!boolean} asynchronous + * @param {!boolean} ok + * @returns {void} + */ + + /** + * @typedef {HTMLElement|string|[string, Object.|null, HTMLElement|string|Array.|null]} aichat_html_item + */ + + /** + * @constructs AIChat + * @param {?(Object.|Array.)} [inputs = nulls] + * @returns {void} + * @access private + * @static + */ + const AIChat = function(inputs = null){ + + /** @type {AIChat} */ + const self = this, + /** @type {Object.} */ + settings = {}, + /** @type {Object.} */ + secrets = {}; + /** @type {boolean} */ + let started = false, + /** @type {number} */ + preload_timeout = 2000, + /** @type {number} */ + frames_per_second = 60, + /** @type {WebSocket|null} */ + web_socket_client = null; + + /** + * @returns {void} + * @access private + */ + const constructor = () => { + + self.get("autostart") && self.start(); + + }; + + /** + * @param {?aichat_init_callaback} [callback = null] + * @returns {boolean} + * @access public + */ + this.start = (callback = null) => { + + /** @type {aichat_init_callaback} */ + const end = ok => typeof callback == "function" ? callback(ok) : ok; + + if(started) + return end(false); + started = true; + + preload_timeout = self.get(["preload_timeout", "timeout"], null, preload_timeout); + frames_per_second = self.get(["frames_per_second", "fps"], null, frames_per_second); + + self.preload(self.get("position", null, "body"), (item, asynchronous, ok) => { + if(ok){ + build(item); + try{ + + web_socket_client = new WebSocket(self.get("web_socket_url", null, "ws://localhost:18001/")); + + web_socket_client.onopen = on_web_socket_open; + web_socket_client.onmessage = on_web_socket_message; + web_socket_client.onerror = on_web_socket_error; + web_socket_client.onclose = on_web_socket_close; + + }catch(exception){ + console.error("Failed to connect to the WebSocket server:", exception); + }; + }; + end(ok); + }); + + return true; + }; + + /** + * @param {?aichat_init_callaback} [callback = null] + * @returns {boolean} + * @access public + */ + this.stop = (callback = null) => { + + /** @type {aichat_init_callaback} */ + const end = ok => typeof callback == "function" ? callback(ok) : ok; + + if(!started) + return end(false); + started = false; + + return end(true); + }; + + const on_web_socket_open = event => { + web_socket_client.send("Hello, WebSocket server!"); + }; + + const on_web_socket_message = event => {}; + + const on_web_socket_error = event => {}; + + const on_web_socket_close = event => {}; + + /** + * @param {!(string|Array.)} keys + * @param {!(Object.|Array.)} inputs + * @param {?any} [_default = null] + * @returns {any|null} + * @access public + */ + this.get = (keys, subinputs, _default = null) => AIChat.get_value(keys, [ + subinputs, inputs, secrets, settings, AIChat.DEFAULT_SETTINGS + ], _default); + + /** + * @param {!(string|HTMLElement)} selector + * @param {!aichat_preload_callback} callback + * @returns {void} + * @access public + */ + this.preload = (selector, callback) => { + if(typeof callback != "function") + return; + if(AIChat.is_html_item(selector)) + return callback(selector, false, true); + if(typeof selector != "string") + return callback(null, false, false); + + /** @type {HTMLElement|null} */ + let item; + + try{ + if(item = document.querySelector(selector)) + return callback(item, false, true); + }catch(exception){ + return callback(null, false, false); + }; + + /** @type {number} */ + const date = Date.now(); + /** @type {number} */ + let interval = setInterval(() => { + if(item = document.querySelector(selector)){ + clearInterval(interval); + callback(item, true, true); + }else if(Date.now() - date > preload_timeout){ + clearInterval(interval); + callback(null, true, false); + }; + }, 1000 / frames_per_second); + + }; + + /** + * @param {!HTMLElement} item + * @return {void} + * @access private + */ + const build = item => { + AIChat.HTML(item, ["div", {class: "aichat"}, [ + ["header", null, [ + ["h1", null, "AI Chat"] + ]], + ["main", null, [ + ["fieldset", {class : "chat"}, [ + ["legend", {data_i18n : "chat"}, "Chat"], + ["section", {"class" : "messages"}], + ["form", { + action : "#", + method : "post", + on_submit : send + }, [ + ["div", null, [ + ["textarea", { + name : "message", + data_i18n : "type_message", + data_i18n_without : true, + placeholder : "Type your message here..." + }] + ]], + ["button", { + type : "submit", + data_i18n : "send", + data_i18n_without : true, + title : "Send" + }, [ + ["span", {data_icon : "send"}], + ["span", {data_i18n : "send"}, "Send"] + ]], + ["button", { + type : "reset", + data_i18n : "clean", + data_i18n_without : true, + title : "Clean" + }, [ + ["span", {data_icon : "clean"}], + ["span", {data_i18n : "clean"}, "Clean"] + ]] + ]] + ]] + ]], + ["footer"] + ]]); + }; + + /** + * @param {!HTMLElement} item + * @param {!Event} event + * @returns {any|null|void} + * @access private + */ + const send = (item, event) => { + + event.preventDefault(); + + web_socket_client.send(document.querySelector(".aichat form textarea").value); + + return false; + }; + + constructor(); + + }; + + /** @type {Object.} */ + AIChat.DEFAULT_SETTINGS = { + /** @type {boolean} */ + autostart : true, + /** @type {number} */ + timeout : 2000, + /** @type {number} */ + preload_timeout : 2000, + /** @type {number} */ + frames_per_second : 60, + /** @type {string} */ + position : "body" + }; + + /** + * @param {?any} item + * @returns {boolean} + * @access public + * @static + */ + AIChat.is_key = item => typeof item == "string" && /^[a-z_][a-z0-9_]*$/i.test(item); + + /** + * @param {?any} item + * @returns {boolean} + * @access public + * @static + */ + AIChat.is_dictionary = item => item && item.constructor === Object; + + /** + * @param {?any} item + * @returns {boolean} + * @access public + * @static + */ + AIChat.is_html_item = item => item && (item.tagName || item.nodeName); + + /** + * @param {...?any} items + * @returns {Array.} + * @access public + * @static + */ + AIChat.get_keys = (...items) => { + + /** @type {Array.} */ + const keys = []; + + items.forEach(item => { + if(AIChat.is_key(item)) + keys.includes(item) || keys.push(item); + else if(Array.isArray(items)) + AIChat.get_keys(...item).forEach(key => keys.includes(key) || keys.push(key)); + }); + + return keys; + }; + + /** + * @param {...?any} items + * @returns {Array.>} + * @access public + * @static + */ + AIChat.get_dictionaries = (...items) => { + + /** @type {Array.>} */ + const dictionaries = []; + + items.forEach(item => { + if(AIChat.is_dictionary(item)) + dictionaries.includes(item) || dictionaries.push(item); + else if(item instanceof Array) + AIChat.get_dictionaries(...item).forEach(dictionary => { + dictionaries.includes(dictionary) || dictionaries.push(dictionary); + }); + }); + + return dictionaries; + }; + + /** + * @param {!(string|Array.)} keys + * @param {!(Object.|Array.)} inputs + * @param {?any} [_default = null] + * @returns {any|null} + * @access public + * @static + */ + AIChat.get_value = (keys, inputs, _default = null) => { + if((keys = AIChat.get_keys(keys)).length) + for(const subinputs of AIChat.get_dictionaries(inputs)) + for(const key of keys) + if(key in subinputs) + return subinputs[key]; + return _default; + }; + + /** + * @param {!(HTMLElement|string)} item + * @param {!Object.} attributes + * @returns {void} + * @access public + * @static + */ + AIChat.set_attributes = (item, attributes) => { + + typeof item == "string" && (item = document.querySelector(item)); + + if(AIChat.is_html_item(item)) + for(const key in attributes){ + + /** @type {any|null} */ + const value = attributes[key]; + + if(/^on/i.test(key) && typeof value == "function") + item.addEventListener(key.replace(/^on[-_]?|-/ig, ""), event => value(item, event)); + else + item.setAttribute(key.replace(/[^a-z0-9]+/ig, "-"), value); + + }; + + }; + + /** + * @param {...aichat_html_item} items + * @returns {Array.} + * @access public + * @static + */ + AIChat.HTML = (...items) => { + + /** @type {boolean} */ + const has_html_item = AIChat.is_html_item(typeof items[0] == "string" ? items[0] = document.querySelector(items[0]) || items[0] : items[0]), + /** @type {DocumentFragment} */ + fragment = document.createDocumentFragment(); + + (has_html_item ? items.slice(1) : items).forEach(item => { + if(AIChat.is_html_item(item)) + fragment.appendChild(item); + else if(typeof item == "string") + fragment.appendChild(document.createTextNode(item)); + else if(Array.isArray(item)){ + + /** @type {[string, Object.|null, HTMLElement|string|Array.|null]} */ + const [tag, attributes, children] = item.concat(null, null), + /** @type {!HTMLElement} */ + child = fragment.appendChild(document.createElement(tag)); + + AIChat.is_dictionary(attributes) && AIChat.set_attributes(child, attributes); + + if(AIChat.is_html_item(children)) + child.appendChild(children); + else if(typeof children == "string") + child.appendChild(document.createTextNode(children)); + else if(Array.isArray(children)) + AIChat.HTML(child, ...children); + + }; + }); + + has_html_item && items[0].appendChild(fragment); + + return fragment.childNodes; + }; + + return AIChat; +})(); \ No newline at end of file diff --git a/Public/index.html b/Public/index.html new file mode 100644 index 0000000..3a945dd --- /dev/null +++ b/Public/index.html @@ -0,0 +1,24 @@ + + + + AIChat + + + + + + + + + + + + \ No newline at end of file diff --git a/Python/Application/AIChat.py b/Python/Application/AIChat.py new file mode 100644 index 0000000..6f208c9 --- /dev/null +++ b/Python/Application/AIChat.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Sequence, Optional, Callable +from threading import Thread +from requests import post as Post, Response +from json import loads as json_decode +from re import compile as re_compile, Pattern as REPattern, IGNORECASE as RE_IGNORECASE +from websockets.sync import server as WebSocketServer +from Utils.Utils import Utils +from Utils.Check import Check +from Drivers.FilesDriver import FilesDriver + +class PayloadModel: + def __init__(self:Self) -> None: + self.model:str = "gemma" + self.stream:bool = True + +class AIChat: + + DEFAULT_SETTINGS:dict[str, Any|None] = { + "autostart" : True, + "ai_model" : "gemma", + "ai_stream" : True + } + + def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None: + + self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs, overwrite = True) + self.__working:bool = False + self.__started:bool = False + self.__thread:Thread|None = None + self.__payload:PayloadModel = PayloadModel() + + self.files:FilesDriver = FilesDriver(self) + + def start(self:Self, callback:Optional[Callable[[bool], bool]] = None) -> bool: + + end:Callable[[bool], bool] = lambda ok: callback(ok) if callable(callback) else ok + + if self.__started: + return end(False) + self.__started = True + + self.__working = True + self.__payload.model = self.get("ai_model", self.__payload.model) + self.__thread = Thread(target = self.__listener) + self.__thread.start() + + return end(True) + + def close(self:Self, callback:Optional[Callable[[bool], bool]] = None) -> bool: + + end:Callable[[bool], bool] = lambda ok: callback(ok) if callable(callback) else ok + + if not self.__started: + return end(False) + self.__started = False + + self.__working = False + + return end(True) + + def load_json(self:Self, data:Any|None, only_dictionary:bool = True) -> list[list[Any|None]|dict[str, Any|None]]: + + items:list[list[Any|None]|dict[str, Any|None]] = [] + + if Check.is_string(data): + items.extends(self.files.load_json(data)) + elif Utils.is_array(data): + if only_dictionary: + + item:Any|None + + for item in data: + items.extend(self.load_json(item, only_dictionary)) + + else: + items.append(data) + elif Utils.is_dictionary(data): + items.append(data) + + return items + + def send(self:Self, message:str) -> str: + try: + + response:Response + + with Post('http://localhost:11434/api/chat', json = {**self.PAYLOAD, "prompt" : message}) as response: + + line:bytes + + for line in response.iter_lines(): + if line: + + chunk:dict[str, Any|None] = json_decode(line) + + print(chunk.get("response", ""), end = "", flush = True) + + if chunk.get("done"): + break + + except Exception as exception: + print(f"An error occurred while sending the message: {exception}") + + def __listener(self:Self) -> None: + while self.__working: + + user_input:str = input('> ') + + if user_input in ("close", "exit", "quit"): + self.__working = False + else: + self.send(user_input) + +ai_chat = AIChat() \ No newline at end of file diff --git a/Python/Drivers/FilesDriver.py b/Python/Drivers/FilesDriver.py new file mode 100644 index 0000000..b7c83ea --- /dev/null +++ b/Python/Drivers/FilesDriver.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional, Sequence +from os.path import exists as path_exists, isfile as is_file, isdir as is_directory, dirname as directory_name, abspath as absolute_path +from io import FileIO +from json import loads as json_decode +from Interfaces.Application.AIChatInterface import AIChatInterface +from Utils.Patterns import RE + +class FilesDriver: + + ROOT:str = directory_name(absolute_path(__file__)) + SLASH:str = "/" if "/" in ROOT else "\\\\" + + def __init__(self:Self, aichat:AIChatInterface) -> None: + self.aichat:AIChatInterface = aichat + self.__root_paths:list[str] = ["", self.ROOT] + + for _ in range(2): + self.__root_paths.append(RE.PARENT_DIRECTORY.sub(r'\1', self.__root_paths[-1])) + + @classmethod + def fix_path(cls:type[Self], path:str) -> str: + return RE.SLASHES.sub(cls.SLASH, path) + + def get_absolute_path(self:Self, path:str) -> str|None: + + root:str + + for root in self.__root_paths: + + absolute:str = self.fix_path((root + self.slash if root else "") + path) + + if path_exists(absolute): + return absolute + return None + + def exists(self:Self, path:str) -> bool: + return self.get_absolute_path(path) is not None + + def is_file(self:Self, path:str) -> bool: + + absolute:str = self.get_absolute_path(path) + + return is_file(absolute) if absolute else False + + def is_directory(self:Self, path:str) -> bool: + + absolute:str = self.get_absolute_path(path) + + return is_directory(absolute) if absolute else False + + def load(self:Self, path:str, mode:str = "r") -> str|bytes|None: + + absolute:str = self.get_absolute_path(path) + + if absolute and is_file(absolute): + + file:FileIO + + try: + with open(absolute, mode) as file: + return file.read() + except Exception as exception: + pass + return None + + def load_json(self:Self, path:str) -> str|bytes|None: + + json:Sequence[Any]|dict[str, Any|None]|None + + try: + json = json_decode(path) + if json != None: + return json + except Exception as exception: + pass + + data:str|None = self.load(path) + + if data: + try: + return json_decode(data) + except Exception as exception: + pass + + return None + + def save(self:Self, path:str, content:str|bytes, mode:str = "w") -> bool: + try: + + file:FileIO + + with open(self.fix_path(path), mode) as file: + file.write(content) + return True + except Exception as exception: + pass + return False \ No newline at end of file diff --git a/Python/Interfaces/Application/AIChatInterface.py b/Python/Interfaces/Application/AIChatInterface.py new file mode 100644 index 0000000..b0da502 --- /dev/null +++ b/Python/Interfaces/Application/AIChatInterface.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional, Sequence +from abc import ABC, abstractmethod +from Interfaces.Drivers.FilesDriverInterface import FilesDriverInterface +from Interfaces.Managers.SettingsManagerInterface import SettingsManagerInterface + +class AIChatInterface(ABC): + + def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None: + self.files:FilesDriverInterface = None + self.settings:SettingsManagerInterface = None \ No newline at end of file diff --git a/Python/Interfaces/Drivers/FilesDriverInterface.py b/Python/Interfaces/Drivers/FilesDriverInterface.py new file mode 100644 index 0000000..049bd46 --- /dev/null +++ b/Python/Interfaces/Drivers/FilesDriverInterface.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self +from abc import ABC, abstractmethod + +class FilesDriverInterface(ABC): + + @abstractmethod + def get_absolute_path(self:Self, path:str) -> str|None:pass + + @abstractmethod + def exists(self:Self, path:str) -> bool:pass + + @abstractmethod + def is_file(self:Self, path:str) -> bool:pass + + @abstractmethod + def is_directory(self:Self, path:str) -> bool:pass + + @abstractmethod + def load(self:Self, path:str, mode:str = "r") -> str|bytes|None:pass + + @abstractmethod + def load_json(self:Self, path:str) -> str|bytes|None:pass + + @abstractmethod + def save(self:Self, path:str, content:str|bytes, mode:str = "w") -> bool:pass \ No newline at end of file diff --git a/Python/Interfaces/Managers/SettingsManagerInterface.py b/Python/Interfaces/Managers/SettingsManagerInterface.py new file mode 100644 index 0000000..efb44c1 --- /dev/null +++ b/Python/Interfaces/Managers/SettingsManagerInterface.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional, Sequence +from abc import ABC, abstractmethod + +class SettingsManagerInterface(ABC): + + @abstractmethod + def get(self:Self, + keys:str|Sequence[str], + inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, + default:Optional[Any] = None + ) -> Any|None:pass + + @abstractmethod + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass + + @abstractmethod + def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass \ No newline at end of file diff --git a/Python/Managers/I18NManager.py b/Python/Managers/I18NManager.py new file mode 100644 index 0000000..5835650 --- /dev/null +++ b/Python/Managers/I18NManager.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional, Sequence +from Utils.Utils import Utils +from Interfaces.Application.AIChatInterface import AIChatInterface + +class SettingsManager: + + DEFAULT_SENTENCES:dict[str, dict[str, str]] = { + "english" : {} + } + + def __init__(self:Self, aichat:AIChatInterface) -> None: + self.aichat:AIChatInterface = aichat + + self.__sentences:dict[str, dict[str, str|list[str]]] = {} + self.__default_language:str = "english" + self.__language:str = "english" + + for key in ( + "default_i18n_files", "i18n_files", + "default_i18n", "i18n" + ): + self.add(self.get(key), overwrite = True) + + def __get(self:Self, texts:str|Sequence[str], language:Optional[str] = None) -> Any|None: + + keys:list[str] = Utils.get_keys(texts) + + if len(keys): + + language:str + done:list[str] = [] + + for language in (self.__language, self.__default_language) + tuple(Utils.get_keys(self.__sentences)): + if language in self.__sentences and language not in done: + + key:str + + done.append(language) + + for key in keys: + if key in self.__sentences[language]: + return self.__sentences[language][key] + return Utils.get_texts(texts)[0] + + def get(self:Self, + texts:str|Sequence[str], + inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, + language:Optional[str] = None + ) -> Any|None: + + text:str|list[str]|None = self.__get(texts, language) + + return Utils.string_variables(( + "".join(text) if Utils.is_array(text) else + text), inputs) + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + block:dict[str, Any|None] + + for block in Utils.get_dictionaries(inputs): + + language:str + sentences:dict[str, Any|None] + + for key, value in block.items(): + if overwrite or key not in self.__sentences: + self.__i18n[key] = value \ No newline at end of file diff --git a/Python/Managers/SettingsManager.py b/Python/Managers/SettingsManager.py new file mode 100644 index 0000000..278a2ff --- /dev/null +++ b/Python/Managers/SettingsManager.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Optional, Sequence +from Utils.Utils import Utils +from Interfaces.Application.AIChatInterface import AIChatInterface + +class SettingsManager: + + DEFAULT_SETTINGS:dict[str, Any|None] = { + "autostart" : True, + "default_settings_files" : "/JSON/AIChat.settings.json" + } + + def __init__(self:Self, + aichat:AIChatInterface, + inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None + ) -> None: + self.aichat:AIChatInterface = aichat + + self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs) + self.__settings:dict[str, Any|None] = {} + self.__secrets:dict[str, Any|None] = {} + + for key in ( + "default_settings_files", "settings_files", + "default_settings", "settings" + ): + self.add(self.get(key), overwrite = True) + + for key in ( + "default_secrets_files", "secrets_files", + "default_secrets", "secrets" + ): + self.add_secrets(self.get(key), overwrite = True) + + def get(self:Self, + keys:str|Sequence[str], + inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, + default:Optional[Any] = None + ) -> Any|None: + return Utils.get_values(keys, ( + inputs, self.__inputs, self.__secrets, self.__settings, self.aichat.DEFAULT_SETTINGS + ), default) + + def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + block:dict[str, Any|None] + + for block in Utils.get_dictionaries(inputs): + + key:str + value:Any|None + + for key, value in block.items(): + if overwrite or key not in self.__settings: + self.__settings[key] = value + + def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None: + + block:dict[str, Any|None] + + for block in Utils.get_dictionaries(inputs): + + key:str + value:Any|None + + for key, value in block.items(): + if overwrite or key not in self.__secrets: + self.__secrets[key] = value \ No newline at end of file diff --git a/Python/Utils/Check.py b/Python/Utils/Check.py new file mode 100644 index 0000000..0246f73 --- /dev/null +++ b/Python/Utils/Check.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Any, Self +from Utils.Patterns import RE + +class Check: + + @staticmethod + def is_string(item:Any|None) -> bool: + return isinstance(item, str) + + @classmethod + def is_key(cls:type[Self], item:Any|None) -> bool: + return cls.is_string(item) and RE.KEY.match(item) + + @staticmethod + def is_dictionary(item:Any|None) -> bool: + return isinstance(item, dict) + + @staticmethod + def is_array(item:Any|None) -> bool: + return isinstance(item, (list, tuple)) \ No newline at end of file diff --git a/Python/Utils/Patterns.py b/Python/Utils/Patterns.py new file mode 100644 index 0000000..c3c9d99 --- /dev/null +++ b/Python/Utils/Patterns.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from re import compile as re_compile, Pattern as REPattern, IGNORECASE as RE_IGNORECASE + +class RE: + + KEY:REPattern = re_compile(r"^[a-z_][a-z0-9_]*$", RE_IGNORECASE) + PARENT_DIRECTORY:REPattern = re_compile(r'^(.+)[\/\\][^\/\\]+[\/\\]*$') + SLASHES:REPattern = re_compile(r'[\/\\]+') + STRING_VARIABLES:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}', RE_IGNORECASE) \ No newline at end of file diff --git a/Python/Utils/Utils.py b/Python/Utils/Utils.py new file mode 100644 index 0000000..374d097 --- /dev/null +++ b/Python/Utils/Utils.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Self, Any, Sequence, Optional +from Utils.Check import Check +from Utils.Patterns import RE + +class Utils: + + @classmethod + def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]: + + keys:list[str] = [] + item:Any|None + + for item in items: + if Check.is_key(item): + if item not in keys: + keys.append(item) + elif Check.is_array(item): + + key:str + + for key in cls.get_keys(*item): + if key not in keys: + keys.append(key) + + return keys + + @classmethod + def get_dictionaries(cls:type[Self], *items:list[Any|None]) -> list[dict[str, Any|None]]: + + dictionaries:list[dict[str, Any|None]] = [] + item:Any|None + + for item in items: + if Check.is_dictionary: + dictionaries.append(item) + elif Check.is_array(item): + + dictionary:dict[str, Any|None] + + for dictionary in cls.get_dictionaries(*item): + dictionaries.append(dictionary) + + return dictionaries + + @classmethod + def get_dictionary(cls:type[Self], inputs:Any|None, overwrite:bool = False) -> dict[str, Any|None]: + + dictionary:dict[str, Any|None] = {} + + if Check.is_dictionary(inputs): + dictionary = inputs + elif Check.is_array(inputs): + + key:str + value:Any|None + + for key, value in cls.get_dictionary(inputs, overwrite).items(): + if overwrite or key not in dictionary: + dictionary[key] = value + + return dictionary + + @classmethod + def get_values(cls:type[Self], + keys:str|Sequence[str], + inputs:dict[str, Any|None]|Sequence[Any|None], + default:Optional[Any] = None + ) -> Any|None: + if len(keys := cls.get_keys(keys)): + + dictionary:dict[str, Any|None] + + for dictionary in cls.get_dictionaries(inputs): + + key:str + + for key in keys: + if key in dictionary: + return dictionary[key] + return default + + @classmethod + def get_texts(cls:type[Self], *items:list[Any|None]) -> list[str]: + + texts:list[str] = [] + item:Any|None + + for item in items: + if Check.is_string(item): + texts.append(item) + elif Check.is_array(item): + + text:str + + for text in cls.get_texts(*item): + texts.append(text) + + return texts + + @classmethod + def string_variables(cls:type[Self], + string:str, + inputs:dict[str, Any|None]|Sequence[Any|None], + default:Optional[str|None] = None + ) -> str: + + variables:dict[str, Any|None] = Utils.get_dictionary(inputs) + + def replace(match:Any) -> str: + + key:str = match.group(1) + + return str( + variables[key] if key in variables and variables[key] is not None else + default if default is not None else + match.group(0)) + + return RE.STRING_VARIABLES.sub(replace, string) \ No newline at end of file diff --git a/README.md b/README.md index fd75c9f..977e2d3 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,12 @@ docker exec -it ollama ollama run gemma ``` +Socket Python: + +- https://pypi.org/project/websockets/ +- https://github.com/python-websockets/websockets/ +- https://github.com/python-websockets/websockets/tree/main/src/websockets/sync + Modelos: - gemma - 5GB diff --git a/TXT/AIChat.response-with-titles.md b/TXT/AIChat.response-with-titles.md new file mode 100644 index 0000000..dfbf8e0 --- /dev/null +++ b/TXT/AIChat.response-with-titles.md @@ -0,0 +1,7 @@ +# Prompt del usuario + +{message} + +# Guías y manuales + +{guides} \ No newline at end of file diff --git a/TXT/AIChat.titles-prompt.md b/TXT/AIChat.titles-prompt.md new file mode 100644 index 0000000..534e0a5 --- /dev/null +++ b/TXT/AIChat.titles-prompt.md @@ -0,0 +1,21 @@ +Eres un clasificador estricto de dudas técnicas. +Tu objetivo es identificar si la consulta del usuario se puede resolver con alguno de los siguientes manuales. + +MANUALES DISPONIBLES: +{{titles}} + +REGLAS CRÍTICAS: +1. Si la consulta es un saludo, despedida, agradecimiento o charla trivial, devuelve {{"titles": []}}. +2. Si la consulta NO tiene relación directa con los manuales, devuelve {{"titles": []}}. +3. No intentes ser amable ni ayudar, solo clasifica. + +EJEMPLOS: +- Consulta: "Hola, buenas tardes" -> Respuesta: {{"titles": []}} +- Consulta: "Gracias por la ayuda" -> Respuesta: {{"titles": []}} +- Consulta: "¿Cómo instalo la impresora?" -> Respuesta: {{"titles": []}} (Si no hay manual de impresora) +- Consulta: "quiero poner el cividas" -> Respuesta: {{"titles": ["Instalar Cividas y tamaño de texto"]}} + +CONSULTA ACTUAL DEL USUARIO: +"{message}" + +Responde ÚNICAMENTE en formato JSON: \ No newline at end of file diff --git a/navecitas.html b/Tests/navecitas.html similarity index 100% rename from navecitas.html rename to Tests/navecitas.html diff --git a/navecitas.v2.html b/Tests/navecitas.v2.html similarity index 100% rename from navecitas.v2.html rename to Tests/navecitas.v2.html