NucelarMonitor/Python/Managers/DispatchersManager.py
2026-03-23 14:56:59 +01:00

41 lines
1.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any, Self
from Interfaces.Application.NucelarMonitorInterface import NucelarMonitorInterface
from Abstracts.DispatcherAbstract import DispatcherAbstract
class DispatchersManager:
def __init__(self:Self, nucelar_monitor:NucelarMonitorInterface) -> None:
self.nucelar_monitor:NucelarMonitorInterface = nucelar_monitor
key:str
self.__dispatcher:dict[str, DispatcherAbstract] = {}
for key in (
"default_dispatcher_files", "dispatcher_files",
"default_dispatcher", "dispatcher"
):
self.add(self.nucelar_monitor.settings.get(key, None, []), True)
def get(self:Self, key:str) -> DispatcherAbstract|None:
return self.__dispatcher.get(key, None)
def add(self:Self, inputs:Any|None, overwrite:bool = False) -> None:
subinputs:dict[str, str|DispatcherAbstract]
for subinputs in self.nucelar_monitor.files.load_json(inputs):
key:str
dispatcher:DispatcherAbstract|str
for key, dispatcher in subinputs.items():
if isinstance(dispatcher, str):
dispatcher = self.nucelar_monitor.models.get(DispatcherAbstract, dispatcher)(self.nucelar_monitor)
if dispatcher is not None and isinstance(dispatcher, DispatcherAbstract) and (
overwrite or key not in self.__dispatcher
):
self.__dispatcher[key] = dispatcher