#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Self, Any, Sequence, Callable, Optional, Iterable
from Interfaces.Application.AnPInterface import AnPInterface
from Models.PseudoLoRAModel import PseudoLoRAModel
from Utils.Common import Common
from Utils.Checks import Check

class PseudoLoRAsManager:
    """Registry of ``PseudoLoRAModel`` entries with a size-bounded content cache.

    Models are collected from the application settings (see ``update``) or
    added by hand (see ``add``). ``get`` asks a caller-supplied callback which
    models to use and reads their files, keeping the content of cacheable
    models in memory until the configured maximum is exceeded.
    """

    def __init__(self:Self, anp:AnPInterface) -> None:
        """Store the application interface and load the configured models.

        Args:
            anp: Application interface; only ``anp.settings.get`` is used here.
        """

        self.anp:AnPInterface = anp
        # Default limit: 100 * 2**20, compared against the summed ``len()`` of
        # the cached contents.
        self.__maximum_cache:int = (1 << 20) * 100
        self.__memory_cached:int = 0
        # Usage counter (``lora.i``) -> models currently cached with that count.
        self.__cache:dict[int, list[PseudoLoRAModel]] = {}
        self.__loras:list[PseudoLoRAModel] = []

        self.update()

    def update(self:Self) -> None:
        """Drop every cached content and rebuild the model list from settings."""

        key:str

        # The cache is cleaned before the list is cleared so that the models
        # about to be dropped release their content too.
        self.clean_cache()
        self.__loras.clear()

        self.__maximum_cache = self.anp.settings.get((
            "pseudoloras_maximum_cache_size", "maximum_cache_size"
        ), None, self.__maximum_cache)

        for key in (
            "default_pseudoloras_files", "pseudoloras_files",
            "default_pseudoloras", "pseudoloras"
        ):
            self.add(self.anp.settings.get(key))

    def __try_load_file(self:Self, path:str) -> bool:
        """Load ``path`` through ``Common.load_json`` and add what it holds.

        Returns:
            ``True`` when the loaded data was non-empty and handed to ``add``,
            ``False`` otherwise.
        """

        data:list[dict[str, Any|None]|Sequence[Any|None]] = Common.load_json(path)

        # Truthiness instead of len(): an empty result and a missing (None)
        # result are both treated as "nothing loaded".
        if not data:
            return False

        self.add(data)

        return True

    @staticmethod
    def __is_model_definition(values:Sequence[Any|None]) -> bool:
        """Tell whether ``values`` has the positional shape of a model.

        Accepted shape: 2 to 4 items, two strings first, then an optional
        array of strings (or ``None``) and an optional boolean (or ``None``).
        NOTE(review): presumably title, path, keys and cacheable -- confirm
        against the ``PseudoLoRAModel`` constructor.
        """

        size:int = len(values)

        if not 2 <= size <= 4:
            return False
        if not (Check.is_string(values[0]) and Check.is_string(values[1])):
            return False
        if size >= 3 and values[2] is not None and not (
            Check.is_array(values[2]) and
            all(Check.is_string(item) for item in values[2])
        ):
            return False

        return size < 4 or values[3] is None or Check.is_boolean(values[3])

    def add(self:Self, inputs:Any|None) -> None:
        """Register one or more models.

        Args:
            inputs: A ``PseudoLoRAModel`` (appended as is) or anything
                ``Common.load_json(inputs, False)`` can turn into an iterable
                of entries. Each entry may be a string (tried as a file), an
                array of strings (each tried as a file, falling back to a
                positional model definition), an array shaped like a model
                definition, or an array of further inputs handled recursively.
        """

        if isinstance(inputs, PseudoLoRAModel):
            self.__loras.append(inputs)
            return

        subinputs:dict[str, Any|None]|Sequence[Any|None]

        for subinputs in Common.load_json(inputs, False):
            if Check.is_string(subinputs):
                self.__try_load_file(subinputs)
            elif not Check.is_array(subinputs):
                # Other entry types are ignored, as before.
                continue
            elif all(Check.is_string(item) for item in subinputs):
                # all() stops at the first string that does not load as a
                # file; the array is then taken as a model definition. Files
                # loaded before that failure stay registered.
                if not all(self.__try_load_file(item) for item in subinputs):
                    self.__loras.append(PseudoLoRAModel(*subinputs))
            elif self.__is_model_definition(subinputs):
                self.__loras.append(PseudoLoRAModel(*subinputs))
            else:
                item:Any|None

                for item in subinputs:
                    self.add(item)

    def clean_cache(self:Self) -> None:
        """Release every cached content and reset the cache bookkeeping."""

        lora:PseudoLoRAModel
        bucket:list[PseudoLoRAModel]
        # Keyed by identity so each model is cleaned once. Cached models are
        # included as well because nested models resolved by ``get`` are not
        # part of ``self.__loras``.
        targets:dict[int, PseudoLoRAModel] = {
            id(lora): lora for lora in self.__loras
        }

        for bucket in self.__cache.values():
            for lora in bucket:
                targets.setdefault(id(lora), lora)

        self.__memory_cached = 0
        self.__cache.clear()

        for lora in targets.values():
            lora.clean_cache()

    def __touch(self:Self, lora:PseudoLoRAModel) -> None:
        """Move ``lora`` from its current usage bucket to the next one."""

        bucket:list[PseudoLoRAModel]|None = self.__cache.get(lora.i)

        if bucket is not None:
            # Only this model leaves the bucket; the others sharing the same
            # usage count must remain tracked so they can be evicted later.
            bucket[:] = [other for other in bucket if other is not lora]
            if not bucket:
                del self.__cache[lora.i]

        lora.i += 1
        self.__cache.setdefault(lora.i, []).append(lora)

    def __evict(self:Self) -> None:
        """Drop the least used buckets until the cache fits the maximum."""

        victim:PseudoLoRAModel

        while self.__memory_cached > self.__maximum_cache and self.__cache:
            for victim in self.__cache.pop(min(self.__cache)):
                # Read the size before cleaning, in case cleaning resets it.
                self.__memory_cached -= victim.memory
                victim.clean_cache()

        if self.__memory_cached < 0:
            self.__memory_cached = 0

    def __read(self:Self, lora:PseudoLoRAModel) -> str|None:
        """Return the content of ``lora.path``, through the cache if allowed."""

        if not lora.cacheable:
            return Common.load_file(lora.path, "r")

        content:str|None = lora.cache

        if not content:
            content = Common.load_file(lora.path, "r")
            if not content:
                # Nothing to cache (missing or empty file).
                return content
            lora.cache = content
            lora.memory = len(content)
            self.__memory_cached += lora.memory

        self.__touch(lora)
        # The content is already held in ``content``, so evicting this very
        # model does not force a second read.
        self.__evict()

        return content

    def get(self:Self,
        callback:Callable[[Any], Iterable[bool]],
        keys:list[str]|None = None
    ) -> list[tuple[str, str]]:
        """Select models through ``callback`` and return those with content.

        Args:
            callback: Called first with the list of titles of the candidate
                models; must return one truthy/falsy flag per title, in the
                same order. When selected models have no path, it is called
                again with the list of their nested models.
                NOTE(review): titles at the top level but model objects for
                nested entries is what the original code passed -- confirm
                that this asymmetry is intended.
            keys: When given and non-empty, only models having at least one
                of these keys in ``lora.keys`` are candidates. Nested models
                are not filtered.

        Returns:
            ``(title, path)`` for every selected model whose file content is
            non-empty.
            NOTE(review): the content is read but only the path is returned,
            as in the original code -- confirm the content was not meant.
        """

        lora:PseudoLoRAModel
        ok:bool
        wanted:Sequence[str] = keys or ()
        results:list[tuple[str, str]] = []
        # The flags are paired with this filtered list, not with the full
        # ``self.__loras``, so they always refer to the titles the callback
        # actually saw.
        pending:list[PseudoLoRAModel] = [
            lora for lora in self.__loras
            if not wanted or any(key in lora.keys for key in wanted)
        ]
        flags:Iterable[bool] = callback([lora.title for lora in pending])

        while True:
            nested:list[PseudoLoRAModel] = []

            for lora, ok in zip(pending, flags):
                if not ok:
                    continue
                if lora.path is None:
                    nested.extend(lora.nested or ())
                elif self.__read(lora):
                    results.append((lora.title, lora.path))

            if not nested:
                break

            # Resolve the nested models the same way instead of storing the
            # callback's raw flags in the results.
            pending = nested
            flags = callback(pending)

        return results