45 lines
1.7 KiB
Python
45 lines
1.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Any, Self, TypeVar
|
|
from Interfaces.Application.NucelarMonitorInterface import NucelarMonitorInterface
|
|
from Interfaces.Owners.DispatchersInterface import DispatchersInterface
|
|
|
|
# Type variable bound to DispatchersInterface; DispatchersManager.get uses it so
# that the returned value is typed as the dispatcher class that was requested.
T = TypeVar("T", bound = DispatchersInterface)
|
|
|
|
class DispatchersManager:
|
|
|
|
def __init__(self:Self, nucelar_monitor:NucelarMonitorInterface) -> None:
|
|
self.nucelar_monitor:NucelarMonitorInterface = nucelar_monitor
|
|
|
|
key:str
|
|
|
|
self.__dispatchers:dict[str, DispatchersInterface] = {}
|
|
|
|
for key in (
|
|
"default_dispatchers_files", "dispatchers_files",
|
|
"default_dispatchers", "dispatchers"
|
|
):
|
|
self.add(self.nucelar_monitor.settings.get(key, None, []), True)
|
|
|
|
def get(self:Self, Type:type[T], key:str) -> T|None:
|
|
if key in self.__dispatchers and isinstance(self.__dispatchers[key], Type):
|
|
return self.__dispatchers[key]
|
|
return None
|
|
|
|
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
|
|
|
|
subinputs:dict[str, str|DispatchersInterface]
|
|
|
|
for subinputs in self.nucelar_monitor.files.load_json(inputs):
|
|
|
|
key:str
|
|
dispatcher:DispatchersInterface|str
|
|
|
|
for key, dispatcher in subinputs.items():
|
|
if isinstance(dispatcher, str):
|
|
dispatcher = self.nucelar_monitor.models.get(DispatchersInterface, dispatcher)(self.nucelar_monitor)
|
|
if dispatcher is not None and isinstance(dispatcher, DispatchersInterface) and (
|
|
overwrite or key not in self.__dispatchers
|
|
):
|
|
self.__dispatchers[key] = dispatcher |