709 lines
26 KiB
Python
709 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Self, Any, Optional, Sequence, Callable
|
|
from requests import post as Post, Response
|
|
from os.path import dirname as directory_name, abspath as absolute_path, exists as path_exists
|
|
from io import FileIO
|
|
from re import compile as re_compile, Pattern as REPattern, Match as REMatch, IGNORECASE as RE_IGNORECASE
|
|
from threading import Thread
|
|
from json import loads as json_decode, dumps as json_encode
|
|
from time import sleep
|
|
from http import server as http_server
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from mimetypes import guess_type as read_mime_types
|
|
from websockets.sync.server import serve as web_socket_server_serve
|
|
from websockets import ServerConnection as WebSocketServer, ClientConnection as WebSocketClient
|
|
|
|
class AIChat:
|
|
|
|
ROOT_PATH:str = directory_name(absolute_path(__file__))
|
|
SLASH:str = "/" if "/" in ROOT_PATH else "\\\\"
|
|
ROOTS_PATH:list[str] = ["", ROOT_PATH, ROOT_PATH + "/..", ROOT_PATH + "/../.."]
|
|
DEFAULT_SETTINGS:dict[str, Any|None] = {
|
|
"default_settings_files" : "/JSON/AIChat.settings.json",
|
|
"titles_model" : "gemma3:1b",
|
|
"titles_temperature" : 0.0,
|
|
"titles_prompt_file" : "/TXT/AIChat.titles-prompt.md",
|
|
"responses_model" : "gemma3:1b",
|
|
"responses_temperature" : 7.0,
|
|
"response_with_titles_prompt_file" : "/TXT/AIChat.response-with-titles.md",
|
|
}
|
|
DEFAULT_SENTENCES:dict[str, dict[str, str|Sequence[str]]] = {
|
|
"espanol" : {}
|
|
}
|
|
|
|
RE_KEY:REPattern = re_compile(r"^[a-z_][a-z0-9_]*$", RE_IGNORECASE)
|
|
RE_STRING_VARIABLES:REPattern = re_compile(r"\{([a-z_][a-z0-9_]*)\}", RE_IGNORECASE)
|
|
RE_SLASHES:REPattern = re_compile(r"[\\\/]+")
|
|
RE_NEW_LINES:REPattern = re_compile(r"\r\n|\r|\n")
|
|
|
|
def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
|
|
|
|
self.__working:bool = True
|
|
self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs)
|
|
self.__settings:dict[str, Any|None] = {}
|
|
self.__secrets:dict[str, Any|None] = {}
|
|
self.__sentences:dict[str, dict[str, str|Sequence[str]]] = {language : {
|
|
key : value for key, value in sentences.items()
|
|
} for language, sentences in self.DEFAULT_SENTENCES.items()}
|
|
|
|
self.__default_language:str = self.get("default_language", inputs, "espanol")
|
|
self.__language:str = self.get("language", inputs, self.__default_language)
|
|
|
|
self.__host:str = "localhost"
|
|
self.__port:int = 11434
|
|
self.__api_url:str = "http://{host}:{port}/api/generate"
|
|
|
|
self.__index:dict[str, str] = {}
|
|
self.__requests_i:int = 0
|
|
self.__requests_order:list[int] = []
|
|
self.__requests:dict[int, dict[str, Any|None]] = {}
|
|
self.__web_socket_server:WebSocketServer
|
|
self.__web_socket_host:str = "localhost"
|
|
self.__web_socket_port:int = 18001
|
|
self.__web_socket_server_thread:Thread = Thread(target = self.__run_web_socket_server)
|
|
self.__requests_thread:Thread = Thread(target = self.__run_orders)
|
|
self.__requests_sleep:float = 0.1
|
|
|
|
self.__titles_model:str = "gemma3:1b"
|
|
self.__titles_temperature:float = 0.0
|
|
self.__titles_prompt:str = self.RE_NEW_LINES.sub("\\n", self.load_file("/TXT/AIChat.titles-prompt.md"))
|
|
self.__response_model:str = "gemma3:1b"
|
|
self.__response_temperature:float = 7.0
|
|
self.__response_with_titles:str = self.RE_NEW_LINES.sub("\\n", self.load_file("/TXT/AIChat.response-with-titles.md"))
|
|
|
|
self.__http_server:HTTPServer
|
|
self.__http_server_thread:Thread = Thread(target = self.__run_http_server)
|
|
self.__http_host:str = ""
|
|
self.__http_port:int = 18000
|
|
self.__http_directories:list[str] = ["/Public"]
|
|
self.__http_indexes:list[str] = ["", "index.html", "index.htm", "index.md", "index.txt"]
|
|
self.__terminal_thread:Thread = Thread(target = self.__terminal)
|
|
self.__web_sockets_clients:list[WebSocketClient] = []
|
|
|
|
self.update()
|
|
|
|
self.__terminal_thread.start()
|
|
self.__http_server_thread.start()
|
|
self.__web_socket_server_thread.start()
|
|
self.__requests_thread.start()
|
|
|
|
def close(self:Self) -> None:
|
|
|
|
client:WebSocketClient
|
|
|
|
self.__working = False
|
|
|
|
for client in self.__web_sockets_clients:
|
|
try:
|
|
client.close()
|
|
except Exception as exception:
|
|
pass
|
|
self.__web_socket_server.shutdown()
|
|
self.__web_socket_server_thread.join()
|
|
|
|
self.__http_server.shutdown()
|
|
|
|
def update(self:Self) -> None:
|
|
|
|
titles_promp_file:str
|
|
response_with_titles_prompt_file:str
|
|
key:str
|
|
|
|
for key in ("default_settings_files", "settings_files", "default_settings", "settings"):
|
|
self.add_settings(self.get(key), True)
|
|
for key in ("default_secrets_files", "secrets_files", "default_secrets", "secrets"):
|
|
self.add_secrets(self.get(key), True)
|
|
for key in ("default_i18n_files", "i18n_files", "default_i18n", "i18n"):
|
|
self.add_i18n(self.get(key), True)
|
|
for key in (
|
|
"default_index_files", "index_files", "default_index", "index",
|
|
"default_secrets_index_files", "secrets_index_files", "default_secrets_index", "secrets_index"
|
|
):
|
|
self.add_index(self.get(key), True)
|
|
|
|
titles_promp_file = self.get("titles_prompt_file")
|
|
response_with_titles_prompt_file = self.get("response_with_titles_prompt_file")
|
|
|
|
self.__default_language = self.get("default_language", None, self.__default_language)
|
|
self.__language = self.get("language", None, self.__language)
|
|
|
|
self.__host = self.get("host", None, self.__host)
|
|
self.__port = self.get("port", None, self.__port)
|
|
self.__api_url = self.get("api_url", None, self.__api_url)
|
|
|
|
self.__web_socket_host = self.get("web_socket_host", None, self.__web_socket_host)
|
|
self.__web_socket_port = self.get("web_socket_port", None, self.__web_socket_port)
|
|
self.__requests_sleep = self.get("requests_sleep", None, self.__requests_sleep)
|
|
|
|
self.__titles_model = self.get("titles_model", None, self.__http_host)
|
|
self.__titles_temperature = self.get("titles_temperature", None, self.__http_host)
|
|
if titles_promp_file is not None:
|
|
self.__titles_prompt = self.RE_NEW_LINES.sub("\\n", self.load_file(titles_promp_file))
|
|
self.__response_model = self.get("responses_model", None, self.__http_host)
|
|
self.__response_temperature = self.get("responses_temperature", None, self.__http_host)
|
|
if response_with_titles_prompt_file is not None:
|
|
self.__response_with_titles = self.RE_NEW_LINES.sub("\\n", self.load_file(response_with_titles_prompt_file))
|
|
|
|
self.__http_host = self.get("http_host", None, self.__http_host)
|
|
self.__http_port = self.get("http_port", None, self.__http_port)
|
|
self.__http_directories = self.get("http_indexes", None, self.__http_directories)
|
|
self.__http_indexes = self.get("http_indexes", None, self.__http_indexes)
|
|
|
|
def reset(self:Self) -> None:
|
|
self.__settings.clear()
|
|
self.__secrets.clear()
|
|
self.__sentences.clear()
|
|
self.__index.clear()
|
|
self.update()
|
|
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
|
|
def get(self:Self,
|
|
keys:str|Sequence[str],
|
|
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
|
|
default:Optional[Any|None] = None
|
|
) -> Any|None:
|
|
return self.get_value(keys, (
|
|
inputs, self.__inputs, self.__secrets, self.__settings, self.DEFAULT_SETTINGS
|
|
), default)
|
|
|
|
def __get_sentence(self:Self, texts:str|Sequence[str], custom_language:Optional[str] = None) -> str|Sequence[str]:
|
|
|
|
language:str
|
|
done:list[str] = []
|
|
keys:list[str] = self.get_keys(texts)
|
|
|
|
for language in [
|
|
custom_language, self.__language, self.__default_language
|
|
] + list(self.__sentences.keys()):
|
|
if language not in done and language in self.__sentences:
|
|
|
|
key:str
|
|
|
|
done.append(language)
|
|
|
|
for key in keys:
|
|
if key in self.__sentences[language]:
|
|
return self.__sentences[language][key]
|
|
return self.get_texts(texts)[0]
|
|
|
|
def i18n(self:Self,
|
|
texts:str|Sequence[str],
|
|
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
|
|
language:Optional[str] = None
|
|
) -> str:
|
|
|
|
text:str|Sequence[str] = self.__get_sentence(texts, language)
|
|
|
|
return self.string_variables("".join(text) if isinstance(text, (list, tuple)) else str(text), inputs)
|
|
|
|
def add_settings(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
|
|
|
|
subinputs:dict[str, Any|None]
|
|
|
|
for subinputs in self.get_dictionaries(inputs, True):
|
|
|
|
key:str
|
|
value:Any|None
|
|
|
|
for key, value in subinputs.items():
|
|
if overwrite or key not in self.__settings:
|
|
self.__settings[key] = value
|
|
|
|
def add_secrets(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
|
|
|
|
subinputs:dict[str, Any|None]
|
|
|
|
for subinputs in self.get_dictionaries(inputs, True):
|
|
|
|
key:str
|
|
value:Any|None
|
|
|
|
for key, value in subinputs.items():
|
|
if overwrite or key not in self.__secrets:
|
|
self.__secrets[key] = value
|
|
|
|
def add_i18n(self:Self, inputs:dict[str, Any|None], overwrite:bool = False) -> None:
|
|
|
|
subinputs:dict[str, Any|None]
|
|
|
|
for subinputs in self.get_dictionaries(inputs, True):
|
|
|
|
language:str
|
|
sentences:dict[str, str|Sequence[str]]
|
|
|
|
for language, sentences in subinputs.items():
|
|
if language not in self.__sentences:
|
|
self.__sentences[language] = {}
|
|
if overwrite or language not in self.__sentences:
|
|
|
|
key:str
|
|
value:str|Sequence[str]
|
|
|
|
for key, value in sentences.items():
|
|
if overwrite or key not in self.__sentences[language]:
|
|
self.__sentences[language][key] = value
|
|
|
|
def add_index(self:Self, inputs:dict[str, str], overwrite:bool = False, subgroup:Optional[dict[str, str|dict]] = None) -> None:
|
|
|
|
group:dict[str, Any|None]
|
|
|
|
if subgroup is None:
|
|
subgroup = self.__index
|
|
|
|
for group in self.get_dictionaries(inputs, True):
|
|
|
|
key:str
|
|
value:str|dict
|
|
|
|
for key, value in group.items():
|
|
if isinstance(value, str):
|
|
if overwrite or key not in subgroup:
|
|
subgroup[key] = value
|
|
elif isinstance(value, dict):
|
|
if key not in subgroup:
|
|
subgroup[key] = {}
|
|
self.add_index(value, overwrite, subgroup[key])
|
|
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
|
|
def __terminal(self:Self) -> None:
|
|
while self.__working:
|
|
try:
|
|
|
|
command:str = input()
|
|
|
|
if command in ("exit", "close", "quit", "stop", "bye"):
|
|
self.close()
|
|
elif command in ("update",):
|
|
self.update()
|
|
elif command in ("reset",):
|
|
self.reset()
|
|
elif command in ("help", "h", "?"):
|
|
print(self.i18n("commands_help"))
|
|
else:
|
|
print(self.i18n("unknown_command", {"command" : command}))
|
|
|
|
except Exception as exception:
|
|
print(exception)
|
|
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
|
|
def __run_web_socket_server(self:Self) -> None:
|
|
with web_socket_server_serve(self.__web_socket_handler, self.__web_socket_host, self.__web_socket_port) as self.__web_socket_server:
|
|
self.__web_socket_server.serve_forever()
|
|
|
|
def __web_socket_handler(self:Self, client:WebSocketClient) -> None:
|
|
if client not in self.__web_sockets_clients:
|
|
self.__web_sockets_clients.append(client)
|
|
try:
|
|
for message in client:
|
|
try:
|
|
self.__requests_i += 1
|
|
|
|
i:int = self.__requests_i
|
|
new_order:dict[str, Any|None] = {
|
|
**json_decode(message),
|
|
"__client__" : client,
|
|
"__i__" : i
|
|
}
|
|
|
|
print(new_order)
|
|
|
|
if new_order["action"] == "cancel":
|
|
self.__remove_order(new_order["i"])
|
|
continue
|
|
|
|
i:int
|
|
order:dict[str, Any|None]
|
|
|
|
for i, order in self.__requests.items():
|
|
if new_order["action"] == order["action"] and order["__client__"] == client:
|
|
self.__remove_order(i)
|
|
|
|
self.__requests[i] = new_order
|
|
self.__requests_order.append(i)
|
|
|
|
except Exception as exception:
|
|
print(exception)
|
|
except Exception as exception:
|
|
print(exception)
|
|
self.__web_sockets_clients.remove(client)
|
|
print("Client disconnected")
|
|
|
|
def __run_orders(self:Self) -> None:
|
|
while self.__working:
|
|
if len(self.__requests_order):
|
|
|
|
order:dict[Any|None] = self.__requests[self.__requests_order[0]]
|
|
|
|
if order["action"] == "new_message":
|
|
Thread(target = lambda:self.send_to_ai(order["message"], lambda chunk:order["__client__"].send(json_encode({
|
|
"order" : order["__i__"],
|
|
"key" : order["key"],
|
|
"chunk" : json_decode(chunk)
|
|
})), lambda:self.__remove_order_and_continue(order["__i__"]), order)).start()
|
|
else:
|
|
Thread(target = lambda:self.__remove_order_and_continue(order["__i__"])).start()
|
|
break
|
|
|
|
sleep(0.1)
|
|
|
|
def __remove_order(self:Self, i:int) -> None:
|
|
print("PASA REMOVE")
|
|
if i in self.__requests:
|
|
del self.__requests[i]
|
|
if i in self.__requests_order:
|
|
self.__requests_order.remove(i)
|
|
|
|
def __remove_order_and_continue(self:Self, i:int) -> None:
|
|
print("PASA REMOVE AND CONTINUE")
|
|
self.__remove_order(i)
|
|
sleep(0.1)
|
|
self.__run_orders()
|
|
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
|
|
def __run_http_server(self:Self) -> None:
|
|
self.__http_server = HTTPServer((self.__http_host, self.__http_port), self.HTTPRequestHandler)
|
|
self.__http_server.aichat = self
|
|
self.__http_server.serve_forever()
|
|
|
|
def get_http_indexes(self:Self) -> list[str]:
|
|
return [*self.__http_indexes]
|
|
|
|
def get_http_directories(self:Self) -> list[str]:
|
|
return [*self.__http_directories]
|
|
|
|
class HTTPRequestHandler(BaseHTTPRequestHandler):
|
|
|
|
def do_GET(self:Self) -> None:
|
|
|
|
aichat:AIChat = self.server.aichat
|
|
data:bytes|None = None
|
|
directory:str
|
|
|
|
for directory in aichat.get_http_directories():
|
|
|
|
path:str = directory + self.path
|
|
index:str
|
|
|
|
for index in aichat.get_http_indexes():
|
|
if (data := aichat.load_file(path + ("/" + index if index else ""), "rb")) is not None:
|
|
break
|
|
|
|
if data is None:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
else:
|
|
|
|
mime:str = (read_mime_types(path) or ("application/octet-stream",))[0]
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", mime)
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.end_headers()
|
|
|
|
self.wfile.write(data)
|
|
|
|
def do_POST(self:Self) -> None:
|
|
|
|
path:str = self.path
|
|
|
|
if path == "/api/send":
|
|
pass
|
|
else:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
|
|
def get_titles(self:Self, message:str) -> list[str]:
|
|
try:
|
|
|
|
response:Response
|
|
|
|
with Post(self.string_variables(self.__api_url, {
|
|
"host" : self.__host,
|
|
"port" : self.__port
|
|
}), json = {
|
|
"model" : self.__titles_model,
|
|
"prompt" : self.string_variables(self.__titles_prompt, {"message" : message}),
|
|
"format" : "json",
|
|
"stream" : False,
|
|
"options" : {
|
|
"temperature" : self.__titles_temperature
|
|
}
|
|
}) as response:
|
|
if response.status_code == 200:
|
|
return response.json().get("titles")
|
|
except Exception as exception:
|
|
pass
|
|
return []
|
|
|
|
def send_to_ai(self:Self,
|
|
message:str,
|
|
each_callback:Callable[[bytes], None],
|
|
end_callback:Optional[Callable[[], None]] = None,
|
|
order:Optional[dict[str, Any|None]] = None
|
|
) -> None:
|
|
|
|
# titles:list[str] = self.get_titles(message)
|
|
titles:list[str] = []
|
|
|
|
print([titles, message])
|
|
|
|
try:
|
|
|
|
response:Response
|
|
|
|
print(self.string_variables(self.__response_with_titles, {
|
|
"message" : message,
|
|
"guides" : "\n\n".join("## " + title + "\n\n" + self.load_file(self.__index[title]) for title in titles)
|
|
}) if len(titles) else message)
|
|
|
|
with Post(self.string_variables(self.__api_url, {
|
|
"host" : self.__host,
|
|
"port" : self.__port
|
|
}), json = {
|
|
"model" : self.__response_model,
|
|
"prompt" : self.string_variables(self.__response_with_titles, {
|
|
"message" : message,
|
|
"guides" : "\n\n".join("## " + title + "\n\n" + self.load_file(self.__index[title]) for title in titles)
|
|
}) if len(titles) else message,
|
|
"stream" : True,
|
|
"options" : {
|
|
"temperature" : self.__response_temperature
|
|
}
|
|
}, stream = True) as response:
|
|
|
|
line:bytes
|
|
|
|
for line in response.iter_lines():
|
|
each_callback(line)
|
|
# print(["LINE", line, order is not None and order["__i__"] not in self.__requests])
|
|
# if order is not None and order["__i__"] not in self.__requests:
|
|
# break
|
|
# if line:
|
|
# print("EACH START")
|
|
# each_callback(line)
|
|
# print("EACH END")
|
|
# print(json_decode(line).get("done", False))
|
|
# if json_decode(line).get("done", False):
|
|
# print("BREAK")
|
|
# end_callback and end_callback()
|
|
# break
|
|
|
|
except Exception as exception:
|
|
print(exception)
|
|
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
##########################################################################################################
|
|
|
|
@classmethod
|
|
def get_keys(cls:type[Self], *items:list[Any|None]) -> list[str]:
|
|
|
|
keys:list[str] = []
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, str):
|
|
item not in keys and cls.RE_KEY.match(item) and keys.append(item)
|
|
elif isinstance(item, list):
|
|
|
|
key:str
|
|
|
|
for key in cls.get_keys(*item):
|
|
key not in keys and keys.append(key)
|
|
|
|
return keys
|
|
|
|
@classmethod
|
|
def get_texts(cls:type[Self], *items:list[Any|None]) -> list[str]:
|
|
|
|
texts:list[str] = []
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, str):
|
|
texts.append(item)
|
|
elif isinstance(item, (list, tuple)):
|
|
|
|
text:str
|
|
|
|
for text in cls.get_texts(*item):
|
|
texts.append(text)
|
|
|
|
return texts
|
|
|
|
@classmethod
|
|
def get_dictionaries(cls:type[Self], *items:list[Any|None]) -> list[dict[str, Any|None]]:
|
|
|
|
dictionaries:list[dict[str, Any|None]] = []
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, dict):
|
|
dictionaries.append(item)
|
|
elif isinstance(item, (list, tuple)):
|
|
|
|
dictionary:dict[str, Any|None]
|
|
|
|
for dictionary in cls.get_dictionaries(*item):
|
|
dictionaries.append(dictionary)
|
|
|
|
return dictionaries
|
|
|
|
@classmethod
|
|
def get_dictionary(cls:type[Self], inputs:Any|None, overwrite:bool = False) -> dict[str, Any|None]:
|
|
|
|
dictionary:dict[str, Any|None] = {}
|
|
|
|
if isinstance(inputs, dict):
|
|
|
|
key:str
|
|
value:Any|None
|
|
|
|
for key, value in inputs.items():
|
|
if overwrite or key not in dictionary:
|
|
dictionary[key] = value
|
|
|
|
elif isinstance(inputs, (list, tuple)):
|
|
|
|
subinputs:dict[str, Any|None]
|
|
|
|
for subinputs in inputs:
|
|
|
|
key:str
|
|
value:Any|None
|
|
|
|
for key, value in cls.get_dictionaries(subinputs).items():
|
|
if overwrite or key not in dictionary:
|
|
dictionary[key] = value
|
|
|
|
return dictionary
|
|
|
|
@classmethod
|
|
def get_value(cls:type[Self],
|
|
keys:str|Sequence[str],
|
|
inputs:dict[str, Any|None]|Sequence[Any|None],
|
|
default:Optional[Any|None] = None
|
|
) -> Any|None:
|
|
if len(keys := cls.get_keys(keys)):
|
|
|
|
subinputs:dict[str, Any|None]
|
|
|
|
for subinputs in cls.get_dictionaries(inputs):
|
|
|
|
key:str
|
|
|
|
for key in keys:
|
|
if key in subinputs:
|
|
return subinputs[key]
|
|
return default
|
|
|
|
@classmethod
|
|
def string_variables(cls:type[Self],
|
|
string:str,
|
|
inputs:dict[str, Any|None]|Sequence[Any|None],
|
|
default:Optional[str] = None
|
|
) -> str:
|
|
|
|
inputs = cls.get_dictionary(inputs)
|
|
|
|
def replace(matches:REMatch) -> str:
|
|
|
|
key:str = matches.group(1)
|
|
|
|
return (
|
|
str(inputs[key]) if key in inputs else
|
|
str(default) if default is not None else
|
|
matches.groups(0))
|
|
|
|
return cls.RE_STRING_VARIABLES.sub(replace, string)
|
|
|
|
@classmethod
|
|
def fix_path(cls:type[Self], path:str) -> str:
|
|
path = cls.RE_SLASHES.sub(cls.SLASH, path)
|
|
# return "\\" + path if path[0] == "\\" else path
|
|
return path
|
|
|
|
@classmethod
|
|
def get_absolute_path(cls:type[Self], path:str) -> str|None:
|
|
|
|
root:str
|
|
|
|
for root in cls.ROOTS_PATH:
|
|
|
|
absolute_path:str = cls.fix_path((root + cls.SLASH if root else "") + path)
|
|
|
|
if path_exists(absolute_path):
|
|
return absolute_path
|
|
return None
|
|
|
|
@classmethod
|
|
def load_file(cls:type[Self], path:str, mode:str = "r") -> str|bytes|None:
|
|
try:
|
|
|
|
file:FileIO
|
|
|
|
if mode == "r":
|
|
for format in ("utf8", "cp1252", "cp850"):
|
|
try:
|
|
with open(cls.get_absolute_path(path), mode, encoding = format) as file:
|
|
return file.read()
|
|
except:
|
|
pass
|
|
elif mode == "rb":
|
|
with open(cls.get_absolute_path(path), mode) as file:
|
|
return file.read()
|
|
except Exception as exception:
|
|
# print(exception)
|
|
pass
|
|
return None
|
|
|
|
@classmethod
|
|
def load_json(cls:type[Self],
|
|
data:str|dict[str, Any|None]|Sequence[Any|None],
|
|
only_dictionaries:bool = True
|
|
) -> list[dict[str, Any|None]|Sequence[Any|None]]:
|
|
|
|
json:list[dict[str, Any|None]|Sequence[Any|None]] = []
|
|
|
|
if isinstance(data, str):
|
|
|
|
subdata:dict[str, Any|None]|Sequence[Any|None]|None
|
|
|
|
try:
|
|
if subdata := json_decode(data):
|
|
json.extend(cls.load_json(subdata, only_dictionaries))
|
|
return
|
|
except Exception as exception:
|
|
pass
|
|
|
|
json.extends(cls.load_json(json_decode(cls.load_file(cls.fix_path(data), "r"))))
|
|
|
|
elif isinstance(data, (list, tuple)):
|
|
if only_dictionaries:
|
|
|
|
item:Any|None
|
|
|
|
for item in data:
|
|
json.extend(cls.load_json(item, only_dictionaries))
|
|
else:
|
|
json.append(data)
|
|
|
|
aichat:AIChat = AIChat() |