#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Any, Optional, Self, Sequence
from threading import Thread
from socket import (
    socket as Socket,
    AF_INET as ADDRESS_FAMILY_IPV4,
    SOCK_STREAM as SOCKET_STREAM,
    SOL_SOCKET as SOCKET_LAYER,
    SO_REUSEADDR as SOCKET_REUSE_ADDRESS,
    SHUT_RDWR as SOCKET_SHUTDOWN_BOTH
)
import datetime
from Interfaces.Application.AnPInterface import AnPInterface
from Abstracts.HTTPServersAbstract import HTTPServersAbstract
from Models.RequestModel import RequestModel
from Utils.Common import Common
from Utils.Checks import Check
from Utils.Patterns import RE


class HTTPSocketServerDriver(HTTPServersAbstract):
    """HTTP server driver built directly on a TCP socket.

    ``start`` binds a listening socket and runs the accept loop on a
    background thread. Each accepted connection is read, parsed into a
    ``RequestModel``, dispatched through ``anp.routes.go`` and answered with a
    header rendered from the ``response_header`` setting followed by the
    response body. ``close`` stops the loop and releases the socket.

    Attributes such as ``anp``, ``key``, ``host``, ``port`` and ``_inputs`` are
    not assigned in this class; presumably they are provided by
    ``HTTPServersAbstract`` -- verify against the base class.
    """

    # Setting-name prefixes tried for every key, most specific first.
    __SETTING_PREFIXES: tuple[str, ...] = (
        "http_socket_server_driver",
        "http_server_driver",
        "http_server",
        "http",
        ""
    )

    def __get(self: Self, keys: str | Sequence[str], default: Any | None = None) -> Any | None:
        """Read a setting, trying each key under the driver's prefixes.

        Args:
            keys: One key or several keys; normalized by ``Common.get_keys``.
            default: Value handed to ``anp.settings.get`` as the fallback.

        Returns:
            Whatever ``anp.settings.get`` returns for the candidate keys.
        """
        real_keys: list[str] = []

        for key in Common.get_keys(keys):
            for header in self.__SETTING_PREFIXES:
                final_key: str = (header + "_" if header else "") + key
                # NOTE(review): this tests against the *input* ``keys``, so the
                # un-prefixed key (empty header) is always skipped. It looks
                # like ``real_keys`` (de-duplication) may have been intended;
                # left unchanged because fixing it alters which settings are
                # picked up -- confirm the intent.
                if final_key not in keys:
                    real_keys.append(final_key)

        return self.anp.settings.get(real_keys, self._inputs, default)

    def __init__(
        self: Self,
        anp: AnPInterface,
        key: str,
        inputs: Optional[dict[str, Any | None] | Sequence[Any | None]] = None
    ) -> None:
        """Create the driver and load its socket-level settings.

        Args:
            anp: Application object; forwarded to the base constructor.
            key: Identifier of this server; forwarded to the base constructor.
            inputs: Optional settings overrides; forwarded to the base
                constructor.
        """
        # Runtime state is defined before the base constructor runs so the
        # attributes exist whatever the base class does during construction.
        self.__server: Socket | None = None
        self.__thread: Thread | None = None
        self.__working: bool = False

        super().__init__(anp, key, inputs)

        # Backlog passed to ``socket.listen``.
        self.__maximum_connections: int = self.__get("maximum_connections", 5)
        # Size in bytes of each ``recv`` call.
        self.__cache_size: int = self.__get("cache_size", 1024)
        # Template of the response head, filled by ``Common.string_variables``.
        self.__response_header: str = self.__get("response_header")

    def __report(self: Self, exception: Exception, message: str) -> None:
        """Forward ``exception`` to ``anp.exception`` with this server's identity."""
        self.anp.exception(exception, message, {
            "key": self.key,
            "host": self.host,
            "port": self.port
        })

    def __receive(self: Self, client: Socket) -> bytes:
        """Read the raw request from ``client``.

        Reads ``cache_size`` bytes at a time and stops at the first chunk that
        is shorter than ``cache_size`` (which includes an empty chunk).
        """
        chunks: list[bytes] = []

        while True:
            buffer: bytes = client.recv(self.__cache_size)
            chunks.append(buffer)
            if len(buffer) < self.__cache_size:
                break

        return b"".join(chunks)

    def __parse(self: Self, data: bytes) -> RequestModel:
        """Build a ``RequestModel`` from the raw bytes of a request.

        Malformed input is not handled here: a request without a head/body
        separator fails the two-value unpacking and a request line that does
        not match ``RE.HTTP_REQUEST`` fails on ``.groups()``. The accept loop
        reports those exceptions.
        """
        request: RequestModel = RequestModel()
        header: str
        body: str
        get_block: str
        hash_block: str

        header, body = RE.DOUBLE_NEW_LINE.split(data.decode("utf-8", errors="ignore"), 1)
        header_lines: list[str] = RE.NEW_LINE.split(header)

        (
            request.method,
            request.path,
            get_block,
            hash_block,
            request.protocol,
            request.protocol_version
        ) = RE.HTTP_REQUEST.match(header_lines[0]).groups()

        for line in header_lines[1:]:
            matches = RE.HTTP_HEADER_PARAMETER.match(line)
            if not matches:
                continue

            name: str
            value: str
            name, value = matches.groups()
            name = name.strip().lower()
            value = value.strip()

            # A header seen more than once is stored as a list of its values.
            if name in request.request_headers:
                if Check.is_array(request.request_headers[name]):
                    request.request_headers[name].append(value)
                else:
                    request.request_headers[name] = [request.request_headers[name], value]
            else:
                request.request_headers[name] = value

        request.get_variables = self.get_variables_from(get_block)
        request.post_variables = self.get_variables_from(body)
        request.cookies = self.load_cookies(request.request_headers.get("cookie"))
        request.hash_variables = self.get_variables_from(hash_block)

        return request

    def __render(self: Self, request: RequestModel) -> bytes:
        """Return the full response (rendered head + body) for ``request``."""
        response_body: bytes = (
            request.response if Check.is_binary(request.response) else
            request.response.encode() if Check.is_string(request.response) else
            str(request.response).encode()
        )
        now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)

        return Common.string_variables(self.__response_header, {
            "content_length": len(response_body),
            "protocol": request.get("protocol", request.protocol or self.protocol),
            "protocol_version": request.get(
                "protocol_version",
                request.protocol_version or self.protocol_version
            ),
            "http_code": request.response_code,
            "http_message": self.get_http_message(request.response_code),
            "date": self.format_datetime(now),
            "last_modified": self.format_datetime(
                request.last_modified or request.get("last_modified") or now
            ),
            "accept_range": request.get("accept_range", self.accept_range),
            "response_length": len(response_body),
            "access_control_max_age": request.get(
                "access_control_max_age",
                self.access_control_max_age
            ),
            "keep_alive_maximum": request.get("keep_alive_maximum", self.keep_alive_maximum),
            "keep_alive_timeout": request.get("keep_alive_timeout", self.keep_alive_timeout),
            "cors": request.get("cors", self.cors),
            "mime": request.response_mime or request.get("mime", self.mime),
            "charset": request.response_charset or request.get("charset", self.charset)
        }).encode() + response_body

    def __listen(self: Self) -> None:
        """Accept and answer connections, one at a time, until stopped.

        Runs on the thread created by ``start``. Every failure is reported via
        ``anp.exception`` and the loop carries on with the next connection.
        """
        while self.__working:
            try:
                client: Socket
                client, _ = self.__server.accept()
            except Exception as exception:
                # ``close`` clears ``__working`` before tearing the socket
                # down, so a failing accept at that point is the expected way
                # out of the loop and is not worth reporting.
                if self.__working:
                    self.__report(exception, "anp_http_socket_server_driver_run_exception")
                continue

            try:
                # The context manager closes the connection on every path,
                # including parse, routing and send errors.
                with client:
                    request: RequestModel = self.__parse(self.__receive(client))
                    self.anp.routes.go([self.key], request.method, request.path, request)
                    client.sendall(self.__render(request))
            except Exception as exception:
                self.__report(exception, "anp_http_socket_server_driver_run_exception")

    def start(self: Self) -> None:
        """Bind the listening socket and start the accept loop thread.

        Failures are reported via ``anp.exception`` instead of being raised;
        in that case the socket is released and the driver stays stopped.
        """
        self.__server = Socket(ADDRESS_FAMILY_IPV4, SOCKET_STREAM)

        try:
            self.__server.setsockopt(SOCKET_LAYER, SOCKET_REUSE_ADDRESS, 1)
            self.__server.bind((self.host, self.port))
            self.__server.listen(self.__maximum_connections)
            # Only flag the driver as running once the socket really listens;
            # the flag must be set before the thread starts because the loop
            # checks it first.
            self.__working = True
            self.__thread = Thread(target=self.__listen)
            self.__thread.start()
        except Exception as exception:
            self.__report(exception, "anp_http_socket_server_driver_start_exception")
            # Roll back so a failed start leaves neither a dangling socket nor
            # a driver that claims to be running.
            self.__working = False
            self.__thread = None
            try:
                self.__server.close()
            except OSError:
                pass  # Best effort: nothing more can be done with this socket.
            self.__server = None

    def close(self: Self) -> None:
        """Stop the accept loop, release the socket and wait for the thread.

        Teardown is best effort: errors while shutting down are ignored.
        """
        self.__working = False

        if self.__server:
            try:
                # ``close`` alone does not reliably wake a thread blocked in
                # ``accept`` (notably on Linux); shutting the socket down
                # first does. Some platforms reject shutdown on a listening
                # socket, which is harmless here.
                self.__server.shutdown(SOCKET_SHUTDOWN_BOTH)
            except OSError:
                pass
            try:
                self.__server.close()
            except Exception:
                pass  # Best effort: the socket is being discarded anyway.
            self.__server = None

        if self.__thread:
            try:
                self.__thread.join()
            except Exception:
                pass  # e.g. joining a thread that never started.
            self.__thread = None