146 lines
4.7 KiB
Python
146 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Self, Any, Sequence, Callable
|
|
from Interfaces.Application.AnPInterface import AnPInterface
|
|
from Models.PseudoLoRAModel import PseudoLoRAModel
|
|
from Utils.Common import Common
|
|
from Utils.Checks import Check
|
|
|
|
class PseudoLoRAsManager:
|
|
|
|
def __init__(self:Self, anp:AnPInterface) -> None:
|
|
|
|
self.anp:AnPInterface = anp
|
|
self.__maximum_cache:int = (1 << 20) * 100
|
|
self.__memory_cached:int = 0
|
|
self.__cache:dict[int, PseudoLoRAModel] = {}
|
|
self.__loras:list[PseudoLoRAModel] = []
|
|
|
|
self.update()
|
|
|
|
def update(self:Self) -> None:
|
|
|
|
key:str
|
|
|
|
self.clean_cache()
|
|
self.__loras.clear()
|
|
self.__maximum_cache = self.anp.settings.get((
|
|
"pseudoloras_maximum_cache_size", "maximum_cache_size"
|
|
), None, self.__maximum_cache)
|
|
|
|
for key in ("default_pseudoloras_files", "pseudoloras_files", "default_pseudoloras", "pseudoloras"):
|
|
self.add(self.anp.settings.get(key))
|
|
|
|
def __try_load_file(self:Self, path:str) -> bool:
|
|
|
|
data:list[dict[str, Any|None]|Sequence[Any|None]] = Common.load_json(path)
|
|
|
|
if len(data):
|
|
self.add(data)
|
|
return True
|
|
return False
|
|
|
|
def add(self:Self, inputs:Any|None) -> None:
|
|
if isinstance(inputs, PseudoLoRAModel):
|
|
self.__loras.append(inputs)
|
|
else:
|
|
|
|
subinputs:dict[str, Any|None]|Sequence[Any|None]
|
|
|
|
for subinputs in Common.load_json(inputs, False):
|
|
if Check.is_string(subinputs):
|
|
self.__try_load_file(subinputs)
|
|
elif Check.is_array(subinputs):
|
|
if all(Check.is_string(item) for item in subinputs):
|
|
|
|
item:str
|
|
ok:bool = True
|
|
|
|
for item in subinputs:
|
|
if not self.__try_load_file(item):
|
|
ok = False
|
|
break
|
|
|
|
if not ok:
|
|
self.__loras.append(PseudoLoRAModel(*subinputs))
|
|
|
|
elif (
|
|
len(subinputs) >= 2 and len(subinputs) <= 4 and
|
|
Check.is_string(subinputs[0]) and Check.is_string(subinputs[1]) and
|
|
(len(subinputs) < 3 or subinputs[2] is None or (Check.is_array(subinputs[2]) and all(Check.is_string(item) for item in subinputs[2]))) and
|
|
(len(subinputs) < 4 or subinputs[3] is None or Check.is_boolean(subinputs[3]))
|
|
):
|
|
self.__loras.append(PseudoLoRAModel(*subinputs))
|
|
else:
|
|
|
|
item:Any|None
|
|
|
|
for item in subinputs:
|
|
self.add(item)
|
|
|
|
def clean_cache(self:Self) -> None:
|
|
|
|
lora:PseudoLoRAModel
|
|
|
|
self.__memory_cached = 0
|
|
self.__cache.clear()
|
|
|
|
for lora in self.__loras:
|
|
lora.clean_cache()
|
|
|
|
def get(self:Self, callback:Callable[[list[PseudoLoRAModel]], bool], keys:list[str] = []) -> list[tuple[str, str]]:
|
|
|
|
next:list[PseudoLoRAModel] = []
|
|
results:list[tuple[str, str]] = []
|
|
i:int
|
|
ok:bool
|
|
has_keys:bool = len(keys) > 0
|
|
|
|
for i, ok in enumerate(callback(
|
|
lora.title for lora in self.__loras if not has_keys or any(
|
|
key in lora.keys for key in keys
|
|
)
|
|
)):
|
|
if not ok:
|
|
continue
|
|
|
|
lora:PseudoLoRAModel = self.__loras[i]
|
|
|
|
if lora.path is not None:
|
|
|
|
data:str|None
|
|
|
|
if lora.cacheable:
|
|
if not lora.cache:
|
|
|
|
lora.cache = Common.load_file(lora.path, "r")
|
|
lora.memory = len(lora.cache)
|
|
self.__memory_cached += lora.memory
|
|
|
|
if self.__cache[lora.i]:
|
|
del self.__cache[lora.i]
|
|
lora.i += 1
|
|
if lora.i in self.__cache:
|
|
self.__cache[lora.i].append(lora)
|
|
else:
|
|
self.__cache[lora.i] = [lora]
|
|
|
|
if self.__memory_cached > self.__maximum_cache:
|
|
|
|
i:int = min(self.__cache.keys())
|
|
|
|
for lora in self.__cache[i]:
|
|
lora.clean_cache()
|
|
del self.__cache[i]
|
|
|
|
if (data := lora.cache or Common.load_file(lora.path, "r")):
|
|
results.append((lora.title, lora.path))
|
|
|
|
else:
|
|
next.extend(lora.nested)
|
|
|
|
if len(next):
|
|
results.extend(callback(next))
|
|
|
|
return results |