#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Any, Self, Optional, Sequence
from abc import ABC, abstractmethod
from Interfaces.Application.AnPInterface import AnPInterface
from Application.Event import Event
from Utils.Common import Common


class WebSocketServersAbstract(ABC):
    """Abstract base for WebSocket server back-ends.

    Holds the shared configuration (host and port), the lifecycle events and
    a per-client session table. Concrete subclasses supply the transport by
    implementing ``start``, ``close``, ``close_client`` and ``send``.
    """

    def __init__(
        self: Self,
        anp: AnPInterface,
        key: str,
        inputs: Optional[dict[str, Any|None]|Sequence[Any|None]] = None,
    ) -> None:
        """Store the application handle and resolve the listening endpoint.

        Args:
            anp: Application object; its ``settings.get`` resolves host/port.
            key: Identifier stored on the instance as ``self.key``.
            inputs: Optional overrides forwarded untouched to
                ``anp.settings.get``.
        """
        self.anp: AnPInterface = anp
        self.key: str = key

        # NOTE(review): the tuple of names is presumably a list of aliases
        # tried in order, with ``inputs`` taking part in the lookup and the
        # last argument acting as the default -- confirm against the
        # settings implementation.
        settings = self.anp.settings
        self.host: str = settings.get(("web_socket_server_host", "host"), inputs, "")
        self.port: int = settings.get(("web_socket_server_port", "port"), inputs, 18765)

        # Client id -> session value. Double underscore: name-mangled, so it
        # is only reachable through the session helpers of this class.
        self.__sessions: dict[str, str] = {}

        # Hooks created here but never fired in this class; presumably the
        # concrete subclasses trigger them -- confirm.
        self.on_new_client: Event = Event()
        self.on_message: Event = Event()
        self.on_close: Event = Event()
        self.on_error: Event = Event()

    @abstractmethod
    def start(self: Self) -> None:
        """Start the server. Must be implemented by subclasses."""

    @abstractmethod
    def close(self: Self) -> None:
        """Close the server. Must be implemented by subclasses."""

    @abstractmethod
    def close_client(self: Self, id: int) -> None:
        """Close the connection of a single client.

        NOTE(review): ``id`` is annotated ``int`` here while the session
        helpers and ``format_data`` annotate client ids as ``str`` --
        confirm which type the implementations actually use.
        """

    def format_data(
        self: Self,
        controller: str,
        action: str,
        data: Any|None,
        id: str,
        code: int = 200,
    ) -> str:
        """Build the message envelope and encode it with ``Common.data_encode``.

        Args:
            controller: Value stored under the ``"controller"`` key.
            action: Value stored under the ``"action"`` key.
            data: Payload stored under the ``"data"`` key.
            id: Value stored under the ``"id"`` key.
            code: Status code; ``"ok"`` is true only for 200 <= code < 300.

        Returns:
            Whatever ``Common.data_encode`` produces for the envelope
            (annotated as ``str``).
        """
        is_success = 200 <= code < 300
        envelope = {
            "ok" : is_success,
            "code" : code,
            "controller": controller,
            "action": action,
            "data": data,
            "id": id,
        }
        return Common.data_encode(envelope)

    @abstractmethod
    def send(
        self: Self,
        controller: str,
        action: str,
        data: Any|None,
        ids: Optional[str|Sequence[str]] = None,
        code: int = 200,
    ) -> None:
        """Send a message to clients. Must be implemented by subclasses.

        ``ids`` may be one id, a sequence of ids, or ``None``; what ``None``
        means is decided by the implementation.
        """

    def set_session(self: Self, id: str, session: str) -> None:
        """Register ``session`` for ``id`` unless that id already has one.

        An existing entry is never overwritten: the first value stored for
        an id wins until ``remove_session`` drops it.
        """
        self.__sessions.setdefault(id, session)

    def get_session(self: Self, id: str) -> str|None:
        """Return the session stored for ``id``, or ``None`` when absent."""
        return self.__sessions.get(id)

    def remove_session(self: Self, id: str) -> None:
        """Drop the session stored for ``id``; unknown ids are ignored."""
        self.__sessions.pop(id, None)