OpoTests/Python/Drivers/HTTPSocketDriver.py

123 lines
4.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from threading import Thread
from typing import Any, Self, Optional
from Interfaces.OpoTestsInterface import OpoTestsInterface
from Models.RequestModel import RequestModel
from socket import socket as Socket
from socket import AF_INET as ADDRESS_FAMILY_IPV4
from socket import SOCK_STREAM as SOCKET_STREAM
from socket import SOL_SOCKET as SOCKET_LAYER
from socket import SO_REUSEADDR as SOCKET_REUSE_ADDRESS
from Utils.Utils import Utils
from Utils.Check import Check
class HTTPSocketDriver:
def __init__(self:Self, ot:OpoTestsInterface, inputs:Optional[dict[str, Any|None]|tuple[Any|None]|list] = None) -> None:
    """Keep the owning OpoTests object and set the built-in HTTP defaults.

    No socket is opened here: the socket slot is left as ``None`` and the
    working flag as ``False``.

    Args:
        ot: Object stored on ``self.ot``; other methods of this class reach
            its ``settings``, ``routes``, ``print`` and ``exception``
            members through it.
        inputs: Optional raw inputs, passed through ``Utils.get_dictionary``
            before being stored.
    """
    self.ot: OpoTestsInterface = ot
    self.__inputs: dict[str, Any | None] = Utils.get_dictionary(inputs)

    # Runtime state.
    self.__working: bool = False
    self.__socket: Socket | None = None

    # Endpoint defaults.
    self.__address: str = "localhost"
    self.__port: int = 8080

    # Connection and read-size defaults.
    self.__maximum_connections: int = 5
    self.__cache_size: int = 1024

    # Static-file defaults.
    self.__public_path: str = "/Public"
    self.__index_files: list[str] = ["index.html", "index.htm"]
def update(self:Self) -> None:
    """Re-read every HTTP option through ``self.ot.settings.get``.

    Each option is requested with a pair of key names (short name and
    ``http_``-prefixed name), the inputs stored at construction time and
    the option's current value as third argument (presumably the fallback
    when nothing is configured -- confirm against the settings object).
    """

    def resolve(keys:tuple[str, str], current:Any|None) -> Any|None:
        # Same call shape for every option; only the keys and the current value vary.
        return self.ot.settings.get(keys, self.__inputs, current)

    self.__address = resolve(("address", "http_address"), self.__address)
    self.__port = resolve(("port", "http_port"), self.__port)
    self.__maximum_connections = resolve(
        ("maximum_connections", "http_maximum_connections"),
        self.__maximum_connections
    )
    self.__cache_size = resolve(("cache_size", "http_cache_size"), self.__cache_size)
    self.__public_path = resolve(("public_path", "http_public_path"), self.__public_path)
    self.__index_files = resolve(("index_files", "http_index_files"), self.__index_files)
def __listen(self:Self) -> None:
    """Accept clients one at a time and answer each with one HTTP response.

    Loops while the working flag is set. For every accepted connection the
    request is read, decoded as UTF-8 and handed to ``self.ot.routes.go``
    wrapped in a ``RequestModel``; the returned ``(response, parameters)``
    pair is turned into a status line, a ``Content-Type`` header and a body,
    which are sent back before the connection is closed.

    Any exception raised while serving a client (or while accepting) is
    reported through ``self.ot.exception`` and the loop carries on.
    """
    while self.__working:
        try:
            client:Socket
            address:str
            port:int
            response:Any|None
            parameters:dict[str, Any|None]

            client, (address, port) = self.__socket.accept()

            # The context manager closes the client socket on every exit path,
            # so a failure while decoding, routing or sending no longer leaks
            # the connection.
            with client:
                chunks:list[bytes] = []
                while True:
                    buffer:bytes = client.recv(self.__cache_size)
                    chunks.append(buffer)
                    # A short (or empty) read is taken as the end of the request.
                    # NOTE(review): a request whose size is an exact multiple of
                    # the cache size makes the next recv() wait for more data.
                    if len(buffer) < self.__cache_size:
                        break

                response, parameters = self.ot.routes.go(RequestModel(b"".join(chunks).decode("utf-8"), {
                    "address" : address,
                    "port" : port,
                    "client" : client,
                    "public_path" : self.__public_path,
                    "index_files" : ["", *self.__index_files]
                }))

                # Defaults come first so the values returned by the route override them.
                header:bytes = str(Utils.string_variables((
                    "{http_protocol}/{http_version} {http_code} {http_message}\r\n" +
                    "Content-Type: {mime}"
                ), {
                    "http_protocol" : "HTTP",
                    "http_version" : "1.1",
                    "http_code" : 200,
                    "http_message" : "OK",
                    "mime" : "text/plain",
                    **parameters
                })).encode("utf-8")

                body:bytes
                if Check.is_string(response):
                    body = str(response).encode("utf-8")
                elif Check.is_bytes(response):
                    body = response
                elif Check.is_json_object(response):
                    body = Utils.json_encode(response).encode("utf-8")
                else:
                    body = bytes(response)

                client.sendall(header + b"\r\n\r\n" + body)
        except Exception as exception:
            self.ot.exception(exception, "opo_tests_http_error_listen", {
                "address" : self.__address,
                "port" : self.__port
            })
def start(self:Self) -> None:
    """Open the listening socket and launch the accept loop on a thread.

    Sets the working flag, creates an IPv4 stream socket and refreshes the
    settings via ``update()``. The socket is then configured with
    ``SO_REUSEADDR``, bound to the configured address and port and put into
    listening mode before ``__listen`` is started on a new ``Thread``.

    A failure during configuration, binding, listening or thread start
    triggers ``close()`` and is reported through ``self.ot.exception``
    instead of being raised to the caller.
    """
    self.__working = True
    self.__socket = Socket(ADDRESS_FAMILY_IPV4, SOCKET_STREAM)
    self.update()

    try:
        listener:Socket = self.__socket
        listener.setsockopt(SOCKET_LAYER, SOCKET_REUSE_ADDRESS, 1)

        endpoint:tuple[str, int] = (self.__address, self.__port)
        listener.bind(endpoint)
        listener.listen(self.__maximum_connections)

        worker:Thread = Thread(target=self.__listen)
        worker.start()

        started:dict[str, Any|None] = {
            "address" : self.__address,
            "port" : self.__port,
        }
        self.ot.print("info", "opo_tests_http_started", started, "HTTP server started on '{address}:{port}'.")
    except Exception as exception:
        self.close()
        failure:dict[str, Any|None] = {
            "address" : self.__address,
            "port" : self.__port
        }
        self.ot.exception(exception, "opo_tests_http_error_start", failure)
def close(self:Self) -> None:
self.__working = False