#!/usr/bin/env python3 # -*- coding: utf-8 -*- from typing import Any, Self, Optional, Sequence from Interfaces.Application.AnPInterface import AnPInterface from Abstracts.WebSocketsServerAbstract import WebSocketsServerAbstract from Application.Event import Event from Models.RequestModel import RequestModel from Utils.Checks import Check from Utils.Common import Common class WebSocketsServerManager: def __init__(self:Self, anp:AnPInterface, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None: self.anp:AnPInterface = anp self.__web_sockets:dict[str, WebSocketsServerAbstract] = {} self.on_new_client:Event = Event() self.on_message:Event = Event() self.on_close:Event = Event() self.on_error:Event = Event() self.on_message.add(self.__receive) self.update() def update(self:Self) -> None: key:str for key in ("default_web_sockets_server_files", "web_sockets_server_files", "default_web_sockets_server", "web_sockets_server"): self.add(self.anp.settings.get(key), True) def reset(self:Self) -> None: self.__web_sockets = {} self.update() def close(self:Self) -> None: for web_socket in self.__web_sockets.values(): web_socket.close() self.__web_sockets = {} def __set(self:Self, name:str, web_socket:WebSocketsServerAbstract) -> None: self.__web_sockets[name] = web_socket web_socket.on_new_client.add(lambda id:self.on_new_client(web_socket, id, name)) web_socket.on_message.add(lambda id, message:self.on_message(web_socket, id, message, name)) web_socket.on_close.add(lambda id:self.on_close(web_socket, id, name)) web_socket.on_error.add(lambda id, exception:self.on_error(web_socket, id, exception, name)) def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None: subinputs:dict[str, Any|None] for subinputs in Common.load_json(inputs, True): key:str value:Any|None for key, value in subinputs.items(): if isinstance(value, WebSocketsServerAbstract): if overwrite or key not in self.__web_sockets: self.__set(key, value) elif Check.is_dictionary(value): if overwrite or key not in self.__web_sockets: _type:str|None = Common.get_value("type", value) if _type is None: continue Class:type[WebSocketsServerAbstract] = self.anp.models.get(WebSocketsServerAbstract, _type) Class and issubclass(Class, WebSocketsServerAbstract) and self.__set(key, Class(self.anp, value)) def remove(self:Self, names:str|Sequence[str]) -> None: for name in names if Check.is_array(names) else [names]: if name in self.__web_sockets: try: self.__web_sockets[name].close() del self.__web_sockets[name] except Exception as exception: self.anp.exception(exception, "web_socket_server_close_exception", {"name": name}) def get(self:Self, name:str) -> WebSocketsServerAbstract|None: return self.__web_sockets.get(name) def send(self:Self, name:str, controller:str, method:str, data:Optional[Any] = None, clients:Optional[int|Sequence[int]] = None, code:int = 200 ) -> None: if name in self.__web_sockets: try: self.__web_sockets[name].send(Common.data_encode({ "ok" : code >= 200 and code < 300, "code" : code, "controller" : controller, "method" : method, "data" : data }), clients) except Exception as exception: self.anp.exception(exception, "web_socket_server_send_exception", {"name": name}) def __receive(self:Self, web_socket:WebSocketsServerAbstract, client:int, raw_data:str, name:str) -> None: data:dict[str, Any|None] = Common.data_decode(raw_data) if "controller" in data and "method" in data: request:RequestModel = RequestModel() request.data = data.get("data", None) request.variables["web_socket"] = web_socket request.variables["client_id"] = client request.variables["web_socket_name"] = name self.anp.controllers.execute( data["controller"], data["method"], request )