#wip: AIChat building...

This commit is contained in:
mbruzon 2026-05-06 18:58:50 +02:00
parent 0e595ea428
commit 4552cf4c81
20 changed files with 1479 additions and 31 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/ollama
/open-webui
/websockets

447
AIChat.py
View File

@ -1,53 +1,442 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any
from threading import Thread
from typing import Self, Any, Optional, Sequence
from requests import post as Post, Response
from json import loads as json_decode
from os.path import dirname as directory_name, abspath as absolute_path, exists as path_exists
from io import FileIO
from re import compile as re_compile, Pattern as REPattern, Match as REMatch, IGNORECASE as RE_IGNORECASE
from threading import Thread
from json import loads as json_decode, dumps as json_encode
from http import server as http_server
from http.server import BaseHTTPRequestHandler, HTTPServer
from mimetypes import guess_type as read_mime_types
from websockets.sync.server import serve as web_socket_server_serve
from websockets import ServerConnection as WebSocketServer, ClientConnection as WebSocketClient
class ClientRequestModel:
def __init__(self:Self, client:WebSocketClient, message:str) -> None:
self.client:WebSocketClient = client
self.message:str = message
class AIChat:
PAYLOAD:dict[str, str|bool] = {
"model" : "gemma",
"stream" : True
ROOT_PATH:str = directory_name(absolute_path(__file__))
SLASH:str = "/" if "/" in ROOT_PATH else "\\\\"
ROOTS_PATH:list[str] = ["", ROOT_PATH, ROOT_PATH + "/..", ROOT_PATH + "/../.."]
DEFAULT_SETTINGS:dict[str, Any|None] = {
"default_settings_files" : "/JSON/AIChat.settings.json",
"titles_model" : "gemma3:1b",
"titles_temperature" : 0.0,
"titles_prompt_file" : "/TXT/AIChat.titles-prompt.md",
"responses_model" : "gemma3",
"responses_temperature" : 7.0,
"response_with_titles_prompt_file" : "/TXT/AIChat.response-with-titles.md",
}
DEFAULT_SENTENCES:dict[str, dict[str, str|Sequence[str]]] = {
"espanol" : {}
}
def __init__(self:Self) -> None:
self.__working:bool = True
self.__thread:Thread = Thread(target=self.__listener)
self.__thread.start()
RE_KEY:REPattern = re_compile(r"^[a-z_][a-z0-9_]*$", RE_IGNORECASE)
RE_STRING_VARIABLES:REPattern = re_compile(r"\{([a-z_][a-z0-9_]*)\}", RE_IGNORECASE)
RE_SLASHES:REPattern = re_compile(r"[\\\/]+")
RE_NEW_LINES:REPattern = re_compile(r"\r\n|\r|\n")
def send(self:Self, message:str) -> str:
def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
self.__working:bool = True
self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs)
self.__settings:dict[str, Any|None] = {}
self.__secrets:dict[str, Any|None] = {}
self.__sentences:dict[str, dict[str, str|Sequence[str]]] = {language : {
key : value for key, value in sentences.items()
} for language, sentences in self.DEFAULT_SENTENCES.items()}
self.__default_language:str = self.get("default_language", inputs, "espanol")
self.__language:str = self.get("language", inputs, self.__default_language)
self.__host:str = self.get("host", inputs, "localhost")
self.__port:int = self.get("port", inputs, 11434)
self.__api_url:str = self.get("api_url", inputs, "http://{host}:{port}/api/generate")
self.__index:dict[str, str] = {}
self.__requests_order:list[int] = []
self.__requests:dict[int, ClientRequestModel] = {}
self.__web_socket_server:WebSocketServer
self.__web_socket_host:str = self.get("web_socket_host", inputs, "localhost")
self.__web_socket_port:int = self.get("web_socket_port", inputs, 18001)
self.__web_socket_server_thread:Thread = Thread(target = self.__run_web_socket_server)
self.__titles_model:str = self.get("titles_model", inputs)
self.__titles_temperature:float = self.get("titles_temperature", inputs)
self.__titles_prompt:str = self.RE_NEW_LINES.sub("\\n", self.load_file(self.get("titles_prompt_file", inputs)))
self.__response_model:str = self.get("responses_model", inputs)
self.__response_temperature:float = self.get("responses_temperature", inputs)
self.__response_with_titles:str = self.RE_NEW_LINES.sub("\\n", self.load_file(self.get("response_with_titles_prompt_file", inputs)))
self.__http_server:HTTPServer
self.__http_server_thread:Thread = Thread(target = self.__run_http_server)
self.__http_host:str = self.get("http_host", inputs, "")
self.__http_port:int = self.get("http_port", inputs, 18000)
self.__http_directories:list[str] = self.get("http_indexes", inputs, ["/Public"])
self.__http_indexes:list[str] = self.get("http_indexes", inputs, ["", "index.html", "index.htm", "index.md", "index.txt"])
self.__terminal_thread:Thread = Thread(target = self.__terminal)
self.__web_sockets_clients:list[WebSocketClient] = []
self.__terminal_thread.start()
self.__http_server_thread.start()
self.__web_socket_server_thread.start()
def close(self:Self) -> None:
client:WebSocketClient
self.__working = False
for client in self.__web_sockets_clients:
try:
client.close()
except Exception as exception:
pass
self.__web_socket_server.shutdown()
self.__web_socket_server_thread.join()
self.__http_server.shutdown()
def get(self:Self,
keys:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
default:Optional[Any|None] = None
) -> Any|None:
return self.get_value(keys, (
inputs, self.__inputs, self.__secrets, self.__settings, self.DEFAULT_SETTINGS
), default)
def __get_sentence(self:Self, texts:str|Sequence[str], custom_language:Optional[str] = None) -> str|Sequence[str]:
language:str
done:list[str] = []
keys:list[str] = self.get_keys(texts)
for language in [
custom_language, self.__language, self.__default_language
] + list(self.__sentences.keys()):
if language not in done and language in self.__sentences:
key:str
done.append(language)
for key in keys:
if key in self.__sentences[language]:
return self.__sentences[language][key]
return self.get_texts(texts)[0]
def i18n(self:Self,
texts:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
language:Optional[str] = None
) -> str:
text:str|Sequence[str] = self.__get_sentence(texts, language)
return self.string_variables("".join(text) if isinstance(text, (list, tuple)) else str(text), inputs)
def __terminal(self:Self) -> None:
while self.__working:
try:
command:str = input()
if command in ("exit", "close", "quit", "stop", "bye"):
self.close()
else:
print(self.i18n("unknown_command", {"command" : command}))
except Exception as exception:
print(exception)
def __run_web_socket_server(self:Self) -> None:
with web_socket_server_serve(self.__web_socket_handler, self.__web_socket_host, self.__web_socket_port) as self.__web_socket_server:
self.__web_socket_server.serve_forever()
def __web_socket_handler(self:Self, client:WebSocketClient) -> None:
if client not in self.__web_sockets_clients:
self.__web_sockets_clients.append(client)
try:
for message in client:
try:
print(message)
except Exception as exception:
print(exception)
except Exception as exception:
print(exception)
self.__web_sockets_clients.remove(client)
print("Client disconnected")
def __run_http_server(self:Self) -> None:
self.__http_server = HTTPServer((self.__http_host, self.__http_port), self.HTTPRequestHandler)
self.__http_server.aichat = self
self.__http_server.serve_forever()
def get_http_indexes(self:Self) -> list[str]:
return [*self.__http_indexes]
def get_http_directories(self:Self) -> list[str]:
return [*self.__http_directories]
class HTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self:Self) -> None:
aichat:AIChat = self.server.aichat
data:bytes|None = None
directory:str
for directory in aichat.get_http_directories():
path:str = directory + self.path
index:str
for index in aichat.get_http_indexes():
if (data := aichat.load_file(path + ("/" + index if index else ""), "rb")) is not None:
break
if data is None:
self.send_response(404)
self.end_headers()
else:
mime:str = (read_mime_types(path) or ("application/octet-stream",))[0]
self.send_response(200)
self.send_header("Content-Type", mime)
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def do_POST(self:Self) -> None:
path:str = self.path
if path == "/api/send":
pass
else:
self.send_response(404)
self.end_headers()
def get_titles(self:Self, message:str) -> list[str]:
try:
response:Response
with Post('http://localhost:11434/api/chat', json = {**self.PAYLOAD, "prompt" : message}) as response:
with Post(self.string_variables(self.__api_url, {
"host" : self.__host,
"port" : self.__port
}), json = {
"model" : self.__titles_model,
"prompt" : self.string_variables(self.__titles_prompt, {"message" : message}),
"format" : "json",
"stream" : False,
"options" : {
"temperature" : self.__titles_temperature
}
}) as response:
if response.status_code == 200:
return response.json().get("titles")
except Exception as exception:
pass
return []
line:bytes
def send(self:Self, message:str, client:WebSocketClient) -> str:
titles:list[str] = self.get_titles(message)
try:
response:Response
with Post(self.string_variables(self.__api_url, {
"host" : self.__host,
"port" : self.__port
}), json = {
"model" : self.__titles_model,
"prompt" : self.string_variables(self.__response_with_titles, {
"message" : message,
"guides" : "\n\n".join("## " + title + "\n\n" + self.load_file(self.__index[title]) for title in titles)
}) if len(titles) else message,
"stream" : True,
"options" : {
"temperature" : self.__titles_temperature
}
}, stream = True) as response:
line:str
for line in response.iter_lines():
if line:
chunk:dict[str, Any|None] = json_decode(line)
print(chunk.get("response", ""), end = "", flush = True)
if chunk.get("done"):
break
client.send({"chunk" : json_decode(line)})
except Exception as exception:
print(f"An error occurred while sending the message: {exception}")
pass
return []
def __listener(self:Self) -> None:
while self.__working:
@classmethod
def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]:
user_input:str = input('> ')
keys:list[str] = []
item:Any|None
if user_input in ("close", "exit", "quit"):
self.__working = False
else:
self.send(user_input)
for item in items:
if isinstance(item, str):
item not in keys and cls.RE_KEY.match(item) and keys.append(item)
elif isinstance(item, list):
ai_chat = AIChat()
key:str
for key in cls.get_keys(*item):
key not in keys and keys.append(key)
return keys
@classmethod
def get_texts(cls:type[Self], *items:list[Any|None]) -> list[str]:
texts:list[str] = []
item:Any|None
for item in items:
if isinstance(item, str):
texts.append(item)
elif isinstance(item, (list, tuple)):
text:str
for text in cls.get_texts(*item):
texts.append(text)
return texts
@classmethod
def get_dictionaries(cls:type[Self], *items:list[Any|None]) -> list[dict[str, Any|None]]:
dictionaries:list[dict[str, Any|None]] = []
item:Any|None
for item in items:
if isinstance(item, dict):
dictionaries.append(item)
elif isinstance(item, (list, tuple)):
dictionary:dict[str, Any|None]
for dictionary in cls.get_dictionaries(*item):
dictionaries.append(dictionary)
return dictionaries
@classmethod
def get_dictionary(cls:type[Self], inputs:Any|None, overwrite:bool = False) -> dict[str, Any|None]:
dictionary:dict[str, Any|None] = {}
if isinstance(inputs, dict):
key:str
value:Any|None
for key, value in inputs.items():
if overwrite or key not in dictionary:
dictionary[key] = value
elif isinstance(inputs, (list, tuple)):
subinputs:dict[str, Any|None]
for subinputs in inputs:
key:str
value:Any|None
for key, value in cls.get_dictionaries(subinputs).items():
if overwrite or key not in dictionary:
dictionary[key] = value
return dictionary
@classmethod
def get_value(cls:type[Self],
keys:str|Sequence[str],
inputs:dict[str, Any|None]|Sequence[Any|None],
default:Optional[Any|None] = None
) -> Any|None:
if len(keys := cls.get_keys(keys)):
subinputs:dict[str, Any|None]
for subinputs in cls.get_dictionaries(inputs):
key:str
for key in keys:
if key in subinputs:
return subinputs[key]
return default
@classmethod
def string_variables(cls:type[Self],
string:str,
inputs:dict[str, Any|None]|Sequence[Any|None],
default:Optional[str] = None
) -> str:
inputs = cls.get_dictionary(inputs)
def replace(matches:REMatch) -> str:
key:str = matches.group(1)
return (
str(inputs[key]) if key in inputs else
str(default) if default is not None else
matches.groups(0))
return cls.RE_STRING_VARIABLES.sub(replace, string)
@classmethod
def fix_path(cls:type[Self], path:str) -> str:
path = cls.RE_SLASHES.sub(cls.SLASH, path)
# return "\\" + path if path[0] == "\\" else path
return path
@classmethod
def get_absolute_path(cls:type[Self], path:str) -> str|None:
root:str
for root in cls.ROOTS_PATH:
absolute_path:str = cls.fix_path((root + cls.SLASH if root else "") + path)
if path_exists(absolute_path):
return absolute_path
return None
@classmethod
def load_file(cls:type[Self], path:str, mode:str = "r") -> str|bytes|None:
try:
file:FileIO
if mode == "r":
for format in ("utf8", "cp1252", "cp850"):
try:
with open(cls.get_absolute_path(path), mode, encoding = format) as file:
return file.read()
except:
pass
elif mode == "rb":
with open(cls.get_absolute_path(path), mode) as file:
return file.read()
except Exception as exception:
# print(exception)
pass
return None
aichat:AIChat = AIChat()

View File

@ -0,0 +1,6 @@
{
"autostart" : true,
"default_settings_files" : "/JSON/AIChat.settings.json",
"ai_model" : "gemma",
"ai_stream" : true
}

420
Public/ecma/AIChat.ecma.js Normal file
View File

@ -0,0 +1,420 @@
"use strict";
/**
* @class AIChat
* @constructor
* @param {?(Object.<string, any|null>|Array.<any|null>)} [inputs = nulls]
* @returns {void}
* @access public
* @static
*/
export const AIChat = (function(){
/**
* @callback aichat_init_callback
* @param {!boolean} ok
* @returns {boolean}
*/
/**
* @callback aichat_preload_callback
* @param {?HTMLElement} item
* @param {!boolean} asynchronous
* @param {!boolean} ok
* @returns {void}
*/
/**
* @typedef {HTMLElement|string|[string, Object.<string, any|null>|null, HTMLElement|string|Array.<aichat_html_item>|null]} aichat_html_item
*/
/**
* @constructs AIChat
* @param {?(Object.<string, any|null>|Array.<any|null>)} [inputs = nulls]
* @returns {void}
* @access private
* @static
*/
const AIChat = function(inputs = null){
/** @type {AIChat} */
const self = this,
/** @type {Object.<string, any|null>} */
settings = {},
/** @type {Object.<string, any|null>} */
secrets = {};
/** @type {boolean} */
let started = false,
/** @type {number} */
preload_timeout = 2000,
/** @type {number} */
frames_per_second = 60,
/** @type {WebSocket|null} */
web_socket_client = null;
/**
* @returns {void}
* @access private
*/
const constructor = () => {
self.get("autostart") && self.start();
};
/**
* @param {?aichat_init_callaback} [callback = null]
* @returns {boolean}
* @access public
*/
this.start = (callback = null) => {
/** @type {aichat_init_callaback} */
const end = ok => typeof callback == "function" ? callback(ok) : ok;
if(started)
return end(false);
started = true;
preload_timeout = self.get(["preload_timeout", "timeout"], null, preload_timeout);
frames_per_second = self.get(["frames_per_second", "fps"], null, frames_per_second);
self.preload(self.get("position", null, "body"), (item, asynchronous, ok) => {
if(ok){
build(item);
try{
web_socket_client = new WebSocket(self.get("web_socket_url", null, "ws://localhost:18001/"));
web_socket_client.onopen = on_web_socket_open;
web_socket_client.onmessage = on_web_socket_message;
web_socket_client.onerror = on_web_socket_error;
web_socket_client.onclose = on_web_socket_close;
}catch(exception){
console.error("Failed to connect to the WebSocket server:", exception);
};
};
end(ok);
});
return true;
};
/**
* @param {?aichat_init_callaback} [callback = null]
* @returns {boolean}
* @access public
*/
this.stop = (callback = null) => {
/** @type {aichat_init_callaback} */
const end = ok => typeof callback == "function" ? callback(ok) : ok;
if(!started)
return end(false);
started = false;
return end(true);
};
const on_web_socket_open = event => {
web_socket_client.send("Hello, WebSocket server!");
};
const on_web_socket_message = event => {};
const on_web_socket_error = event => {};
const on_web_socket_close = event => {};
/**
* @param {!(string|Array.<string>)} keys
* @param {!(Object.<string, any|null>|Array.<any|null>)} inputs
* @param {?any} [_default = null]
* @returns {any|null}
* @access public
*/
this.get = (keys, subinputs, _default = null) => AIChat.get_value(keys, [
subinputs, inputs, secrets, settings, AIChat.DEFAULT_SETTINGS
], _default);
/**
* @param {!(string|HTMLElement)} selector
* @param {!aichat_preload_callback} callback
* @returns {void}
* @access public
*/
this.preload = (selector, callback) => {
if(typeof callback != "function")
return;
if(AIChat.is_html_item(selector))
return callback(selector, false, true);
if(typeof selector != "string")
return callback(null, false, false);
/** @type {HTMLElement|null} */
let item;
try{
if(item = document.querySelector(selector))
return callback(item, false, true);
}catch(exception){
return callback(null, false, false);
};
/** @type {number} */
const date = Date.now();
/** @type {number} */
let interval = setInterval(() => {
if(item = document.querySelector(selector)){
clearInterval(interval);
callback(item, true, true);
}else if(Date.now() - date > preload_timeout){
clearInterval(interval);
callback(null, true, false);
};
}, 1000 / frames_per_second);
};
/**
* @param {!HTMLElement} item
* @return {void}
* @access private
*/
const build = item => {
AIChat.HTML(item, ["div", {class: "aichat"}, [
["header", null, [
["h1", null, "AI Chat"]
]],
["main", null, [
["fieldset", {class : "chat"}, [
["legend", {data_i18n : "chat"}, "Chat"],
["section", {"class" : "messages"}],
["form", {
action : "#",
method : "post",
on_submit : send
}, [
["div", null, [
["textarea", {
name : "message",
data_i18n : "type_message",
data_i18n_without : true,
placeholder : "Type your message here..."
}]
]],
["button", {
type : "submit",
data_i18n : "send",
data_i18n_without : true,
title : "Send"
}, [
["span", {data_icon : "send"}],
["span", {data_i18n : "send"}, "Send"]
]],
["button", {
type : "reset",
data_i18n : "clean",
data_i18n_without : true,
title : "Clean"
}, [
["span", {data_icon : "clean"}],
["span", {data_i18n : "clean"}, "Clean"]
]]
]]
]]
]],
["footer"]
]]);
};
/**
* @param {!HTMLElement} item
* @param {!Event} event
* @returns {any|null|void}
* @access private
*/
const send = (item, event) => {
event.preventDefault();
web_socket_client.send(document.querySelector(".aichat form textarea").value);
return false;
};
constructor();
};
/** @type {Object.<string, any|null>} */
AIChat.DEFAULT_SETTINGS = {
/** @type {boolean} */
autostart : true,
/** @type {number} */
timeout : 2000,
/** @type {number} */
preload_timeout : 2000,
/** @type {number} */
frames_per_second : 60,
/** @type {string} */
position : "body"
};
/**
* @param {?any} item
* @returns {boolean}
* @access public
* @static
*/
AIChat.is_key = item => typeof item == "string" && /^[a-z_][a-z0-9_]*$/i.test(item);
/**
* @param {?any} item
* @returns {boolean}
* @access public
* @static
*/
AIChat.is_dictionary = item => item && item.constructor === Object;
/**
* @param {?any} item
* @returns {boolean}
* @access public
* @static
*/
AIChat.is_html_item = item => item && (item.tagName || item.nodeName);
/**
* @param {...?any} items
* @returns {Array.<string>}
* @access public
* @static
*/
AIChat.get_keys = (...items) => {
/** @type {Array.<string>} */
const keys = [];
items.forEach(item => {
if(AIChat.is_key(item))
keys.includes(item) || keys.push(item);
else if(Array.isArray(items))
AIChat.get_keys(...item).forEach(key => keys.includes(key) || keys.push(key));
});
return keys;
};
/**
* @param {...?any} items
* @returns {Array.<Object.<string, any|null>>}
* @access public
* @static
*/
AIChat.get_dictionaries = (...items) => {
/** @type {Array.<Object.<string, any|null>>} */
const dictionaries = [];
items.forEach(item => {
if(AIChat.is_dictionary(item))
dictionaries.includes(item) || dictionaries.push(item);
else if(item instanceof Array)
AIChat.get_dictionaries(...item).forEach(dictionary => {
dictionaries.includes(dictionary) || dictionaries.push(dictionary);
});
});
return dictionaries;
};
/**
* @param {!(string|Array.<string>)} keys
* @param {!(Object.<string, any|null>|Array.<any|null>)} inputs
* @param {?any} [_default = null]
* @returns {any|null}
* @access public
* @static
*/
AIChat.get_value = (keys, inputs, _default = null) => {
if((keys = AIChat.get_keys(keys)).length)
for(const subinputs of AIChat.get_dictionaries(inputs))
for(const key of keys)
if(key in subinputs)
return subinputs[key];
return _default;
};
/**
* @param {!(HTMLElement|string)} item
* @param {!Object.<string, any|null>} attributes
* @returns {void}
* @access public
* @static
*/
AIChat.set_attributes = (item, attributes) => {
typeof item == "string" && (item = document.querySelector(item));
if(AIChat.is_html_item(item))
for(const key in attributes){
/** @type {any|null} */
const value = attributes[key];
if(/^on/i.test(key) && typeof value == "function")
item.addEventListener(key.replace(/^on[-_]?|-/ig, ""), event => value(item, event));
else
item.setAttribute(key.replace(/[^a-z0-9]+/ig, "-"), value);
};
};
/**
* @param {...aichat_html_item} items
* @returns {Array.<HTMLElement>}
* @access public
* @static
*/
AIChat.HTML = (...items) => {
/** @type {boolean} */
const has_html_item = AIChat.is_html_item(typeof items[0] == "string" ? items[0] = document.querySelector(items[0]) || items[0] : items[0]),
/** @type {DocumentFragment} */
fragment = document.createDocumentFragment();
(has_html_item ? items.slice(1) : items).forEach(item => {
if(AIChat.is_html_item(item))
fragment.appendChild(item);
else if(typeof item == "string")
fragment.appendChild(document.createTextNode(item));
else if(Array.isArray(item)){
/** @type {[string, Object.<string, any|null>|null, HTMLElement|string|Array.<aichat_html_item>|null]} */
const [tag, attributes, children] = item.concat(null, null),
/** @type {!HTMLElement} */
child = fragment.appendChild(document.createElement(tag));
AIChat.is_dictionary(attributes) && AIChat.set_attributes(child, attributes);
if(AIChat.is_html_item(children))
child.appendChild(children);
else if(typeof children == "string")
child.appendChild(document.createTextNode(children));
else if(Array.isArray(children))
AIChat.HTML(child, ...children);
};
});
has_html_item && items[0].appendChild(fragment);
return fragment.childNodes;
};
return AIChat;
})();

24
Public/index.html Normal file
View File

@ -0,0 +1,24 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>AIChat</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta charset="UTF-8" />
<link type="text/css;charset=utf-8" data-language="SASS/CSS3" rel="stylesheet" href="./scss/AIChat.css" data-scss="./scss/AIChat.scss" data-css-map="./scss/AIChat.css.map" data-crossorigin="anonymous" charset="utf-8" />
<script type="module" data-type="text/javascript;charset=utf-8" data-language="ECMAScript 2015" src="./ecma/AIChat.ecma.js" data-crossorigin="anonymous" charset="utf-8"></script>
<script type="module" data-type="text/javascript;charset=utf-8" data-language="ECMAScript 2015" charset="utf-8">
"use strict";
import {AIChat} from "./ecma/AIChat.ecma.js";
/** @type {AIChat} */
const aichat = new AIChat();
</script>
</head>
<body></body>
</html>

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Sequence, Optional, Callable
from threading import Thread
from requests import post as Post, Response
from json import loads as json_decode
from re import compile as re_compile, Pattern as REPattern, IGNORECASE as RE_IGNORECASE
from websockets.sync import server as WebSocketServer
from Utils.Utils import Utils
from Utils.Check import Check
from Drivers.FilesDriver import FilesDriver
class PayloadModel:
def __init__(self:Self) -> None:
self.model:str = "gemma"
self.stream:bool = True
class AIChat:
DEFAULT_SETTINGS:dict[str, Any|None] = {
"autostart" : True,
"ai_model" : "gemma",
"ai_stream" : True
}
def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs, overwrite = True)
self.__working:bool = False
self.__started:bool = False
self.__thread:Thread|None = None
self.__payload:PayloadModel = PayloadModel()
self.files:FilesDriver = FilesDriver(self)
def start(self:Self, callback:Optional[Callable[[bool], bool]] = None) -> bool:
end:Callable[[bool], bool] = lambda ok: callback(ok) if callable(callback) else ok
if self.__started:
return end(False)
self.__started = True
self.__working = True
self.__payload.model = self.get("ai_model", self.__payload.model)
self.__thread = Thread(target = self.__listener)
self.__thread.start()
return end(True)
def close(self:Self, callback:Optional[Callable[[bool], bool]] = None) -> bool:
end:Callable[[bool], bool] = lambda ok: callback(ok) if callable(callback) else ok
if not self.__started:
return end(False)
self.__started = False
self.__working = False
return end(True)
def load_json(self:Self, data:Any|None, only_dictionary:bool = True) -> list[list[Any|None]|dict[str, Any|None]]:
items:list[list[Any|None]|dict[str, Any|None]] = []
if Check.is_string(data):
items.extends(self.files.load_json(data))
elif Utils.is_array(data):
if only_dictionary:
item:Any|None
for item in data:
items.extend(self.load_json(item, only_dictionary))
else:
items.append(data)
elif Utils.is_dictionary(data):
items.append(data)
return items
def send(self:Self, message:str) -> str:
try:
response:Response
with Post('http://localhost:11434/api/chat', json = {**self.PAYLOAD, "prompt" : message}) as response:
line:bytes
for line in response.iter_lines():
if line:
chunk:dict[str, Any|None] = json_decode(line)
print(chunk.get("response", ""), end = "", flush = True)
if chunk.get("done"):
break
except Exception as exception:
print(f"An error occurred while sending the message: {exception}")
def __listener(self:Self) -> None:
while self.__working:
user_input:str = input('> ')
if user_input in ("close", "exit", "quit"):
self.__working = False
else:
self.send(user_input)
ai_chat = AIChat()

View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Optional, Sequence
from os.path import exists as path_exists, isfile as is_file, isdir as is_directory, dirname as directory_name, abspath as absolute_path
from io import FileIO
from json import loads as json_decode
from Interfaces.Application.AIChatInterface import AIChatInterface
from Utils.Patterns import RE
class FilesDriver:
ROOT:str = directory_name(absolute_path(__file__))
SLASH:str = "/" if "/" in ROOT else "\\\\"
def __init__(self:Self, aichat:AIChatInterface) -> None:
self.aichat:AIChatInterface = aichat
self.__root_paths:list[str] = ["", self.ROOT]
for _ in range(2):
self.__root_paths.append(RE.PARENT_DIRECTORY.sub(r'\1', self.__root_paths[-1]))
@classmethod
def fix_path(cls:type[Self], path:str) -> str:
return RE.SLASHES.sub(cls.SLASH, path)
def get_absolute_path(self:Self, path:str) -> str|None:
root:str
for root in self.__root_paths:
absolute:str = self.fix_path((root + self.slash if root else "") + path)
if path_exists(absolute):
return absolute
return None
def exists(self:Self, path:str) -> bool:
return self.get_absolute_path(path) is not None
def is_file(self:Self, path:str) -> bool:
absolute:str = self.get_absolute_path(path)
return is_file(absolute) if absolute else False
def is_directory(self:Self, path:str) -> bool:
absolute:str = self.get_absolute_path(path)
return is_directory(absolute) if absolute else False
def load(self:Self, path:str, mode:str = "r") -> str|bytes|None:
absolute:str = self.get_absolute_path(path)
if absolute and is_file(absolute):
file:FileIO
try:
with open(absolute, mode) as file:
return file.read()
except Exception as exception:
pass
return None
def load_json(self:Self, path:str) -> str|bytes|None:
json:Sequence[Any]|dict[str, Any|None]|None
try:
json = json_decode(path)
if json != None:
return json
except Exception as exception:
pass
data:str|None = self.load(path)
if data:
try:
return json_decode(data)
except Exception as exception:
pass
return None
def save(self:Self, path:str, content:str|bytes, mode:str = "w") -> bool:
try:
file:FileIO
with open(self.fix_path(path), mode) as file:
file.write(content)
return True
except Exception as exception:
pass
return False

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Optional, Sequence
from abc import ABC, abstractmethod
from Interfaces.Drivers.FilesDriverInterface import FilesDriverInterface
from Interfaces.Managers.SettingsManagerInterface import SettingsManagerInterface
class AIChatInterface(ABC):
def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
self.files:FilesDriverInterface = None
self.settings:SettingsManagerInterface = None

View File

@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self
from abc import ABC, abstractmethod
class FilesDriverInterface(ABC):
@abstractmethod
def get_absolute_path(self:Self, path:str) -> str|None:pass
@abstractmethod
def exists(self:Self, path:str) -> bool:pass
@abstractmethod
def is_file(self:Self, path:str) -> bool:pass
@abstractmethod
def is_directory(self:Self, path:str) -> bool:pass
@abstractmethod
def load(self:Self, path:str, mode:str = "r") -> str|bytes|None:pass
@abstractmethod
def load_json(self:Self, path:str) -> str|bytes|None:pass
@abstractmethod
def save(self:Self, path:str, content:str|bytes, mode:str = "w") -> bool:pass

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Optional, Sequence
from abc import ABC, abstractmethod
class SettingsManagerInterface(ABC):
@abstractmethod
def get(self:Self,
keys:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
default:Optional[Any] = None
) -> Any|None:pass
@abstractmethod
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass
@abstractmethod
def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None:pass

View File

@ -0,0 +1,71 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Optional, Sequence
from Utils.Utils import Utils
from Interfaces.Application.AIChatInterface import AIChatInterface
class SettingsManager:
DEFAULT_SENTENCES:dict[str, dict[str, str]] = {
"english" : {}
}
def __init__(self:Self, aichat:AIChatInterface) -> None:
self.aichat:AIChatInterface = aichat
self.__sentences:dict[str, dict[str, str|list[str]]] = {}
self.__default_language:str = "english"
self.__language:str = "english"
for key in (
"default_i18n_files", "i18n_files",
"default_i18n", "i18n"
):
self.add(self.get(key), overwrite = True)
def __get(self:Self, texts:str|Sequence[str], language:Optional[str] = None) -> Any|None:
keys:list[str] = Utils.get_keys(texts)
if len(keys):
language:str
done:list[str] = []
for language in (self.__language, self.__default_language) + tuple(Utils.get_keys(self.__sentences)):
if language in self.__sentences and language not in done:
key:str
done.append(language)
for key in keys:
if key in self.__sentences[language]:
return self.__sentences[language][key]
return Utils.get_texts(texts)[0]
def get(self:Self,
texts:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
language:Optional[str] = None
) -> Any|None:
text:str|list[str]|None = self.__get(texts, language)
return Utils.string_variables((
"".join(text) if Utils.is_array(text) else
text), inputs)
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
block:dict[str, Any|None]
for block in Utils.get_dictionaries(inputs):
language:str
sentences:dict[str, Any|None]
for key, value in block.items():
if overwrite or key not in self.__sentences:
self.__i18n[key] = value

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Optional, Sequence
from Utils.Utils import Utils
from Interfaces.Application.AIChatInterface import AIChatInterface
class SettingsManager:
DEFAULT_SETTINGS:dict[str, Any|None] = {
"autostart" : True,
"default_settings_files" : "/JSON/AIChat.settings.json"
}
def __init__(self:Self,
aichat:AIChatInterface,
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None
) -> None:
self.aichat:AIChatInterface = aichat
self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs)
self.__settings:dict[str, Any|None] = {}
self.__secrets:dict[str, Any|None] = {}
for key in (
"default_settings_files", "settings_files",
"default_settings", "settings"
):
self.add(self.get(key), overwrite = True)
for key in (
"default_secrets_files", "secrets_files",
"default_secrets", "secrets"
):
self.add_secrets(self.get(key), overwrite = True)
def get(self:Self,
keys:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
default:Optional[Any] = None
) -> Any|None:
return Utils.get_values(keys, (
inputs, self.__inputs, self.__secrets, self.__settings, self.aichat.DEFAULT_SETTINGS
), default)
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
block:dict[str, Any|None]
for block in Utils.get_dictionaries(inputs):
key:str
value:Any|None
for key, value in block.items():
if overwrite or key not in self.__settings:
self.__settings[key] = value
def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
block:dict[str, Any|None]
for block in Utils.get_dictionaries(inputs):
key:str
value:Any|None
for key, value in block.items():
if overwrite or key not in self.__secrets:
self.__secrets[key] = value

23
Python/Utils/Check.py Normal file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any, Self
from Utils.Patterns import RE
class Check:
@staticmethod
def is_string(item:Any|None) -> bool:
return isinstance(item, str)
@classmethod
def is_key(cls:type[Self], item:Any|None) -> bool:
return cls.is_string(item) and RE.KEY.match(item)
@staticmethod
def is_dictionary(item:Any|None) -> bool:
return isinstance(item, dict)
@staticmethod
def is_array(item:Any|None) -> bool:
return isinstance(item, (list, tuple))

11
Python/Utils/Patterns.py Normal file
View File

@ -0,0 +1,11 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from re import compile as re_compile, Pattern as REPattern, IGNORECASE as RE_IGNORECASE
class RE:
KEY:REPattern = re_compile(r"^[a-z_][a-z0-9_]*$", RE_IGNORECASE)
PARENT_DIRECTORY:REPattern = re_compile(r'^(.+)[\/\\][^\/\\]+[\/\\]*$')
SLASHES:REPattern = re_compile(r'[\/\\]+')
STRING_VARIABLES:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}', RE_IGNORECASE)

121
Python/Utils/Utils.py Normal file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Sequence, Optional
from Utils.Check import Check
from Utils.Patterns import RE
class Utils:
@classmethod
def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]:
keys:list[str] = []
item:Any|None
for item in items:
if Check.is_key(item):
if item not in keys:
keys.append(item)
elif Check.is_array(item):
key:str
for key in cls.get_keys(*item):
if key not in keys:
keys.append(key)
return keys
@classmethod
def get_dictionaries(cls:type[Self], *items:list[Any|None]) -> list[dict[str, Any|None]]:
dictionaries:list[dict[str, Any|None]] = []
item:Any|None
for item in items:
if Check.is_dictionary:
dictionaries.append(item)
elif Check.is_array(item):
dictionary:dict[str, Any|None]
for dictionary in cls.get_dictionaries(*item):
dictionaries.append(dictionary)
return dictionaries
@classmethod
def get_dictionary(cls:type[Self], inputs:Any|None, overwrite:bool = False) -> dict[str, Any|None]:
dictionary:dict[str, Any|None] = {}
if Check.is_dictionary(inputs):
dictionary = inputs
elif Check.is_array(inputs):
key:str
value:Any|None
for key, value in cls.get_dictionary(inputs, overwrite).items():
if overwrite or key not in dictionary:
dictionary[key] = value
return dictionary
@classmethod
def get_values(cls:type[Self],
keys:str|Sequence[str],
inputs:dict[str, Any|None]|Sequence[Any|None],
default:Optional[Any] = None
) -> Any|None:
if len(keys := cls.get_keys(keys)):
dictionary:dict[str, Any|None]
for dictionary in cls.get_dictionaries(inputs):
key:str
for key in keys:
if key in dictionary:
return dictionary[key]
return default
@classmethod
def get_texts(cls:type[Self], *items:list[Any|None]) -> list[str]:
texts:list[str] = []
item:Any|None
for item in items:
if Check.is_string(item):
texts.append(item)
elif Check.is_array(item):
text:str
for text in cls.get_texts(*item):
texts.append(text)
return texts
@classmethod
def string_variables(cls:type[Self],
string:str,
inputs:dict[str, Any|None]|Sequence[Any|None],
default:Optional[str|None] = None
) -> str:
variables:dict[str, Any|None] = Utils.get_dictionary(inputs)
def replace(match:Any) -> str:
key:str = match.group(1)
return str(
variables[key] if key in variables and variables[key] is not None else
default if default is not None else
match.group(0))
return RE.STRING_VARIABLES.sub(replace, string)

View File

@ -15,6 +15,12 @@ docker exec -it ollama ollama run gemma
```
Socket Python:
- https://pypi.org/project/websockets/
- https://github.com/python-websockets/websockets/
- https://github.com/python-websockets/websockets/tree/main/src/websockets/sync
Modelos:
- gemma - 5GB

View File

@ -0,0 +1,7 @@
# Prompt del usuario
{message}
# Guías y manuales
{guides}

View File

@ -0,0 +1,21 @@
Eres un clasificador estricto de dudas técnicas.
Tu objetivo es identificar si la consulta del usuario se puede resolver con alguno de los siguientes manuales.
MANUALES DISPONIBLES:
{{titles}}
REGLAS CRÍTICAS:
1. Si la consulta es un saludo, despedida, agradecimiento o charla trivial, devuelve {{"titles": []}}.
2. Si la consulta NO tiene relación directa con los manuales, devuelve {{"titles": []}}.
3. No intentes ser amable ni ayudar, solo clasifica.
EJEMPLOS:
- Consulta: "Hola, buenas tardes" -> Respuesta: {{"titles": []}}
- Consulta: "Gracias por la ayuda" -> Respuesta: {{"titles": []}}
- Consulta: "¿Cómo instalo la impresora?" -> Respuesta: {{"titles": []}} (Si no hay manual de impresora)
- Consulta: "quiero poner el cividas" -> Respuesta: {{"titles": ["Instalar Cividas y tamaño de texto"]}}
CONSULTA ACTUAL DEL USUARIO:
"{message}"
Responde ÚNICAMENTE en formato JSON: