#!/usr/bin/env python3 # -*- coding: utf-8 -*- import datetime from typing import Any, Optional, Sequence, Self, Callable from re import compile as re_compile, Pattern as REPattern, Match as REMatch, IGNORECASE as RE_IGNORE_CASE from pyodbc import connect as pyodbc_connect from socket import socket as Socket, AF_INET as ADDRESS_FAMILY_IPV4, SOCK_STREAM as SOCKET_STREAM, SOL_SOCKET as SOCKET_LAYER, SO_REUSEADDR as SOCKET_REUSE_ADDRESS from inspect import stack as get_stack, FrameInfo from traceback import format_stack as trace_format_stack, extract_tb as extract_traceback from threading import Thread from os.path import exists as path_exists, dirname as directory_name, abspath as absolute_path from json import loads as json_decode, dumps as json_encode from mimetypes import guess_type as get_mime_by_extension class NucelarMonitor: DEFAULT_SETTINGS:dict[str, Any|None] = { "autostart" : True, "print_format" : "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss} [{line}]{file}({method}): {message}", "exception_format" : " '[{line}]{file}({method})'{lines}\n\n{exception_message}", "print_types" : [ ["unkn", "unknown"], ["info", "information"], ["warn", "warning"], ["erro", "error", "wrong", "failure", "fail", "no"], ["exce", "exception", "except"], [" ok ", "ok", "success", "succeed", "yes"], ["test", "debug"] ], "http_host" : "0.0.0.0", "http_port" : 13000, "http_cache_size" : 1024, "http_maximum_connections" : 5, "http_header_response" : ( "{http_protocol}/{http_version} {http_code} {http_message}\r\n" + "Content-Type: {mime}\r\n" + "Content-Length: {length}\r\n" + "\r\n" ), "http_protocol" : "HTTP", "http_version" : "1.1", "http_code" : 200, "http_message" : "OK", "http_encoder" : "utf-8", "index_files" : ("index.html", "index.htm"), "sql_host" : "127.0.0.1", "sql_port" : 1433, "sql_user" : "sa", "sql_password" : "password", "sql_database" : "NucelarMonitor", "default_controllers" : { "get" : { "/" : "/Public" }, "post" : { "/debian/{key}" : "debian" } }, "default_connections" : {} } DEFAULT_I18N:dict[str, dict[str, str|Sequence[str]]] = { "english" : {} } ROOT:str = directory_name(absolute_path(__file__)) SLASH:str = "/" if "/" in ROOT else "\\\\" SPECIAL_REGULAR_EXPRESSION_CHARACTERS:dict[str, str] = { "\r" : "r", "\n" : "n", "\t" : "t" } RE_KEY:REPattern = re_compile(r'^[a-z_][a-z0-9_]*$', RE_IGNORE_CASE) RE_STRING_VARIABLE:REPattern = re_compile(r'\{([a-z_][a-z0-9_]*)\}', RE_IGNORE_CASE) RE_EXCEPTION:REPattern = re_compile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$') RE_NEW_LINE:REPattern = re_compile(r'\r\n|[\r\n]') RE_TO_SNAKE:REPattern = re_compile(r'[^a-zA-Z0-9]*([A-Z][A-Z0-9]*)|[^a-z0-9]+') RE_HTTP_REQUEST:REPattern = re_compile(r'^([^\s]+)\s([^\s\?\#]+)(?:\?([^#]+))?(?:\#[^\s]+)?\s([^\/]+)\/([0-9\.]+)$') RE_HEADER_LINE:REPattern = re_compile(r'^([^\:]+)\:(.+)$') RE_HTTP_BLOCKS:REPattern = re_compile(r'((?:(?!(?:(?:\r\n){2}|\n{2}|\r{2}))(?:.|[\r\n]+))+)(?:(?:(?:\r\n){2}|\n{2}|\r{2})((?:.+|[\r\n]+)*))?') RE_LAST_DIRECTORY:REPattern = re_compile(r'^(.*)[\/\\\\][^\/\\\\]*[\/\\\\]?$') RE_SLASHES:REPattern = re_compile(r'[\\\/]+') RE_TO_REGULAR_EXPRESSION:REPattern = re_compile(r'[\(\)\{\}\/\\\.\-\+\*\^\$\?\|\!\<\>\r\n\t]') RE_ROUTE_KEY:REPattern = re_compile(r'\\\{([a-z_][a-z0-9_]*)\\\}', RE_IGNORE_CASE) class Request: def __init__(self:Self, data:bytes, encoder:str = "utf-8") -> None: self.method:str self.request:str self.value_get:str|None self.variables_get:dict[str, str] self.variables_post:dict[str, str] self.protocol:str self.protocol_version:str self.body:str self.variables_uri:dict[str, str] = {} header, body = (lambda header, body:( NucelarMonitor.RE_NEW_LINE.split(str(header).strip()), body ))(*NucelarMonitor.RE_HTTP_BLOCKS.match(data.decode(encoder)).groups()) ( self.method, self.request, self.value_get, self.variables_get, self.protocol, self.protocol_version ) = (lambda method, request, variables, protocol, protocol_version:( str(method).lower(), request, variables, self.parse_variables(variables), protocol, protocol_version ))(*NucelarMonitor.RE_HTTP_REQUEST.match(header[0]).groups()) self.body = body self.variables_post = self.parse_variables(body) def set_uri_variables(self:Self, keys:list[str], matches:REMatch) -> None: i:int value:str for i, value in enumerate(matches.groups()): self.variables_uri[keys[i]] = value def get(self:Self, keys:str|Sequence[str], default:Optional[Any] = None) -> Any|None: return NucelarMonitor.get_value(keys, ( self.variables_uri, self.variables_get, self.variables_post ), default) @classmethod def parse_variables(cls:type[Self], string:Optional[str]) -> dict[str, str]: if not string: return {} variables:dict[str, str] = {} pair:str for pair in string.split("&"): if "=" in pair: key, value = pair.split("=", 1) variables[cls.to_snake(key)] = value else: variables[cls.to_snake(pair)] = "" return variables @staticmethod def to_snake(string:str) -> str: def callback(matches:REMatch) -> str: upper:str|None = matches.group(1) return "_" + upper.lower() if upper else "_" return NucelarMonitor.RE_TO_SNAKE.sub(callback, string).lower() class Response: def __init__(self:Self, nucelar_monitor:type[Self], response:Optional[Any] = None, mime:Optional[str] = None, code:Optional[int] = None, message:Optional[str] = None ) -> None: default_code:int default_message:str self.nucelar_monitor:NucelarMonitor = nucelar_monitor default_code, default_message = self.nucelar_monitor.get_http_default_code() self.body:bytes = b"" self.mime:str self.code:str = code or default_code self.message:str = default_message if message is None else message self.set_data(response, mime) def set_data(self:Self, data:Any|None, mime:Optional[str] = None) -> None: if isinstance(data, bytes): self.body = data self.mime = mime or "application/octet-stream" elif isinstance(data, str): self.body = data.encode(self.nucelar_monitor.get_encoder()) self.mime = mime or "text/plain;charset=" + self.nucelar_monitor.get_encoder() elif isinstance(data, (dict, tuple, list)): self.body = json_encode(data).encode(self.nucelar_monitor.get_encoder()) self.mime = mime or "application/json;charset=" + self.nucelar_monitor.get_encoder() else: self.body = str(data).encode(self.nucelar_monitor.get_encoder()) self.mime = mime or "text/plain;charset=" + self.nucelar_monitor.get_encoder() def get_parameters(self:Self) -> dict[str, Any|None]: return { "http_code" : self.code, "http_message" : self.message, "mime" : self.mime, "length" : len(self.body) } def __init__(self:Self, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None: key:str self.__inputs:dict[str, Any|None] = self.get_dictionary(inputs) self.__sentences:dict[str, dict[str, str|Sequence[str]]] = self.DEFAULT_I18N self.__language:str = self.get("language", None, "english") self.__print_format:str = self.get("print_format") self.__print_types:list[list[str]] = self.DEFAULT_SETTINGS["print_types"] self.__exception_format:str = self.get("exception_format") self.__controllers:dict[str, list[tuple[ REPattern, tuple[tuple[str, ...]], Callable[[NucelarMonitor.Request, NucelarMonitor.Response], None]|None, str|None] ]] = {} self.__http_host:str = self.get("http_host") self.__http_port:int = self.get("http_port") self.__http_server:Socket self.__http_buffer_size:int = self.get("http_cache_size") self.__http_header_response:str = self.get("http_header_response") self.__http_protocol:str = self.get("http_protocol") self.__http_version:str = self.get("http_version") self.__http_code:int = self.get("http_code") self.__http_message:str = self.get("http_message") self.__http_encoder:str = self.get("http_encoder") self.__index_files:tuple[str, ...] = tuple(self.get("index_files")) self.__started:bool = False self.__working:bool = False self.__root_paths:list[str] = ["", self.ROOT] self.__commands:list[list[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]] = [ [["close", "exit", "quit", "bye"], self.__close_command] ] for _ in range(2): self.__root_paths.append(self.RE_LAST_DIRECTORY.sub(r'\1', self.__root_paths[-1])) for key in ("default_controllers", "controllers"): self.add_controllers(self.get(key)) self.get("autostart") and self.start() def start(self:Self) -> None: if self.__started: return self.__started = True self.__http_server = Socket(ADDRESS_FAMILY_IPV4, SOCKET_STREAM) self.__working = True Thread(target = self.__command_listener).start() try: self.__http_server.setsockopt(SOCKET_LAYER, SOCKET_REUSE_ADDRESS, 1) self.__http_server.bind((self.__http_host, self.__http_port)) self.__http_server.listen(self.get("http_maximum_connections")) Thread(target = self.__listen).start() except Exception as exception: self.exception(exception, "http_server_start_exception", { "host" : self.__http_host, "port" : self.__http_port, }) self.close() def close(self:Self) -> None: if not self.__started: return self.__started = False self.__working = False try: self.__http_server.close() except Exception as exception: self.exception(exception, "http_server_close_exception", { "host" : self.__http_host, "port" : self.__http_port, }) def __command_listener(self:Self) -> None: while self.__working: try: parameters:dict[str, Any|None] = {} arguments:list[Any|None] = [] command:str = input().strip().lower() if command: for commands, callback in self.__commands: if command in commands: callback(self.get_dictionary(self.__inputs), [parameters, *arguments]) break except Exception as exception: self.exception(exception, "command_listener_exception", { "command" : command }) def __close_command(self:Self, inputs:dict[str, Any|None], *arguments:list[Any|None]) -> None: self.close() def get_print_type(self:Self, _type:str) -> str: group:list[str] for group in self.__print_types: if _type in group: return group[0].upper() return self.__print_types[0][0].upper() def print(self:Self, _type:str, message:str|Sequence[str], inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, i:int = 0 ) -> None: date:datetime = datetime.datetime.now() own:dict[str, Any|None] = { "raw_type" : _type, "type" : self.get_print_type(_type), "i18n" : self.get_texts(message), "message" : self.i18n(message, inputs), **self.get_dictionary(inputs), **self.get_action_data(i + 1) } for key in ("year", "month", "day", "hour", "minute", "second"): k:str = "i" if key == "minute" else key[0] own[k] = own[key] = getattr(date, key) own[k + k] = ("00" + str(own[key]))[-2:] own["yyyy"] = own["year"] print(self.string_variables(self.__print_format, own) + (own["end"] if "end" in own else "")) def exception(self:Self, exception:Exception, message:str|Sequence[str], inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, i:int = 0 ) -> None: lines:list[str] = extract_traceback(exception.__traceback__).format() matches:REMatch = self.RE_EXCEPTION.match(lines[-1]) data:dict[str, Any|None] = { **self.get_dictionary(inputs), "lines" : "", "exception_message" : str(exception), "method" : matches.group(3), "line" : matches.group(2), "file" : matches.group(1) } block:str j:int for j, block in enumerate(trace_format_stack()[:-2] + lines): if block: data["lines"] += "\n " + str(j) + " - " + self.RE_NEW_LINE.split(block.strip())[0] data["end"] = self.string_variables(self.__exception_format, data) message and self.print("exception", message, data, i + 2) @classmethod def fix_path(cls:type[Self], path:str) -> str: return cls.RE_SLASHES.sub(cls.SLASH, path) def get_absolute_path(self:Self, path:str) -> str|None: root:str absolute:str for root in self.__root_paths: absolute = self.fix_path((root + '/' if root else "") + path) if path_exists(absolute): return absolute return None def load_file(self:Self, path:str, mode:str = "r") -> str|bytes|None: absolute_path:str = self.get_absolute_path(path) if absolute_path: with open(absolute_path, mode) as file: return file.read() return None def load_json(self:Self, data:str|dict[str, Any|None]|list[Any|None]) -> dict[str, Any|None]|list[Any|None]|None: if isinstance(data, str): json:list[Any|None]|dict[str, Any|None]|None try: json = json_decode(data) except Exception as exception: self.exception(exception, "load_json_exception", { "data" : data, "length" : len(data) }) if json: return json try: return json_decode(self.load_file(data)) except Exception as exception: self.exception(exception, "load_json_by_file_exception", { "path" : data }) return None elif isinstance(data, (dict, list)): return data return None def get_encoder(self:Self) -> str: return self.__http_encoder def get_http_default_code(self:Self) -> tuple[int, str]: return self.__http_code, self.__http_message def get(self:Self, keys:str|Sequence[str], inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None, default:Optional[Any] = None ) -> Any|None: return self.get_value(keys, (inputs, self.__inputs, self.DEFAULT_SETTINGS), default) def __get_text(self:Self, strings:str|Sequence[str]) -> str: keys:list[str] = self.get_keys(strings := self.get_list(strings)) if len(keys): language:str used:list[str] = [] for language in [self.__language] + list(self.__sentences.keys()): if language not in used and language in self.__sentences: key:str used.append(language) for key in keys: if key in self.__sentences[language]: return self.__sentences[language][key] return strings[0] def i18n(self:Self, strings:str|Sequence[str], inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None ) -> str: return self.string_variables(self.__get_text(strings), inputs) def add_controllers(self:Self, inputs:str|dict[str, dict[str, str|Callable[[Request], None]]]|Sequence[Any|None] ) -> None: if isinstance(inputs, dict): method:str controllers:dict[str, str|Callable[[NucelarMonitor.Request], None]] for method, controllers in inputs.items(): if (method := method.lower()) not in self.__controllers: self.__controllers[method] = [] if isinstance(controllers, dict): request:str target:str|Callable[[NucelarMonitor.Request], None] for request, target in controllers.items(): controller:Callable[[NucelarMonitor.Request, NucelarMonitor.Response], None]|None = None path:str|None = None if isinstance(target, str) and (controller := getattr(self, target, None)) is None: path = self.get_absolute_path(target) if callable(controller) or path is not None: variables:list[str] = [] if path is not None: variables.append("route") def callback(matches:REMatch) -> str: variables.append(matches.group(1)) return r'([^\/]+)' self.__controllers[method].append((re_compile(r'^' + self.RE_ROUTE_KEY.sub(callback, self.to_regular_expression( request[:-1] if request[-1] == "/" else request )) + (r'' if path is None else r'(.*)') + r'\/?$'), tuple(variables), controller, path)) elif isinstance(inputs, (list, tuple)): subinputs:Any|None for subinputs in inputs: self.add_controllers(subinputs) elif isinstance(inputs, str): self.add_controllers(self.load_json(inputs)) def __listen(self:Self) -> None: while self.__working: try: client:Socket address:str port:int client, (address, port) = self.__http_server.accept() Thread( target = self.__listen_client, args = (client, address, port) ).start() except Exception as exception: self.exception(exception, "http_server_listen_exception", { "host" : self.__http_host, "port" : self.__http_port, }) def __listen_client(self:Self, client:Socket, address:str, port:int) -> None: data:bytes = b"" route:str = "" method:str = "UNKN" response:NucelarMonitor.Response = NucelarMonitor.Response(self) try: request:NucelarMonitor.Request variables:tuple[str, ...] controller:Callable[[NucelarMonitor.Request], Any|None]|None path:str|None response_data:Any|None = None pattern:REPattern done:bool = False while True: buffer:bytes = client.recv(self.__http_buffer_size) if not buffer: break data += buffer if len(buffer) != self.__http_buffer_size: break for pattern, variables, controller, path in self.__controllers[ method := (request := self.Request(data, self.__http_encoder)).method ]: matches:REMatch = pattern.match(route := request.request) if matches is not None: request.set_uri_variables(variables, matches) if done := path is not None: for index in self.__index_files: full_path = path + "/" + route + ("" if index == "" else "/" + index) if done := (response_data := self.load_file(full_path, "rb")) is not None: response.set_data( response_data, get_mime_by_extension(full_path)[0] or "application/octet-stream" ) break elif done := controller is not None: controller(request, response) if done: break if not done: response.body = b"