#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import datetime
from inspect import stack as get_stack, FrameInfo
from json import loads as json_decode, dumps as json_encode
from mimetypes import guess_type as get_mime_by_extension
from os.path import exists as path_exists, dirname as directory_name, abspath as absolute_path
from os.path import isfile as is_file
from re import compile as re_compile, Pattern as REPattern, Match as REMatch, IGNORECASE as RE_IGNORE_CASE
from socket import socket as Socket, AF_INET as ADDRESS_FAMILY_IPV4, SOCK_STREAM as SOCKET_STREAM, SOL_SOCKET as SOCKET_LAYER, SO_REUSEADDR as SOCKET_REUSE_ADDRESS
from threading import Thread
from traceback import format_stack as trace_format_stack, extract_tb as extract_traceback
from typing import Any, Optional, Sequence, Self, Callable

from pyodbc import connect as pyodbc_connect

|
|
|
|
class NucelarMonitor:
|
|
|
|
DEFAULT_SETTINGS:dict[str, Any|None] = {
|
|
"autostart" : True,
|
|
"print_format" : "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss} [{line}]{file}({method}): {message}",
|
|
"exception_format" : " '[{line}]{file}({method})'{lines}\n\n{exception_message}",
|
|
"print_types" : [
|
|
["unkn", "unknown"],
|
|
["info", "information"],
|
|
["warn", "warning"],
|
|
["erro", "error", "wrong", "failure", "fail", "no"],
|
|
["exce", "exception", "except"],
|
|
[" ok ", "ok", "success", "succeed", "yes"],
|
|
["test", "debug"]
|
|
],
|
|
"http_host" : "0.0.0.0",
|
|
"http_port" : 13000,
|
|
"http_cache_size" : 1024,
|
|
"http_maximum_connections" : 5,
|
|
"http_header_response" : (
|
|
"{http_protocol}/{http_version} {http_code} {http_message}\r\n" +
|
|
"Content-Type: {mime}\r\n" +
|
|
"Content-Length: {length}\r\n" +
|
|
"\r\n"
|
|
),
|
|
"http_protocol" : "HTTP",
|
|
"http_version" : "1.1",
|
|
"http_code" : 200,
|
|
"http_message" : "OK",
|
|
"http_encoder" : "utf-8",
|
|
"index_files" : ("index.html", "index.htm"),
|
|
"sql_host" : "127.0.0.1",
|
|
"sql_port" : 1433,
|
|
"sql_user" : "sa",
|
|
"sql_password" : "password",
|
|
"sql_database" : "NucelarMonitor",
|
|
"default_controllers" : {
|
|
"get" : {
|
|
"/" : "/Public"
|
|
},
|
|
"post" : {
|
|
"/debian/{key}" : "debian"
|
|
}
|
|
},
|
|
"default_connections" : {}
|
|
}
|
|
DEFAULT_I18N:dict[str, dict[str, str|Sequence[str]]] = {
|
|
"english" : {}
|
|
}
|
|
ROOT:str = directory_name(absolute_path(__file__))
|
|
SLASH:str = "/" if "/" in ROOT else "\\\\"
|
|
SPECIAL_REGULAR_EXPRESSION_CHARACTERS:dict[str, str] = {
|
|
"\r" : "r",
|
|
"\n" : "n",
|
|
"\t" : "t"
|
|
}
|
|
|
|
RE_KEY:REPattern = re_compile(r'^[a-z_][a-z0-9_]*$', RE_IGNORE_CASE)
|
|
RE_STRING_VARIABLE:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}', RE_IGNORE_CASE)
|
|
RE_EXCEPTION:REPattern = re_compile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')
|
|
RE_NEW_LINE:REPattern = re_compile(r'\r\n|[\r\n]')
|
|
RE_TO_SNAKE:REPattern = re_compile(r'[^a-zA-Z0-9]*([A-Z][A-Z0-9]*)|[^a-z0-9]+')
|
|
RE_HTTP_REQUEST:REPattern = re_compile(r'^([^\s]+)\s([^\s\?\#]+)(?:\?([^#]+))?(?:\#[^\s]+)?\s([^\/]+)\/([0-9\.]+)$')
|
|
RE_HEADER_LINE:REPattern = re_compile(r'^([^\:]+)\:(.+)$')
|
|
RE_HTTP_BLOCKS:REPattern = re_compile(r'((?:(?!(?:(?:\r\n){2}|\n{2}|\r{2}))(?:.|[\r\n]+))+)(?:(?:(?:\r\n){2}|\n{2}|\r{2})((?:.+|[\r\n]+)*))?')
|
|
RE_LAST_DIRECTORY:REPattern = re_compile(r'^(.*)[\/][^\/]*\/?$')
|
|
RE_SLASHES:REPattern = re_compile(r'[\\\/]+')
|
|
RE_TO_REGULAR_EXPRESSION:REPattern = re_compile(r'[\(\)\{\}\/\\\.\-\+\*\^\$\?\|\!\<\>\r\n\t]')
|
|
RE_ROUTE_KEY:REPattern = re_compile(r'\\\{([a-z_][a-z0-9_]*)\\\}', RE_IGNORE_CASE)
|
|
|
|
class Request:
|
|
|
|
def __init__(self:Self, data:bytes, encoder:str = "utf-8") -> None:
|
|
|
|
self.method:str
|
|
self.request:str
|
|
self.value_get:str|None
|
|
self.variables_get:dict[str, str]
|
|
self.variables_post:dict[str, str]
|
|
self.protocol:str
|
|
self.protocol_version:str
|
|
self.body:str
|
|
self.variables_uri:dict[str, str] = {}
|
|
|
|
header, body = (lambda header, body:(
|
|
NucelarMonitor.RE_NEW_LINE.split(str(header).strip()), body
|
|
))(*NucelarMonitor.RE_HTTP_BLOCKS.match(data.decode(encoder)).groups())
|
|
|
|
(
|
|
self.method,
|
|
self.request,
|
|
self.value_get,
|
|
self.variables_get,
|
|
self.protocol,
|
|
self.protocol_version
|
|
) = (lambda method, request, variables, protocol, protocol_version:(
|
|
str(method).lower(),
|
|
request,
|
|
variables,
|
|
self.parse_variables(variables),
|
|
protocol,
|
|
protocol_version
|
|
))(*NucelarMonitor.RE_HTTP_REQUEST.match(header[0]).groups())
|
|
self.body = body
|
|
self.variables_post = self.parse_variables(body)
|
|
|
|
def set_uri_variables(self:Self, keys:list[str], matches:REMatch) -> None:
|
|
|
|
i:int
|
|
value:str
|
|
|
|
for i, value in enumerate(matches.groups()):
|
|
self.variables_uri[keys[i]] = value
|
|
|
|
def get(self:Self, keys:str|Sequence[str], default:Optional[Any] = None) -> Any|None:
|
|
return NucelarMonitor.get_value(keys, (
|
|
self.variables_uri, self.variables_get, self.variables_post
|
|
), default)
|
|
|
|
@classmethod
|
|
def parse_variables(cls:type[Self], string:Optional[str]) -> dict[str, str]:
|
|
if not string:
|
|
return {}
|
|
|
|
variables:dict[str, str] = {}
|
|
pair:str
|
|
|
|
for pair in string.split("&"):
|
|
if "=" in pair:
|
|
key, value = pair.split("=", 1)
|
|
variables[cls.to_snake(key)] = value
|
|
else:
|
|
variables[cls.to_snake(pair)] = ""
|
|
|
|
return variables
|
|
|
|
@staticmethod
|
|
def to_snake(string:str) -> str:
|
|
|
|
def callback(matches:REMatch) -> str:
|
|
|
|
upper:str|None = matches.group(1)
|
|
|
|
return "_" + upper.lower() if upper else "_"
|
|
|
|
return NucelarMonitor.RE_TO_SNAKE.sub(callback, string).lower()
|
|
|
|
class Response:
|
|
|
|
def __init__(self:Self,
|
|
nucelar_monitor:type[Self],
|
|
response:Optional[Any] = None,
|
|
mime:Optional[str] = None,
|
|
code:Optional[int] = None,
|
|
message:Optional[str] = None
|
|
) -> None:
|
|
|
|
default_code:int
|
|
default_message:str
|
|
|
|
self.nucelar_monitor:NucelarMonitor = nucelar_monitor
|
|
default_code, default_message = self.nucelar_monitor.get_http_default_code()
|
|
self.body:bytes = b""
|
|
self.mime:str
|
|
self.code:str = code or default_code
|
|
self.message:str = default_message if message is None else message
|
|
|
|
self.set_data(response, mime)
|
|
|
|
def set_data(self:Self, data:Any|None, mime:Optional[str] = None) -> None:
|
|
if isinstance(data, bytes):
|
|
self.body = data
|
|
self.mime = mime or "application/octet-stream"
|
|
elif isinstance(data, str):
|
|
self.body = data.encode(self.nucelar_monitor.get_encoder())
|
|
self.mime = mime or "text/plain;charset=" + self.nucelar_monitor.get_encoder()
|
|
elif isinstance(data, (dict, tuple, list)):
|
|
self.body = json_encode(data).encode(self.nucelar_monitor.get_encoder())
|
|
self.mime = mime or "application/json;charset=" + self.nucelar_monitor.get_encoder()
|
|
else:
|
|
self.body = str(data).encode(self.nucelar_monitor.get_encoder())
|
|
self.mime = mime or "text/plain;charset=" + self.nucelar_monitor.get_encoder()
|
|
|
|
def get_parameters(self:Self) -> dict[str, Any|None]:
|
|
return {
|
|
"http_code" : self.code,
|
|
"http_message" : self.message,
|
|
"mime" : self.mime,
|
|
"length" : len(self.body)
|
|
}
|
|
|
|
def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
|
|
|
|
key:str
|
|
|
|
self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs)
|
|
self.__sentences:dict[str, dict[str, str|Sequence[str]]] = self.DEFAULT_I18N
|
|
self.__language:str = self.get("language", None, "english")
|
|
self.__print_format:str = self.get("print_format")
|
|
self.__print_types:list[list[str]] = self.DEFAULT_SETTINGS["print_types"]
|
|
self.__exception_format:str = self.get("exception_format")
|
|
self.__controllers:dict[str, list[tuple[
|
|
REPattern,
|
|
tuple[tuple[str, ...]],
|
|
Callable[[NucelarMonitor.Request, NucelarMonitor.Response], None]|None,
|
|
str|None]
|
|
]] = {}
|
|
self.__http_host:str = self.get("http_host")
|
|
self.__http_port:int = self.get("http_port")
|
|
self.__http_server:Socket
|
|
self.__http_buffer_size:int = self.get("http_cache_size")
|
|
self.__http_header_response:str = self.get("http_header_response")
|
|
self.__http_protocol:str = self.get("http_protocol")
|
|
self.__http_version:str = self.get("http_version")
|
|
self.__http_code:int = self.get("http_code")
|
|
self.__http_message:str = self.get("http_message")
|
|
self.__http_encoder:str = self.get("http_encoder")
|
|
self.__index_files:tuple[str, ...] = tuple(self.get("index_files"))
|
|
self.__started:bool = False
|
|
self.__working:bool = False
|
|
self.__root_paths:list[str] = ["", self.ROOT]
|
|
|
|
for _ in range(2):
|
|
self.__root_paths.append(self.RE_LAST_DIRECTORY.sub(r'\1', self.__root_paths[-1]))
|
|
|
|
for key in ("default_controllers", "controllers"):
|
|
self.add_controllers(self.get(key))
|
|
|
|
self.get("autostart") and self.start()
|
|
|
|
def start(self:Self) -> None:
|
|
if self.__started:
|
|
return
|
|
self.__started = True
|
|
|
|
self.__http_server = Socket(ADDRESS_FAMILY_IPV4, SOCKET_STREAM)
|
|
self.__working = True
|
|
|
|
try:
|
|
|
|
self.__http_server.setsockopt(SOCKET_LAYER, SOCKET_REUSE_ADDRESS, 1)
|
|
self.__http_server.bind((self.__http_host, self.__http_port))
|
|
self.__http_server.listen(self.get("http_maximum_connections"))
|
|
|
|
Thread(target = self.__listen).start()
|
|
|
|
except Exception as exception:
|
|
self.exception(exception, "http_server_start_exception", {
|
|
"host" : self.__http_host,
|
|
"port" : self.__http_port,
|
|
})
|
|
self.close()
|
|
|
|
def close(self:Self) -> None:
|
|
if not self.__started:
|
|
return
|
|
|
|
self.__started = False
|
|
self.__working = False
|
|
|
|
try:
|
|
self.__http_server.close()
|
|
except Exception as exception:
|
|
self.exception(exception, "http_server_close_exception", {
|
|
"host" : self.__http_host,
|
|
"port" : self.__http_port,
|
|
})
|
|
|
|
def get_print_type(self:Self, _type:str) -> str:
|
|
|
|
group:list[str]
|
|
|
|
for group in self.__print_types:
|
|
if _type in group:
|
|
return group[0].upper()
|
|
return self.__print_types[0][0].upper()
|
|
|
|
def print(self:Self,
|
|
_type:str,
|
|
message:str|Sequence[str],
|
|
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
|
|
i:int = 0
|
|
) -> None:
|
|
|
|
date:datetime = datetime.datetime.now()
|
|
own:dict[str, Any|None] = {
|
|
"raw_type" : _type,
|
|
"type" : self.get_print_type(_type),
|
|
"i18n" : self.get_texts(message),
|
|
"message" : self.i18n(message, inputs),
|
|
**self.get_dictionary(inputs),
|
|
**self.get_action_data(i + 1)
|
|
}
|
|
|
|
for key in ("year", "month", "day", "hour", "minute", "second"):
|
|
|
|
k:str = "i" if key == "minute" else key[0]
|
|
|
|
own[k] = own[key] = getattr(date, key)
|
|
own[k + k] = ("00" + str(own[key]))[-2:]
|
|
|
|
own["yyyy"] = own["year"]
|
|
|
|
print(self.string_variables(self.__print_format, own) + (own["end"] if "end" in own else ""))
|
|
|
|
def exception(self:Self,
|
|
exception:Exception,
|
|
message:str|Sequence[str],
|
|
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
|
|
i:int = 0
|
|
) -> None:
|
|
|
|
lines:list[str] = extract_traceback(exception.__traceback__).format()
|
|
matches:REMatch = self.RE_EXCEPTION.match(lines[-1])
|
|
data:dict[str, Any|None] = {
|
|
**self.get_dictionary(inputs),
|
|
"lines" : "",
|
|
"exception_message" : str(exception),
|
|
"method" : matches.group(3),
|
|
"line" : matches.group(2),
|
|
"file" : matches.group(1)
|
|
}
|
|
block:str
|
|
j:int
|
|
|
|
for j, block in enumerate(trace_format_stack()[:-2] + lines):
|
|
if block:
|
|
data["lines"] += "\n " + str(j) + " - " + self.RE_NEW_LINE.split(block.strip())[0]
|
|
|
|
data["end"] = self.string_variables(self.__exception_format, data)
|
|
|
|
message and self.print("exception", message, data, i + 2)
|
|
|
|
@classmethod
|
|
def fix_path(cls:type[Self], path:str) -> str:
|
|
return cls.RE_SLASHES.sub(cls.SLASH, path)
|
|
|
|
def get_absolute_path(self:Self, path:str) -> str|None:
|
|
|
|
root:str
|
|
absolute:str
|
|
|
|
for root in self.__root_paths:
|
|
absolute = self.fix_path(root + '/' + path)
|
|
if path_exists(absolute):
|
|
return absolute
|
|
return None
|
|
|
|
def load_file(self:Self, path:str, mode:str = "r") -> str|bytes|None:
|
|
|
|
path:str = self.get_absolute_path(path)
|
|
|
|
if path:
|
|
with open(path, mode) as file:
|
|
return file.read()
|
|
return None
|
|
|
|
def load_json(self:Self, data:str|dict[str, Any|None]|list[Any|None]) -> dict[str, Any|None]|list[Any|None]|None:
|
|
if isinstance(data, str):
|
|
|
|
json:list[Any|None]|dict[str, Any|None]|None
|
|
|
|
try:
|
|
json = json_decode(data)
|
|
except Exception as exception:
|
|
self.exception(exception, "load_json_exception", {
|
|
"data" : data,
|
|
"length" : len(data)
|
|
})
|
|
|
|
if json:
|
|
return json
|
|
try:
|
|
return json_decode(self.load_file(data))
|
|
except Exception as exception:
|
|
self.exception(exception, "load_json_by_file_exception", {
|
|
"path" : data
|
|
})
|
|
return None
|
|
elif isinstance(data, (dict, list)):
|
|
return data
|
|
return None
|
|
|
|
def get_encoder(self:Self) -> str:
|
|
return self.__http_encoder
|
|
|
|
def get_http_default_code(self:Self) -> tuple[int, str]:
|
|
return self.__http_code, self.__http_message
|
|
|
|
def get(self:Self,
|
|
keys:str|Sequence[str],
|
|
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
|
|
default:Optional[Any] = None
|
|
) -> Any|None:
|
|
return self.get_value(keys, (inputs, self.__inputs, self.DEFAULT_SETTINGS), default)
|
|
|
|
def __get_text(self:Self, strings:str|Sequence[str]) -> str:
|
|
|
|
keys:list[str] = self.get_keys(strings := self.get_list(strings))
|
|
|
|
if len(keys):
|
|
|
|
language:str
|
|
used:list[str] = []
|
|
|
|
for language in [self.__language] + list(self.__sentences.keys()):
|
|
if language not in used and language in self.__sentences:
|
|
|
|
key:str
|
|
|
|
used.append(language)
|
|
for key in keys:
|
|
if key in self.__sentences[language]:
|
|
return self.__sentences[language][key]
|
|
return strings[0]
|
|
|
|
def i18n(self:Self,
|
|
strings:str|Sequence[str],
|
|
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None
|
|
) -> str:
|
|
return self.string_variables(self.__get_text(strings), inputs)
|
|
|
|
def add_controllers(self:Self,
|
|
inputs:str|dict[str, dict[str, str|Callable[[Request], None]]]|Sequence[Any|None]
|
|
) -> None:
|
|
if isinstance(inputs, dict):
|
|
|
|
method:str
|
|
controllers:dict[str, str|Callable[[NucelarMonitor.Request], None]]
|
|
|
|
for method, controllers in inputs.items():
|
|
if (method := method.lower()) not in self.__controllers:
|
|
self.__controllers[method] = []
|
|
if isinstance(controllers, dict):
|
|
|
|
request:str
|
|
target:str|Callable[[NucelarMonitor.Request], None]
|
|
|
|
for request, target in controllers.items():
|
|
|
|
controller:Callable[[NucelarMonitor.Request, NucelarMonitor.Response], None]|None = None
|
|
path:str|None = None
|
|
|
|
if isinstance(target, str) and (controller := getattr(self, target, None)) is None:
|
|
path = self.get_absolute_path(target)
|
|
|
|
if callable(controller) or path is not None:
|
|
|
|
variables:list[str] = []
|
|
|
|
def callback(matches:REMatch) -> str:
|
|
|
|
variables.append(matches.group(1))
|
|
|
|
return r'([^\/]+)'
|
|
|
|
self.__controllers[method].append((re_compile(r'^' + self.RE_ROUTE_KEY.sub(callback, self.to_regular_expression(
|
|
request[:-1] if request[-1] == "/" else request
|
|
)) + (r'' if path is None else r'(.*)') + r'\/?$'), tuple(variables), controller, path))
|
|
|
|
elif isinstance(inputs, (list, tuple)):
|
|
|
|
subinputs:Any|None
|
|
|
|
for subinputs in inputs:
|
|
self.add_controllers(subinputs)
|
|
|
|
elif isinstance(inputs, str):
|
|
self.add_controllers(self.load_json(inputs))
|
|
|
|
def __listen(self:Self) -> None:
|
|
while self.__working:
|
|
try:
|
|
|
|
client:Socket
|
|
address:str
|
|
port:int
|
|
|
|
client, (address, port) = self.__http_server.accept()
|
|
|
|
Thread(
|
|
target = self.__listen_client,
|
|
args = (client, address, port)
|
|
).start()
|
|
|
|
except Exception as exception:
|
|
self.exception(exception, "http_server_listen_exception", {
|
|
"host" : self.__http_host,
|
|
"port" : self.__http_port,
|
|
})
|
|
|
|
def __listen_client(self:Self, client:Socket, address:str, port:int) -> None:
|
|
|
|
data:bytes = b""
|
|
route:str = ""
|
|
method:str = "UNKN"
|
|
response:NucelarMonitor.Response = NucelarMonitor.Response(self)
|
|
|
|
try:
|
|
|
|
request:NucelarMonitor.Request
|
|
variables:tuple[str, ...]
|
|
controller:Callable[[NucelarMonitor.Request], Any|None]|None
|
|
path:str|None
|
|
response_data:Any|None = None
|
|
pattern:REPattern
|
|
done:bool = False
|
|
|
|
while True:
|
|
|
|
buffer:bytes = client.recv(self.__http_buffer_size)
|
|
|
|
if not buffer:
|
|
break
|
|
data += buffer
|
|
if len(buffer) != self.__http_buffer_size:
|
|
break
|
|
|
|
for pattern, variables, controller, path in self.__controllers[
|
|
method := (request := self.Request(data, self.__http_encoder)).method
|
|
]:
|
|
|
|
matches:REMatch = pattern.match(route := request.request)
|
|
|
|
if matches is not None:
|
|
request.set_uri_variables(variables, matches)
|
|
if done := path is not None:
|
|
for index in self.__index_files:
|
|
|
|
full_path = path + "/" + route + ("" if index == "" else "/" + index)
|
|
|
|
if done := (response_data := self.load_file(full_path, "rb")) is not None:
|
|
response.set_data(
|
|
response_data,
|
|
get_mime_by_extension(full_path)[0] or "application/octet-stream"
|
|
)
|
|
break
|
|
|
|
elif done := controller is not None:
|
|
controller(request, response)
|
|
|
|
if done:
|
|
break
|
|
|
|
|
|
if not done:
|
|
response.body = b"<h1>Not Found</h1>"
|
|
response.mime = "text/html;charset=" + self.__http_encoder
|
|
response.code = "404"
|
|
response.message = "Not Found"
|
|
|
|
client.sendall(self.string_variables(self.__http_header_response, {
|
|
"http_protocol" : self.__http_protocol,
|
|
"http_version" : self.__http_version,
|
|
**response.get_parameters()
|
|
}).encode(self.__http_encoder) + response.body)
|
|
client.close()
|
|
|
|
except Exception as exception:
|
|
self.exception(exception, "http_server_client_exception", {
|
|
"host" : self.__http_host,
|
|
"port" : self.__http_port,
|
|
"client_address" : address,
|
|
"client_port" : port,
|
|
"length" : len(data),
|
|
"method" : method,
|
|
"route" : route,
|
|
"response_length" : len(response.body)
|
|
})
|
|
|
|
def debian(self:Self, request:Request, response:Response) -> None:
|
|
|
|
key:str = request.get("key")
|
|
hostnames:list[str]
|
|
domain:str|None
|
|
interfaces:list[list[int, str, bool, str, int]]
|
|
disks:list[list[str, int, int, str|None]]
|
|
iterations:int
|
|
candle_times:list[int, int]
|
|
cpu:list[float, float, float, float, float]
|
|
memory:list[int, int, int, int, int, float]
|
|
net_use:list[list[list[str, int, int, int, int, int, int]]]
|
|
|
|
hostnames, domain, interfaces, disks, iterations, candle_times, cpu, memory, net_use = json_decode(request.body)
|
|
|
|
@classmethod
|
|
def get_dictionary(cls:type[Self], *items:Sequence[Any|None]) -> dict[str, Any|None]:
|
|
|
|
dictionary:dict[str, Any|None] = {}
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, dict):
|
|
dictionary.update(item)
|
|
elif isinstance(item, (list, tuple)):
|
|
|
|
subitem:Any|None
|
|
|
|
for subitem in item:
|
|
dictionary.update(cls.get_dictionary(subitem))
|
|
|
|
return dictionary
|
|
|
|
@classmethod
|
|
def get_keys(cls:type[Self], *items:Sequence[Any|None]) -> list[str]:
|
|
|
|
keys:list[str] = []
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, str):
|
|
cls.RE_KEY.match(item) and keys.append(item)
|
|
elif isinstance(item, (list, tuple)):
|
|
|
|
subitem:Any|None
|
|
|
|
for subitem in item:
|
|
keys.extend(cls.get_keys(subitem))
|
|
|
|
return keys
|
|
|
|
@classmethod
|
|
def get_dictionaries(cls:type[Self], *items:Sequence[Any|None]) -> list[dict[str, Any|None]]:
|
|
|
|
dictionaries:list[dict[str, Any|None]] = []
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, dict):
|
|
dictionaries.append(item)
|
|
elif isinstance(item, (list, tuple)):
|
|
|
|
subitem:Any|None
|
|
|
|
for subitem in item:
|
|
dictionaries.extend(cls.get_dictionaries(subitem))
|
|
|
|
return dictionaries
|
|
|
|
@classmethod
|
|
def get_value(cls:type[Self],
|
|
keys:str|Sequence[str],
|
|
inputs:dict[str, Any|None]|Sequence[Any|None],
|
|
default:Optional[Any] = None
|
|
) -> Any|None:
|
|
if len(cls.get_keys(keys)):
|
|
|
|
dictionary:dict[str, Any|None]
|
|
|
|
for dictionary in cls.get_dictionaries(inputs):
|
|
|
|
key:str
|
|
|
|
for key in cls.get_keys(keys):
|
|
if key in dictionary:
|
|
return dictionary[key]
|
|
return default
|
|
|
|
@staticmethod
|
|
def get_list(item:Any|None) -> list[Any|None]:
|
|
return item if isinstance(item, (list, tuple)) else [item]
|
|
|
|
@classmethod
|
|
def string_variables(cls:type[Self],
|
|
string:str,
|
|
inputs:dict[str, Any|None]|Sequence[Any|None],
|
|
default:Optional[str] = None
|
|
) -> str:
|
|
|
|
variables:dict[str, Any|None] = cls.get_dictionary(inputs)
|
|
|
|
def callback(matches:REMatch) -> str:
|
|
|
|
key:str = matches.group(1)
|
|
|
|
return (
|
|
str(variables[key]) if key in variables else
|
|
default if default is not None else
|
|
matches.group(0))
|
|
|
|
return cls.RE_STRING_VARIABLE.sub(callback, string)
|
|
|
|
@staticmethod
|
|
def get_texts(*items:list[Any|None]) -> list[str]:
|
|
|
|
texts:list[str] = []
|
|
item:Any|None
|
|
|
|
for item in items:
|
|
if isinstance(item, str):
|
|
texts.append(item)
|
|
elif isinstance(item, (list, tuple)):
|
|
|
|
subitem:Any|None
|
|
|
|
for subitem in item:
|
|
texts.extend(NucelarMonitor.get_texts(subitem))
|
|
|
|
return texts
|
|
|
|
@staticmethod
|
|
def get_action_data(i:int = 0) -> dict[str, str|int]:
|
|
|
|
stack:FrameInfo = get_stack()[i]
|
|
|
|
return {
|
|
"file" : stack.filename,
|
|
"method" : stack.function,
|
|
"line" : stack.lineno
|
|
}
|
|
|
|
@classmethod
|
|
def to_regular_expression(cls:type[Self], string:str) -> str:
|
|
|
|
def callback(matches:REMatch) -> str:
|
|
|
|
character:str = matches.group(0)
|
|
|
|
return "\\" + (
|
|
cls.SPECIAL_REGULAR_EXPRESSION_CHARACTERS[character] if character in cls.SPECIAL_REGULAR_EXPRESSION_CHARACTERS else
|
|
character)
|
|
|
|
return cls.RE_TO_REGULAR_EXPRESSION.sub(callback, string) |