#!/usr/bin/env python3 # -*- coding: utf-8 -*- from typing import Any, Optional, Self, Sequence from Interfaces.Application.AnPInterface import AnPInterface from Models.RequestModel import RequestModel from Utils.Common import Common from threading import Thread from http.server import BaseHTTPRequestHandler, HTTPServer class HTTPDriver: class HTTPRequestHandler(BaseHTTPRequestHandler): def __process(self:Self, method:str) -> None: anp:AnPInterface = self.server.anp request:RequestModel = RequestModel() request.method = method request.path = self.path request.headers = dict(self.headers) request.client_host = self.client_address[0] request.client_port = self.client_address[1] anp.routes.go(method, self.path, request) self.send_response(request.response_code) self.send_header("Content-Type", request.response_mime) self.send_header("Content-Length", str(len(request.response))) self.end_headers() self.wfile.write( request.response.encode() if isinstance(request.response, str) else request.response) def do_GET(self:Self) -> None: self.__process("get") def do_POST(self:Self) -> None: self.__process("post") def do_PUT(self:Self) -> None: self.__process("put") def do_DELETE(self:Self) -> None: self.__process("delete") def do_PATCH(self:Self) -> None: self.__process("patch") def do_HEAD(self:Self) -> None: self.__process("head") def do_OPTIONS(self:Self) -> None: self.__process("options") def do_CONNECT(self:Self) -> None: self.__process("connect") def do_TRACE(self:Self) -> None: self.__process("trace") def do_COPY(self:Self) -> None: self.__process("copy") def do_MOVE(self:Self) -> None: self.__process("move") def do_PROPFIND(self:Self) -> None: self.__process("propfind") def do_PROPPATCH(self:Self) -> None: self.__process("proppatch") def do_LOCK(self:Self) -> None: self.__process("lock") def do_UNLOCK(self:Self) -> None: self.__process("unlock") def do_REPORT(self:Self) -> None: self.__process("report") def do_MKCOL(self:Self) -> None: self.__process("mkcol") def __init__(self:Self, anp:AnPInterface, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None ) -> None: self.anp:AnPInterface = anp self.__inputs:dict[str, Any|None] = Common.get_dictionary(inputs) self.__port:int = 8000 self.__host:str = "" self.__server:HTTPServer|None = None self.__thread:Thread|None = None self.update() def start(self:Self) -> None: if self.__server is None: self.__thread = Thread(target = self.__run_service) self.__thread.start() def close(self:Self) -> None: if self.__server is not None: self.__server.shutdown() self.__server = None def update(self:Self) -> None: self.close() self.__port = self.anp.settings.get(("http_server_port", "http_port", "port"), self.__inputs, self.__port) self.__host = self.anp.settings.get(("http_server_host", "http_host", "host"), self.__inputs, self.__host) self.start() def reset(self:Self) -> None: self.__port = 8000 self.__host = "" self.update() def __run_service(self:Self) -> None: self.__server = HTTPServer((self.__host, self.__port), self.HTTPRequestHandler) self.__server.anp = self.anp self.__server.serve_forever()