NucelarMonitor/Python/Managers/DispatchersManager.py

45 lines
1.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any, Self, TypeVar
from Interfaces.Application.NucelarMonitorInterface import NucelarMonitorInterface
from Abstracts.DispatcherAbstract import DispatcherAbstract
# Type variable restricted to DispatcherAbstract subclasses; DispatchersManager.get
# uses it so that its return annotation follows the class passed as `Type`.
T = TypeVar("T", bound = DispatcherAbstract)
class DispatchersManager:
def __init__(self:Self, nucelar_monitor:NucelarMonitorInterface) -> None:
self.nucelar_monitor:NucelarMonitorInterface = nucelar_monitor
key:str
self.__dispatcher:dict[str, DispatcherAbstract] = {}
for key in (
"default_dispatcher_files", "dispatcher_files",
"default_dispatcher", "dispatcher"
):
self.add(self.nucelar_monitor.settings.get(key, None, []), True)
def get(self:Self, Type:type[T], key:str) -> T|None:
if key in self.__dispatcher and isinstance(self.__dispatcher[key], Type):
return self.__dispatcher[key]
return None
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
subinputs:dict[str, str|DispatcherAbstract]
for subinputs in self.nucelar_monitor.files.load_json(inputs):
key:str
dispatcher:DispatcherAbstract|str
for key, dispatcher in subinputs.items():
if isinstance(dispatcher, str):
dispatcher = self.nucelar_monitor.models.get(DispatcherAbstract, dispatcher)(self.nucelar_monitor)
if dispatcher is not None and isinstance(dispatcher, DispatcherAbstract) and (
overwrite or key not in self.__dispatcher
):
self.__dispatcher[key] = dispatcher