AnP/Python/Managers/HTTPServersManager.py

75 lines
2.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Sequence, Any
from Interfaces.Application.AnPInterface import AnPInterface
from Abstracts.HTTPServersAbstract import HTTPServersAbstract
from Utils.Common import Common
from Utils.Checks import Check
class HTTPServersManager:
def __init__(self:Self, anp:AnPInterface) -> None:
self.anp:AnPInterface = anp
self.__servers:dict[str, HTTPServersAbstract] = {}
self.update()
def update(self:Self) -> None:
key:str
for key in ("default_http_servers_files", "http_servers_files", "default_http_servers", "http_servers"):
self.add(self.anp.settings.get(key), True)
def reset(self:Self) -> None:
self.__servers = {}
self.update()
def close(self:Self) -> None:
server:HTTPServersAbstract
for server in self.__servers.values():
server.close()
self.__servers = {}
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
subinputs:dict[str, Any|None]
for subinputs in Common.load_json(inputs, True):
key:str
value:Any|None
for key, value in subinputs.items():
if isinstance(value, HTTPServersAbstract):
if overwrite or key not in self.__servers:
self.__servers[key] = value
elif Check.is_dictionary(value):
if overwrite or key not in self.__servers:
_type:str|None = Common.get_value("type", value)
if _type is None:
continue
Class:type[HTTPServersAbstract] = self.anp.models.get(HTTPServersAbstract, _type)
if Class and issubclass(Class, HTTPServersAbstract):
self.__servers[key] = Class(self.anp, key, value)
def remove(self:Self, names:str|Sequence[str]) -> None:
for name in Common.get_key(names):
if name in self.__servers:
try:
self.__servers[name].close()
del self.__servers[name]
except Exception as exception:
self.anp.exception(exception, "http_server_close_exception", {"name": name})
def get(self:Self, name:str) -> HTTPServersAbstract|None:
return self.__servers.get(name)