NucelarMonitor/Python/Managers/TerminalManager.py

89 lines
2.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Callable
from threading import Thread
from Interfaces.Application.NucelarMonitorInterface import NucelarMonitorInterface
from Utils.Utils import Utils
class TerminalManager:
def __init__(self:Self, nucelar_monitor:NucelarMonitorInterface) -> None:
self.nucelar_monitor:NucelarMonitorInterface = nucelar_monitor
key:str
self.__commands:list[tuple[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]] = [
[["close", "exit", "quit", "bye"], self.__close_command]
]
self.__thread:Thread|None
self.__started:bool = False
for key in (
"default_commands_files", "commands_files",
"default_commands", "commands"
):
self.add(self.nucelar_monitor.settings.get(key, None, []), True)
def start(self:Self) -> None:
if self.__started:
return
self.__started = True
self.__thread = Thread(target = self.__listener)
self.__thread.start()
def close(self:Self) -> None:
if not self.__started:
return
self.__started = False
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
alternatives:list[str]
callback:Callable[[dict[str, Any|None], list[Any|None]], None]|str
for alternatives, callback in self.nucelar_monitor.files.load_json(inputs, False):
done:bool = False
command:tuple[list[str], Callable[[dict[str, Any|None], list[Any|None]], None]]
i:int
if isinstance(callback, str):
callback = Utils.get_function(callback, [self.nucelar_monitor])
for i, command in enumerate(self.__commands):
key:str
for key in alternatives:
if key in command[0]:
if overwrite:
self.__commands[i] = (alternatives, callback)
done = True
break
if not done:
self.__commands.append((alternatives, callback))
def __listener(self:Self) -> None:
while self.nucelar_monitor.is_working():
try:
parameters:dict[str, Any|None] = {}
arguments:list[Any|None] = []
command:str = input().strip().lower()
if command:
for commands, callback in self.__commands:
if command in commands:
callback(parameters, *arguments)
break
except Exception as exception:
self.nucelar_monitor.exception(exception, "command_listener_exception", {
"command" : command
})
def __close_command(self:Self, inputs:dict[str, Any|None], *arguments:list[Any|None]) -> None:
self.nucelar_monitor.close()