#!/usr/bin/env python3 # -*- coding: utf-8 -*- from typing import Any, Self, Optional, Sequence from threading import Thread from socket import socket as Socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR, SHUT_RDWR from Interfaces.Application.NucelarMonitorInterface import NucelarMonitorInterface from Abstracts.WebServerAbstract import WebServerAbstract from Models.ResponseModel import ResponseModel from Models.RequestModel import RequestModel from Utils.Utils import Utils class WebServerDriver(WebServerAbstract): def __init__(self:Self, nucelar_monitor:NucelarMonitorInterface, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None ) -> None: super().__init__(nucelar_monitor, inputs) self.__server:Socket self.__buffer_size:int = self.nucelar_monitor.settings.get("cache_size", inputs, 4096) self.__started:bool = False self.__thread:Thread|None = None self.__clients:list[Socket] = [] def start(self:Self) -> bool: if self.__started: return False self.__started = True self.__server = Socket(AF_INET, SOCK_STREAM) self.__server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) try: self.__server.bind((self._host, self._port)) self.__server.listen() self.__thread = Thread(target = self.__listen) self.__thread.start() return True except Exception as exception: self.nucelar_monitor.exception(exception, "nucelar_monitor_web_socket_driver_start_exception", { "host" : self._host, "port" : self._port, }) self.close() return False def close(self:Self) -> bool: if not self.__started: return False self.__started = False i:int for i in range(len(self.__clients)): self.__close_client(i, True) return True def stop(self:Self) -> bool: return self.close() def __close_client(self:Self, i:int, send_close:bool = False) -> None: if self.__clients[i] is None: return client:Socket = self.__clients[i] self.__clients[i] = None try: if client: client.shutdown(SHUT_RDWR) client.close() except Exception as _: pass def __listen_client(self:Self, client:Socket, address:str, port:int) -> None: data:bytes = b"" route:str = "" method:str = "UNKN" response:ResponseModel = ResponseModel(self._encoder, {}, self._code, self._message) i:int = 0 while i < len(self.__clients): if self.__clients[i] is None: break i += 1 if i == len(self.__clients): self.__clients.append(client) else: self.__clients[i] = client try: request:RequestModel while True: buffer:bytes = client.recv(self.__buffer_size) if not buffer: break data += buffer if len(buffer) != self.__buffer_size: break if data: request = RequestModel(data, self._index_files, self._encoder) self.nucelar_monitor.routes.get(request, response) client.sendall(Utils.string_variables(self._header_response, { "protocol" : self._protocol, "version" : self._version, **response.get_parameters() }).encode(self._encoder) + response.body) except Exception as exception: self.nucelar_monitor.exception(exception, "server_client_exception", { "host" : self._host, "port" : self._port, "client_address" : address, "client_port" : port, "length" : len(data), "method" : method, "route" : route, "response_length" : len(response.body) }) finally: self.__close_client(i) def __listen(self:Self) -> None: while self.nucelar_monitor.is_working(): try: client:Socket address:str port:int client, (address, port) = self.__server.accept() Thread( target = self.__listen_client, args = (client, address, port) ).start() except Exception as exception: self.nucelar_monitor.exception(exception, "server_listen_exception", { "host" : self._host, "port" : self._port, }) try: self.__server.close() except Exception as exception: self.nucelar_monitor.exception(exception, "nucelar_monitor_web_socket_driver_close_exception", { "host" : self._host, "port" : self._port, })