NucelarMonitor/Python/NucelarMonitor.py
2026-03-04 15:05:57 +01:00

768 lines
27 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from typing import Any, Optional, Sequence, Self, Callable
from re import compile as re_compile, Pattern as REPattern, Match as REMatch, IGNORECASE as RE_IGNORE_CASE
from pyodbc import connect as pyodbc_connect
from socket import socket as Socket, AF_INET as ADDRESS_FAMILY_IPV4, SOCK_STREAM as SOCKET_STREAM, SOL_SOCKET as SOCKET_LAYER, SO_REUSEADDR as SOCKET_REUSE_ADDRESS
from inspect import stack as get_stack, FrameInfo
from traceback import format_stack as trace_format_stack, extract_tb as extract_traceback
from threading import Thread
from os.path import exists as path_exists, dirname as directory_name, abspath as absolute_path
from json import loads as json_decode, dumps as json_encode
from mimetypes import guess_type as get_mime_by_extension
class NucelarMonitor:
DEFAULT_SETTINGS:dict[str, Any|None] = {
"autostart" : True,
"print_format" : "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss} [{line}]{file}({method}): {message}",
"exception_format" : " '[{line}]{file}({method})'{lines}\n\n{exception_message}",
"print_types" : [
["unkn", "unknown"],
["info", "information"],
["warn", "warning"],
["erro", "error", "wrong", "failure", "fail", "no"],
["exce", "exception", "except"],
[" ok ", "ok", "success", "succeed", "yes"],
["test", "debug"]
],
"http_host" : "0.0.0.0",
"http_port" : 13000,
"http_cache_size" : 1024,
"http_maximum_connections" : 5,
"http_header_response" : (
"{http_protocol}/{http_version} {http_code} {http_message}\r\n" +
"Content-Type: {mime}\r\n" +
"Content-Length: {length}\r\n" +
"\r\n"
),
"http_protocol" : "HTTP",
"http_version" : "1.1",
"http_code" : 200,
"http_message" : "OK",
"http_encoder" : "utf-8",
"index_files" : ("index.html", "index.htm"),
"sql_host" : "127.0.0.1",
"sql_port" : 1433,
"sql_user" : "sa",
"sql_password" : "password",
"sql_database" : "NucelarMonitor",
"default_controllers" : {
"get" : {
"/" : "/Public"
},
"post" : {
"/debian/{key}" : "debian"
}
},
"default_connections" : {}
}
DEFAULT_I18N:dict[str, dict[str, str|Sequence[str]]] = {
"english" : {}
}
ROOT:str = directory_name(absolute_path(__file__))
SLASH:str = "/" if "/" in ROOT else "\\\\"
SPECIAL_REGULAR_EXPRESSION_CHARACTERS:dict[str, str] = {
"\r" : "r",
"\n" : "n",
"\t" : "t"
}
RE_KEY:REPattern = re_compile(r'^[a-z_][a-z0-9_]*$', RE_IGNORE_CASE)
RE_STRING_VARIABLE:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}', RE_IGNORE_CASE)
RE_EXCEPTION:REPattern = re_compile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')
RE_NEW_LINE:REPattern = re_compile(r'\r\n|[\r\n]')
RE_TO_SNAKE:REPattern = re_compile(r'[^a-zA-Z0-9]*([A-Z][A-Z0-9]*)|[^a-z0-9]+')
RE_HTTP_REQUEST:REPattern = re_compile(r'^([^\s]+)\s([^\s\?\#]+)(?:\?([^#]+))?(?:\#[^\s]+)?\s([^\/]+)\/([0-9\.]+)$')
RE_HEADER_LINE:REPattern = re_compile(r'^([^\:]+)\:(.+)$')
RE_HTTP_BLOCKS:REPattern = re_compile(r'((?:(?!(?:(?:\r\n){2}|\n{2}|\r{2}))(?:.|[\r\n]+))+)(?:(?:(?:\r\n){2}|\n{2}|\r{2})((?:.+|[\r\n]+)*))?')
RE_LAST_DIRECTORY:REPattern = re_compile(r'^(.*)[\/\\\\][^\/\\\\]*[\/\\\\]?$')
RE_SLASHES:REPattern = re_compile(r'[\\\/]+')
RE_TO_REGULAR_EXPRESSION:REPattern = re_compile(r'[\(\)\{\}\/\\\.\-\+\*\^\$\?\|\!\<\>\r\n\t]')
RE_ROUTE_KEY:REPattern = re_compile(r'\\\{([a-z_][a-z0-9_]*)\\\}', RE_IGNORE_CASE)
class Request:
def __init__(self:Self, data:bytes, encoder:str = "utf-8") -> None:
self.method:str
self.request:str
self.value_get:str|None
self.variables_get:dict[str, str]
self.variables_post:dict[str, str]
self.protocol:str
self.protocol_version:str
self.body:str
self.variables_uri:dict[str, str] = {}
header, body = (lambda header, body:(
NucelarMonitor.RE_NEW_LINE.split(str(header).strip()), body
))(*NucelarMonitor.RE_HTTP_BLOCKS.match(data.decode(encoder)).groups())
(
self.method,
self.request,
self.value_get,
self.variables_get,
self.protocol,
self.protocol_version
) = (lambda method, request, variables, protocol, protocol_version:(
str(method).lower(),
request,
variables,
self.parse_variables(variables),
protocol,
protocol_version
))(*NucelarMonitor.RE_HTTP_REQUEST.match(header[0]).groups())
self.body = body
self.variables_post = self.parse_variables(body)
def set_uri_variables(self:Self, keys:list[str], matches:REMatch) -> None:
i:int
value:str
for i, value in enumerate(matches.groups()):
self.variables_uri[keys[i]] = value
def get(self:Self, keys:str|Sequence[str], default:Optional[Any] = None) -> Any|None:
return NucelarMonitor.get_value(keys, (
self.variables_uri, self.variables_get, self.variables_post
), default)
@classmethod
def parse_variables(cls:type[Self], string:Optional[str]) -> dict[str, str]:
if not string:
return {}
variables:dict[str, str] = {}
pair:str
for pair in string.split("&"):
if "=" in pair:
key, value = pair.split("=", 1)
variables[cls.to_snake(key)] = value
else:
variables[cls.to_snake(pair)] = ""
return variables
@staticmethod
def to_snake(string:str) -> str:
def callback(matches:REMatch) -> str:
upper:str|None = matches.group(1)
return "_" + upper.lower() if upper else "_"
return NucelarMonitor.RE_TO_SNAKE.sub(callback, string).lower()
class Response:
def __init__(self:Self,
nucelar_monitor:type[Self],
response:Optional[Any] = None,
mime:Optional[str] = None,
code:Optional[int] = None,
message:Optional[str] = None
) -> None:
default_code:int
default_message:str
self.nucelar_monitor:NucelarMonitor = nucelar_monitor
default_code, default_message = self.nucelar_monitor.get_http_default_code()
self.body:bytes = b""
self.mime:str
self.code:str = code or default_code
self.message:str = default_message if message is None else message
self.set_data(response, mime)
def set_data(self:Self, data:Any|None, mime:Optional[str] = None) -> None:
if isinstance(data, bytes):
self.body = data
self.mime = mime or "application/octet-stream"
elif isinstance(data, str):
self.body = data.encode(self.nucelar_monitor.get_encoder())
self.mime = mime or "text/plain;charset=" + self.nucelar_monitor.get_encoder()
elif isinstance(data, (dict, tuple, list)):
self.body = json_encode(data).encode(self.nucelar_monitor.get_encoder())
self.mime = mime or "application/json;charset=" + self.nucelar_monitor.get_encoder()
else:
self.body = str(data).encode(self.nucelar_monitor.get_encoder())
self.mime = mime or "text/plain;charset=" + self.nucelar_monitor.get_encoder()
def get_parameters(self:Self) -> dict[str, Any|None]:
return {
"http_code" : self.code,
"http_message" : self.message,
"mime" : self.mime,
"length" : len(self.body)
}
def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
key:str
self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs)
self.__sentences:dict[str, dict[str, str|Sequence[str]]] = self.DEFAULT_I18N
self.__language:str = self.get("language", None, "english")
self.__print_format:str = self.get("print_format")
self.__print_types:list[list[str]] = self.DEFAULT_SETTINGS["print_types"]
self.__exception_format:str = self.get("exception_format")
self.__controllers:dict[str, list[tuple[
REPattern,
tuple[tuple[str, ...]],
Callable[[NucelarMonitor.Request, NucelarMonitor.Response], None]|None,
str|None]
]] = {}
self.__http_host:str = self.get("http_host")
self.__http_port:int = self.get("http_port")
self.__http_server:Socket
self.__http_buffer_size:int = self.get("http_cache_size")
self.__http_header_response:str = self.get("http_header_response")
self.__http_protocol:str = self.get("http_protocol")
self.__http_version:str = self.get("http_version")
self.__http_code:int = self.get("http_code")
self.__http_message:str = self.get("http_message")
self.__http_encoder:str = self.get("http_encoder")
self.__index_files:tuple[str, ...] = tuple(self.get("index_files"))
self.__started:bool = False
self.__working:bool = False
self.__root_paths:list[str] = ["", self.ROOT]
self.__commands:list[list[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]] = [
[["close", "exit", "quit", "bye"], self.__close_command]
]
for _ in range(2):
self.__root_paths.append(self.RE_LAST_DIRECTORY.sub(r'\1', self.__root_paths[-1]))
for key in ("default_controllers", "controllers"):
self.add_controllers(self.get(key))
self.get("autostart") and self.start()
def start(self:Self) -> None:
if self.__started:
return
self.__started = True
self.__http_server = Socket(ADDRESS_FAMILY_IPV4, SOCKET_STREAM)
self.__working = True
Thread(target = self.__command_listener).start()
try:
self.__http_server.setsockopt(SOCKET_LAYER, SOCKET_REUSE_ADDRESS, 1)
self.__http_server.bind((self.__http_host, self.__http_port))
self.__http_server.listen(self.get("http_maximum_connections"))
Thread(target = self.__listen).start()
except Exception as exception:
self.exception(exception, "http_server_start_exception", {
"host" : self.__http_host,
"port" : self.__http_port,
})
self.close()
def close(self:Self) -> None:
if not self.__started:
return
self.__started = False
self.__working = False
try:
self.__http_server.close()
except Exception as exception:
self.exception(exception, "http_server_close_exception", {
"host" : self.__http_host,
"port" : self.__http_port,
})
def __command_listener(self:Self) -> None:
while self.__working:
try:
parameters:dict[str, Any|None] = {}
arguments:list[Any|None] = []
command:str = input().strip().lower()
if command:
for commands, callback in self.__commands:
if command in commands:
callback(self.get_dictionary(self.__inputs), [parameters, *arguments])
break
except Exception as exception:
self.exception(exception, "command_listener_exception", {
"command" : command
})
def __close_command(self:Self, inputs:dict[str, Any|None], *arguments:list[Any|None]) -> None:
self.close()
def get_print_type(self:Self, _type:str) -> str:
group:list[str]
for group in self.__print_types:
if _type in group:
return group[0].upper()
return self.__print_types[0][0].upper()
def print(self:Self,
_type:str,
message:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
i:int = 0
) -> None:
date:datetime = datetime.datetime.now()
own:dict[str, Any|None] = {
"raw_type" : _type,
"type" : self.get_print_type(_type),
"i18n" : self.get_texts(message),
"message" : self.i18n(message, inputs),
**self.get_dictionary(inputs),
**self.get_action_data(i + 1)
}
for key in ("year", "month", "day", "hour", "minute", "second"):
k:str = "i" if key == "minute" else key[0]
own[k] = own[key] = getattr(date, key)
own[k + k] = ("00" + str(own[key]))[-2:]
own["yyyy"] = own["year"]
print(self.string_variables(self.__print_format, own) + (own["end"] if "end" in own else ""))
def exception(self:Self,
exception:Exception,
message:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
i:int = 0
) -> None:
lines:list[str] = extract_traceback(exception.__traceback__).format()
matches:REMatch = self.RE_EXCEPTION.match(lines[-1])
data:dict[str, Any|None] = {
**self.get_dictionary(inputs),
"lines" : "",
"exception_message" : str(exception),
"method" : matches.group(3),
"line" : matches.group(2),
"file" : matches.group(1)
}
block:str
j:int
for j, block in enumerate(trace_format_stack()[:-2] + lines):
if block:
data["lines"] += "\n " + str(j) + " - " + self.RE_NEW_LINE.split(block.strip())[0]
data["end"] = self.string_variables(self.__exception_format, data)
message and self.print("exception", message, data, i + 2)
@classmethod
def fix_path(cls:type[Self], path:str) -> str:
return cls.RE_SLASHES.sub(cls.SLASH, path)
def get_absolute_path(self:Self, path:str) -> str|None:
root:str
absolute:str
for root in self.__root_paths:
absolute = self.fix_path((root + '/' if root else "") + path)
if path_exists(absolute):
return absolute
return None
def load_file(self:Self, path:str, mode:str = "r") -> str|bytes|None:
absolute_path:str = self.get_absolute_path(path)
if absolute_path:
with open(absolute_path, mode) as file:
return file.read()
return None
def load_json(self:Self, data:str|dict[str, Any|None]|list[Any|None]) -> dict[str, Any|None]|list[Any|None]|None:
if isinstance(data, str):
json:list[Any|None]|dict[str, Any|None]|None
try:
json = json_decode(data)
except Exception as exception:
self.exception(exception, "load_json_exception", {
"data" : data,
"length" : len(data)
})
if json:
return json
try:
return json_decode(self.load_file(data))
except Exception as exception:
self.exception(exception, "load_json_by_file_exception", {
"path" : data
})
return None
elif isinstance(data, (dict, list)):
return data
return None
def get_encoder(self:Self) -> str:
return self.__http_encoder
def get_http_default_code(self:Self) -> tuple[int, str]:
return self.__http_code, self.__http_message
def get(self:Self,
keys:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
default:Optional[Any] = None
) -> Any|None:
return self.get_value(keys, (inputs, self.__inputs, self.DEFAULT_SETTINGS), default)
def __get_text(self:Self, strings:str|Sequence[str]) -> str:
keys:list[str] = self.get_keys(strings := self.get_list(strings))
if len(keys):
language:str
used:list[str] = []
for language in [self.__language] + list(self.__sentences.keys()):
if language not in used and language in self.__sentences:
key:str
used.append(language)
for key in keys:
if key in self.__sentences[language]:
return self.__sentences[language][key]
return strings[0]
def i18n(self:Self,
strings:str|Sequence[str],
inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None
) -> str:
return self.string_variables(self.__get_text(strings), inputs)
def add_controllers(self:Self,
inputs:str|dict[str, dict[str, str|Callable[[Request], None]]]|Sequence[Any|None]
) -> None:
if isinstance(inputs, dict):
method:str
controllers:dict[str, str|Callable[[NucelarMonitor.Request], None]]
for method, controllers in inputs.items():
if (method := method.lower()) not in self.__controllers:
self.__controllers[method] = []
if isinstance(controllers, dict):
request:str
target:str|Callable[[NucelarMonitor.Request], None]
for request, target in controllers.items():
controller:Callable[[NucelarMonitor.Request, NucelarMonitor.Response], None]|None = None
path:str|None = None
if isinstance(target, str) and (controller := getattr(self, target, None)) is None:
path = self.get_absolute_path(target)
if callable(controller) or path is not None:
variables:list[str] = []
if path is not None:
variables.append("route")
def callback(matches:REMatch) -> str:
variables.append(matches.group(1))
return r'([^\/]+)'
self.__controllers[method].append((re_compile(r'^' + self.RE_ROUTE_KEY.sub(callback, self.to_regular_expression(
request[:-1] if request[-1] == "/" else request
)) + (r'' if path is None else r'(.*)') + r'\/?$'), tuple(variables), controller, path))
elif isinstance(inputs, (list, tuple)):
subinputs:Any|None
for subinputs in inputs:
self.add_controllers(subinputs)
elif isinstance(inputs, str):
self.add_controllers(self.load_json(inputs))
def __listen(self:Self) -> None:
while self.__working:
try:
client:Socket
address:str
port:int
client, (address, port) = self.__http_server.accept()
Thread(
target = self.__listen_client,
args = (client, address, port)
).start()
except Exception as exception:
self.exception(exception, "http_server_listen_exception", {
"host" : self.__http_host,
"port" : self.__http_port,
})
def __listen_client(self:Self, client:Socket, address:str, port:int) -> None:
data:bytes = b""
route:str = ""
method:str = "UNKN"
response:NucelarMonitor.Response = NucelarMonitor.Response(self)
try:
request:NucelarMonitor.Request
variables:tuple[str, ...]
controller:Callable[[NucelarMonitor.Request], Any|None]|None
path:str|None
response_data:Any|None = None
pattern:REPattern
done:bool = False
while True:
buffer:bytes = client.recv(self.__http_buffer_size)
if not buffer:
break
data += buffer
if len(buffer) != self.__http_buffer_size:
break
for pattern, variables, controller, path in self.__controllers[
method := (request := self.Request(data, self.__http_encoder)).method
]:
matches:REMatch = pattern.match(route := request.request)
if matches is not None:
request.set_uri_variables(variables, matches)
if done := path is not None:
for index in self.__index_files:
full_path = path + "/" + route + ("" if index == "" else "/" + index)
if done := (response_data := self.load_file(full_path, "rb")) is not None:
response.set_data(
response_data,
get_mime_by_extension(full_path)[0] or "application/octet-stream"
)
break
elif done := controller is not None:
controller(request, response)
if done:
break
if not done:
response.body = b"<h1>Not Found</h1>"
response.mime = "text/html;charset=" + self.__http_encoder
response.code = "404"
response.message = "Not Found"
client.sendall(self.string_variables(self.__http_header_response, {
"http_protocol" : self.__http_protocol,
"http_version" : self.__http_version,
**response.get_parameters()
}).encode(self.__http_encoder) + response.body)
client.close()
except Exception as exception:
self.exception(exception, "http_server_client_exception", {
"host" : self.__http_host,
"port" : self.__http_port,
"client_address" : address,
"client_port" : port,
"length" : len(data),
"method" : method,
"route" : route,
"response_length" : len(response.body)
})
def debian(self:Self, request:Request, response:Response) -> None:
key:str = request.get("key")
hostnames:list[str]
domain:str|None
interfaces:list[list[int, str, bool, str, int]]
disks:list[list[str, int, int, str|None]]
iterations:int
candle_times:list[int, int]
cpu:list[float, float, float, float, float]
memory:list[int, int, int, int, int, float]
net_use:list[list[list[str, int, int, int, int, int, int]]]
hostnames, domain, interfaces, disks, iterations, candle_times, cpu, memory, net_use = json_decode(request.body)
@classmethod
def get_dictionary(cls:type[Self], *items:Sequence[Any|None]) -> dict[str, Any|None]:
dictionary:dict[str, Any|None] = {}
item:Any|None
for item in items:
if isinstance(item, dict):
dictionary.update(item)
elif isinstance(item, (list, tuple)):
subitem:Any|None
for subitem in item:
dictionary.update(cls.get_dictionary(subitem))
return dictionary
@classmethod
def get_keys(cls:type[Self], *items:Sequence[Any|None]) -> list[str]:
keys:list[str] = []
item:Any|None
for item in items:
if isinstance(item, str):
cls.RE_KEY.match(item) and keys.append(item)
elif isinstance(item, (list, tuple)):
subitem:Any|None
for subitem in item:
keys.extend(cls.get_keys(subitem))
return keys
@classmethod
def get_dictionaries(cls:type[Self], *items:Sequence[Any|None]) -> list[dict[str, Any|None]]:
dictionaries:list[dict[str, Any|None]] = []
item:Any|None
for item in items:
if isinstance(item, dict):
dictionaries.append(item)
elif isinstance(item, (list, tuple)):
subitem:Any|None
for subitem in item:
dictionaries.extend(cls.get_dictionaries(subitem))
return dictionaries
@classmethod
def get_value(cls:type[Self],
keys:str|Sequence[str],
inputs:dict[str, Any|None]|Sequence[Any|None],
default:Optional[Any] = None
) -> Any|None:
if len(cls.get_keys(keys)):
dictionary:dict[str, Any|None]
for dictionary in cls.get_dictionaries(inputs):
key:str
for key in cls.get_keys(keys):
if key in dictionary:
return dictionary[key]
return default
@staticmethod
def get_list(item:Any|None) -> list[Any|None]:
return item if isinstance(item, (list, tuple)) else [item]
@classmethod
def string_variables(cls:type[Self],
string:str,
inputs:dict[str, Any|None]|Sequence[Any|None],
default:Optional[str] = None
) -> str:
variables:dict[str, Any|None] = cls.get_dictionary(inputs)
def callback(matches:REMatch) -> str:
key:str = matches.group(1)
return (
str(variables[key]) if key in variables else
default if default is not None else
matches.group(0))
return cls.RE_STRING_VARIABLE.sub(callback, string)
@staticmethod
def get_texts(*items:list[Any|None]) -> list[str]:
texts:list[str] = []
item:Any|None
for item in items:
if isinstance(item, str):
texts.append(item)
elif isinstance(item, (list, tuple)):
subitem:Any|None
for subitem in item:
texts.extend(NucelarMonitor.get_texts(subitem))
return texts
@staticmethod
def get_action_data(i:int = 0) -> dict[str, str|int]:
stack:FrameInfo = get_stack()[i]
return {
"file" : stack.filename,
"method" : stack.function,
"line" : stack.lineno
}
@classmethod
def to_regular_expression(cls:type[Self], string:str) -> str:
def callback(matches:REMatch) -> str:
character:str = matches.group(0)
return "\\" + (
cls.SPECIAL_REGULAR_EXPRESSION_CHARACTERS[character] if character in cls.SPECIAL_REGULAR_EXPRESSION_CHARACTERS else
character)
return cls.RE_TO_REGULAR_EXPRESSION.sub(callback, string)