#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Any, Optional, Self, Sequence
from Interfaces.Application.AnPInterface import AnPInterface
from Models.RequestModel import RequestModel
from Utils.Common import Common
from threading import Thread
from http.server import BaseHTTPRequestHandler, HTTPServer
from Abstracts.HTTPServersAbstract import HTTPServersAbstract


class HTTPDriver(HTTPServersAbstract):
    """HTTP front end built on the standard library ``http.server``.

    The driver runs an ``HTTPServer`` on a background thread and forwards
    every incoming request to ``anp.routes.go`` wrapped in a
    ``RequestModel``.
    """

    class HTTPRequestHandler(BaseHTTPRequestHandler):
        """Request handler that routes every supported verb the same way."""

        def __process(self:Self, method:str) -> None:
            """Build a ``RequestModel``, route it and send the response.

            Args:
                method: Lower-case verb name passed to the router
                    (``"get"``, ``"post"``, ...).
            """
            # The application object is attached to the server instance by
            # HTTPDriver before the serve loop starts.
            anp:AnPInterface = self.server.anp
            request:RequestModel = RequestModel()

            request.method = method
            request.path = self.path
            request.headers = dict(self.headers)
            request.client_host = self.client_address[0]
            request.client_port = self.client_address[1]

            # The response_* attributes read below are taken from the same
            # request object after routing.
            anp.routes.go(method, self.path, request)

            # Encode before measuring: Content-Length counts bytes, and a
            # str containing non-ASCII characters encodes to more bytes
            # than it has characters.
            body = (
                request.response.encode()
                if isinstance(request.response, str) else
                request.response
            )

            self.send_response(request.response_code)
            self.send_header("Content-Type", request.response_mime)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()

            # A HEAD response carries the headers of the equivalent GET
            # but must not include a message body.
            if method != "head":
                self.wfile.write(body)

        # BaseHTTPRequestHandler dispatches to ``do_<VERB>`` by name, so
        # each supported verb needs its own entry point.

        def do_GET(self:Self) -> None:
            self.__process("get")

        def do_POST(self:Self) -> None:
            self.__process("post")

        def do_PUT(self:Self) -> None:
            self.__process("put")

        def do_DELETE(self:Self) -> None:
            self.__process("delete")

        def do_PATCH(self:Self) -> None:
            self.__process("patch")

        def do_HEAD(self:Self) -> None:
            self.__process("head")

        def do_OPTIONS(self:Self) -> None:
            self.__process("options")

        def do_CONNECT(self:Self) -> None:
            self.__process("connect")

        def do_TRACE(self:Self) -> None:
            self.__process("trace")

        def do_COPY(self:Self) -> None:
            self.__process("copy")

        def do_MOVE(self:Self) -> None:
            self.__process("move")

        def do_PROPFIND(self:Self) -> None:
            self.__process("propfind")

        def do_PROPPATCH(self:Self) -> None:
            self.__process("proppatch")

        def do_LOCK(self:Self) -> None:
            self.__process("lock")

        def do_UNLOCK(self:Self) -> None:
            self.__process("unlock")

        def do_REPORT(self:Self) -> None:
            self.__process("report")

        def do_MKCOL(self:Self) -> None:
            self.__process("mkcol")

    def __init__(self:Self,
        anp:AnPInterface,
        inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None
    ) -> None:
        """Store the application object and prepare an idle driver.

        Args:
            anp: Application object; its ``routes.go`` receives every
                request.
            inputs: Optional settings, normalised through
                ``Common.get_dictionary`` and also passed to the base
                class constructor.
        """
        self.anp:AnPInterface = anp
        self.__inputs:dict[str, Any|None] = Common.get_dictionary(inputs)

        super().__init__(inputs)

        self.__server:HTTPServer|None = None
        self.__thread:Thread|None = None

        self.update()

    def start(self:Self) -> None:
        """Start serving on a background thread if not already running."""
        # __server is assigned by the worker thread, so on its own it
        # cannot tell that a start is already in flight; also check the
        # thread to avoid launching a second worker on the same port.
        if self.__server is not None:
            return
        if self.__thread is not None and self.__thread.is_alive():
            return

        self.__thread = Thread(target = self.__run_service)
        self.__thread.start()

    def close(self:Self) -> None:
        """Stop the serve loop and release the listening socket.

        Must not be called from inside a request handler:
        ``HTTPServer.shutdown`` blocks until the serve loop exits and
        deadlocks when invoked from the thread running that loop.
        """
        server:HTTPServer|None = self.__server
        if server is None:
            return

        server.shutdown()
        # shutdown() only stops serve_forever(); the socket stays open
        # until server_close() is called.
        server.server_close()
        self.__server = None

        # Wait for the worker to finish so a later start() sees it as dead.
        worker:Thread|None = self.__thread
        if worker is not None:
            worker.join()
            self.__thread = None

    def __run_service(self:Self) -> None:
        """Thread target: create the server and serve until shut down."""
        # NOTE(review): __host and __port are not assigned anywhere in the
        # visible part of this class; confirm they are populated (e.g. by
        # update()) under this class's name-mangled attribute names.
        server:HTTPServer = HTTPServer(
            (self.__host, self.__port),
            self.HTTPRequestHandler
        )
        # Attach the application before publishing the server so handlers
        # always find it.
        server.anp = self.anp
        self.__server = server
        server.serve_forever()