AnP/Python/Managers/PseudoLoRAsManager.py

155 lines
5.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Sequence, Callable, Optional
from Interfaces.Application.AnPInterface import AnPInterface
from Models.PseudoLoRAModel import PseudoLoRAModel
from Utils.Common import Common
from Utils.Checks import Check
class PseudoLoRAsManager:
    """Registry of the pseudo-LoRA entries declared in the application settings.

    The manager builds a list of ``PseudoLoRAModel`` objects from several
    settings keys, lets a caller pick among them through a callback (``get``)
    and keeps the text of the picked files cached while the summed length of
    the cached contents stays within a configurable budget.
    """

    def __init__(self: Self, anp: AnPInterface) -> None:
        """Store the application handle and load the entries from its settings."""
        self.anp: AnPInterface = anp
        # Default budget (100 * 2**20). It is compared against the summed
        # len() of the cached contents; update() may override it from settings.
        self.__maximum_cache: int = (1 << 20) * 100
        self.__memory_cached: int = 0
        # Cached models grouped by their counter ``lora.i`` (incremented on
        # every cached retrieval); the smallest counter is evicted first.
        self.__cache: dict[int, list[PseudoLoRAModel]] = {}
        self.__loras: list[PseudoLoRAModel] = []
        self.update()

    def update(self: Self) -> None:
        """Drop everything and rebuild the registry from the current settings."""
        # Clean first: clean_cache() walks the registered models, so it has
        # to run before the list is emptied.
        self.clean_cache()
        self.__loras.clear()
        self.__maximum_cache = self.anp.settings.get((
            "pseudoloras_maximum_cache_size", "maximum_cache_size"
        ), None, self.__maximum_cache)
        for key in (
            "default_pseudoloras_files", "pseudoloras_files",
            "default_pseudoloras", "pseudoloras",
        ):
            self.add(self.anp.settings.get(key))

    def __try_load_file(self: Self, path: str) -> bool:
        """Load ``path`` through ``Common.load_json`` and register its content.

        Returns:
            True when the loader produced something non-empty (which is then
            passed to ``add``), False otherwise.
        """
        data: list[dict[str, Any | None] | Sequence[Any | None]] = Common.load_json(path)
        # Truthiness instead of len(): also covers a loader that answers None.
        if data:
            self.add(data)
            return True
        return False

    @staticmethod
    def __is_definition(candidate: Sequence[Any | None]) -> bool:
        """Tell whether ``candidate`` has the shape of an inline model definition.

        The accepted shape is 2 to 4 items: two strings, then optionally a
        list of strings (or None), then optionally a boolean (or None).
        """
        size: int = len(candidate)
        if not 2 <= size <= 4:
            return False
        if not (Check.is_string(candidate[0]) and Check.is_string(candidate[1])):
            return False
        if size >= 3 and candidate[2] is not None and not (
            Check.is_array(candidate[2]) and
            all(Check.is_string(item) for item in candidate[2])
        ):
            return False
        return size < 4 or candidate[3] is None or Check.is_boolean(candidate[3])

    def add(self: Self, inputs: Any | None) -> None:
        """Register one model, or every entry found in ``inputs``.

        Args:
            inputs: Either a ``PseudoLoRAModel`` (appended as is) or anything
                ``Common.load_json(inputs, False)`` can expand into entries.
                Each entry is handled by its shape:

                * string: treated as a file to load;
                * array of strings: each item is tried as a file; as soon as
                  one fails the array is used as model constructor arguments;
                * array shaped like an inline definition: used as model
                  constructor arguments;
                * any other array: each item is added recursively.

                Entries of any other type are ignored.
        """
        if isinstance(inputs, PseudoLoRAModel):
            self.__loras.append(inputs)
            return
        # ``or ()`` keeps a loader answering None/empty from breaking the loop.
        for entry in Common.load_json(inputs, False) or ():
            if Check.is_string(entry):
                self.__try_load_file(entry)
                continue
            if not Check.is_array(entry):
                continue
            if all(Check.is_string(item) for item in entry):
                # all() stops at the first file that fails to load. Files
                # loaded before that point stay registered.
                # NOTE(review): confirm that partial registration is intended.
                if not all(self.__try_load_file(item) for item in entry):
                    self.__loras.append(PseudoLoRAModel(*entry))
            elif self.__is_definition(entry):
                self.__loras.append(PseudoLoRAModel(*entry))
            else:
                for item in entry:
                    self.add(item)

    def clean_cache(self: Self) -> None:
        """Forget every cached content and reset the memory accounting."""
        self.__memory_cached = 0
        self.__cache.clear()
        for lora in self.__loras:
            lora.clean_cache()

    def get_asynchronous(
        self: Self,
        each_callback: Callable[[list[PseudoLoRAModel], list[PseudoLoRAModel], Callable[[list[PseudoLoRAModel]], None]], bool],
        end_callback: Callable[[list[PseudoLoRAModel]], None],
        keys: Optional[list[str]] = None,
        loras: Optional[list[PseudoLoRAModel]] = None,
        results: Optional[list[PseudoLoRAModel]] = None
    ) -> None:
        """Hand ``each_callback`` to ``Common.execute``.

        Only that single call is implemented; ``end_callback``, ``keys``,
        ``loras`` and ``results`` are accepted but not used yet.
        TODO: presumably meant to become the callback-driven variant of
        ``get`` - confirm and complete.

        ``keys`` and ``results`` default to None rather than to a shared list
        so that state cannot leak between calls once they are used.
        """
        Common.execute(each_callback, )

    def __promote(self: Self, lora: PseudoLoRAModel) -> None:
        """Move ``lora`` from its current counter bucket to the next one."""
        bucket: list[PseudoLoRAModel] | None = self.__cache.get(lora.i)
        if bucket is not None:
            # Identity search: remove this model only, never the whole bucket.
            for index, cached in enumerate(bucket):
                if cached is lora:
                    del bucket[index]
                    break
            if not bucket:
                del self.__cache[lora.i]
        lora.i += 1
        self.__cache.setdefault(lora.i, []).append(lora)

    def __evict(self: Self) -> None:
        """Release the lowest-counter buckets until the budget is respected."""
        while self.__cache and self.__memory_cached > self.__maximum_cache:
            for evicted in self.__cache.pop(min(self.__cache)):
                # Read the size before clean_cache(), which may reset it.
                self.__memory_cached -= evicted.memory
                evicted.clean_cache()

    def __fetch(self: Self, lora: PseudoLoRAModel) -> str | None:
        """Return the content of ``lora.path``, caching it when allowed.

        Returns:
            The content, or None when nothing could be loaded.
        """
        if not lora.cacheable:
            return lora.cache or Common.load_file(lora.path, "r") or None
        content: str | None = lora.cache
        if not content:
            content = Common.load_file(lora.path, "r")
            if not content:
                return None
            lora.cache = content
            lora.memory = len(content)
            self.__memory_cached += lora.memory
        self.__promote(lora)
        # The content is already held locally, so it stays available to the
        # caller even if this very model gets evicted right away.
        self.__evict()
        return content

    def get(
        self: Self,
        callback: Callable[[list[str]], Sequence[bool]],
        keys: Sequence[str] = ()
    ) -> list[tuple[str, str]]:
        """Let ``callback`` choose among the registered models.

        Args:
            callback: Receives the titles of the candidates and answers one
                flag per title, in the same order; a truthy flag selects the
                candidate.
            keys: When not empty, only models whose ``keys`` contain at least
                one of these values are offered at the first level.

        Returns:
            One ``(title, path)`` pair per selected model whose file yielded
            some content.

        A selected model without a path contributes its ``nested`` models
        instead; those are offered to ``callback`` in a further round, without
        key filtering.
        NOTE(review): nested rounds are offered as titles like the first one -
        confirm this is the protocol the callers expect.
        """
        level: list[PseudoLoRAModel]
        if keys:
            level = [
                lora for lora in self.__loras
                if any(key in lora.keys for key in keys)
            ]
        else:
            level = list(self.__loras)
        results: list[tuple[str, str]] = []
        while True:
            nested: list[PseudoLoRAModel] = []
            flags: Sequence[bool] = callback([lora.title for lora in level])
            # Pair each flag with the candidate it was asked about, so the
            # key filter can no longer shift the selection.
            for lora, ok in zip(level, flags):
                if not ok:
                    continue
                if lora.path is None:
                    nested.extend(lora.nested)
                elif self.__fetch(lora):
                    results.append((lora.title, lora.path))
            if not nested:
                return results
            level = nested