AnP/Python/Drivers/WebSocketServerDriver.py
2026-06-05 15:00:29 +02:00

115 lines
4.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from threading import Thread
from typing import Any, Self, Sequence, Optional
from Abstracts.WebSocketServersAbstract import WebSocketServersAbstract
from Abstracts.ModelAbstract import ModelAbstract
from websockets.sync.server import serve as server_serve
from websockets import Server as WebSocketServer, ClientConnection as WebSocketClient
from Interfaces.Application.AnPInterface import AnPInterface
from Utils.Checks import Check
class WebSocketServerDriver(WebSocketServersAbstract, ModelAbstract):
def __init__(self:Self, anp:AnPInterface, inputs:Optional[dict[str, Any|None]|Sequence[Any|None]] = None) -> None:
super().__init__(anp, inputs)
self.__server:WebSocketServer
self.__clients:dict[str, WebSocketClient] = {}
self.__host:str = anp.settings.get(("web_socket_server_host", "host"), inputs, "")
self.__port:int = anp.settings.get(("web_socket_server_port", "port"), inputs, 8765)
self.__thread:Thread = None
anp.settings.get(("web_socket_server_autostart", "autostart"), inputs, True) and self.start()
def __run(self:Self) -> None:
self.__server = server_serve(self.__handler, self.__host, self.__port)
self.__server.serve_forever()
def start(self:Self) -> None:
self.__thread = Thread(target = self.__run)
self.__thread.start()
def close(self:Self) -> None:
id:str
for id in tuple(self.__clients.keys()):
self.close_client(id)
self.__server.shutdown()
def close_client(self:Self, id:int) -> None:
if id in self.__clients:
try:
self.__clients[id].close()
except Exception as exception:
self.anp.exception(exception, "web_socket_server_client_close_exception", {
"client": id,
"port": self.port,
"host": self.host
})
del self.__clients[id]
def __handler(self:Self, client:WebSocketClient) -> None:
id:int = self.anp.unique_keys.create()
self.__clients[id] = client
self.on_new_client.execute(id)
self.anp.print("info", "web_socket_server_client_connected", {
"client": id,
"port": self.port,
"host": self.host,
"client_host" : client.remote_address[0],
"client_port" : client.remote_address[1]
})
self.send("web_socket_client", "set_id", id, id)
try:
while self.anp.working():
message:str = client.recv()
if message is None:
break
self.on_message.execute(id, message)
except Exception as exception:
self.anp.exception(exception, "web_socket_server_client_exception", {
"client": id,
"port": self.port,
"host": self.host
})
self.on_error.execute(id, exception)
finally:
self.close_client(id)
self.anp.unique_keys.remove(id)
self.on_close.execute(id)
self.anp.print("info", "web_socket_server_client_disconnected", {
"client": id,
"port": self.port,
"host": self.host
})
def send(self:Self, controller:str, method:str, data:str, ids:Optional[int|Sequence[int]] = None, code:int = 200) -> bool:
success:bool = True
id:int
for id in (
list(self.__clients.keys()) if ids is None else
ids if Check.is_array(ids) else
[ids]):
if id in self.__clients:
try:
self.__clients[id].send(self.format_data(controller, method, data, id, code))
except Exception as exception:
self.anp.exception(exception, "web_socket_server_client_send_exception", {
"client": id,
"port": self.port,
"host": self.host
})
success = False
return success