#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Self, Any, Callable
from threading import Thread
from Interfaces.Application.NucelarMonitorInterface import NucelarMonitorInterface
from Utils.Utils import Utils


class TerminalManager:
    """Read commands from standard input and dispatch them to callbacks.

    Commands are stored as ``(alternatives, callback)`` pairs, where
    ``alternatives`` is the list of words that trigger ``callback``. A
    built-in command (``close``/``exit``/``quit``/``bye``) calls
    ``nucelar_monitor.close()``. Further commands are loaded from the
    monitor settings at construction time and can be added later through
    :meth:`add`.
    """

    def __init__(self: Self, nucelar_monitor: NucelarMonitorInterface) -> None:
        """Register the built-in command and load the configured ones.

        Args:
            nucelar_monitor: Application object that provides ``settings``,
                ``files``, ``is_working()``, ``exception()`` and ``close()``.
        """
        self.nucelar_monitor: NucelarMonitorInterface = nucelar_monitor
        # Stored as tuples so the initial entry matches the declared type and
        # the entries created by ``add``.
        self.__commands: list[
            tuple[
                list[str],
                Callable[[dict[str, Any | None], list[Any | None]], None],
            ]
        ] = [
            (["close", "exit", "quit", "bye"], self.__close_command),
        ]
        # Explicitly initialised so the attribute exists before ``start``.
        self.__thread: Thread | None = None
        self.__started: bool = False

        for key in (
            "default_commands_files",
            "commands_files",
            "default_commands",
            "commands",
        ):
            self.add(self.nucelar_monitor.settings.get(key, None, []), True)

    def start(self: Self) -> None:
        """Start the listener thread; do nothing if already started."""
        if self.__started:
            return
        self.__started = True
        # NOTE(review): the thread is not a daemon and blocks on ``input()``,
        # so it only notices a stop request after the next line is entered.
        self.__thread = Thread(target=self.__listener)
        self.__thread.start()

    def close(self: Self) -> None:
        """Ask the listener to stop; do nothing if it was not started.

        The listener checks the flag between commands, so it ends after the
        pending ``input()`` call returns.
        """
        if not self.__started:
            return
        self.__started = False

    def add(self: Self, inputs: Any | None, overwrite: bool = False) -> None:
        """Register the commands described by ``inputs``.

        ``inputs`` is handed to ``nucelar_monitor.files.load_json``; every
        item it yields must unpack into ``(alternatives, callback)``. A
        callback given as a string is resolved with ``Utils.get_function``.

        Args:
            inputs: Command definitions, or a reference to them, in whatever
                form ``load_json`` accepts.
            overwrite: When True, every registered command sharing at least
                one alternative with a new command is replaced by it. When
                False, a conflicting new command is ignored.
        """
        # A falsy result (for example ``None``) is treated as "no commands".
        loaded = self.nucelar_monitor.files.load_json(inputs, False) or ()

        for alternatives, callback in loaded:
            if isinstance(callback, str):
                callback = Utils.get_function(callback, [self.nucelar_monitor])

            conflict_found: bool = False
            for index, (known_alternatives, _) in enumerate(self.__commands):
                if any(key in known_alternatives for key in alternatives):
                    conflict_found = True
                    if overwrite:
                        self.__commands[index] = (alternatives, callback)

            if not conflict_found:
                self.__commands.append((alternatives, callback))

    def __listener(self: Self) -> None:
        """Read lines from stdin and run the first matching command.

        Runs until ``close`` is called or ``nucelar_monitor.is_working()``
        returns a falsy value. Exceptions raised while reading or running a
        command are reported through ``nucelar_monitor.exception``.
        """
        while self.__started and self.nucelar_monitor.is_working():
            # Bound before the ``try`` so the handlers can always report it,
            # even when ``input()`` itself fails.
            command: str = ""
            try:
                parameters: dict[str, Any | None] = {}
                arguments: list[Any | None] = []

                command = input().strip().lower()
                if not command:
                    continue

                for commands, callback in self.__commands:
                    if command in commands:
                        callback(parameters, *arguments)
                        break
            except EOFError as exception:
                # stdin is exhausted: every further ``input()`` would fail the
                # same way, so report once and stop instead of spinning.
                self.nucelar_monitor.exception(
                    exception,
                    "command_listener_exception",
                    {"command": command},
                )
                break
            except Exception as exception:
                self.nucelar_monitor.exception(
                    exception,
                    "command_listener_exception",
                    {"command": command},
                )

    def __close_command(
        self: Self,
        inputs: dict[str, Any | None],
        *arguments: list[Any | None],
    ) -> None:
        """Built-in command callback: close the monitor application."""
        self.nucelar_monitor.close()