AnP/Python/Managers/PseudoLoRAsManager.py
2026-06-11 13:17:31 +02:00

151 lines
4.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Self, Any, Sequence, Callable, Optional
from Interfaces.Application.AnPInterface import AnPInterface
from Models.PseudoLoRAModel import PseudoLoRAModel
from Utils.Common import Common
from Utils.Checks import Check
class PseudoLoRAsManager:
    """Registry of pseudo-LoRA entries built from the application settings.

    Entries are collected by ``update`` from a fixed set of settings keys and
    may be supplied in several shapes (see ``add``). ``get`` asks a callback
    which entries to use and returns their titles with their file contents.
    """

    def __init__(self:Self, anp:AnPInterface) -> None:
        """Keep the application handle and load the entries from its settings.

        Args:
            anp: Application object; this class reads ``anp.settings.get``.
        """
        self.anp:AnPInterface = anp
        # Default budget of 100 * 2**20 (presumably bytes/characters of cached
        # content -- TODO confirm); update() may override it from the settings.
        self.__maximum_cache:int = (1 << 20) * 100
        # Cache bookkeeping. These are reset by clean_cache(); get() does not
        # use them yet.
        self.__memory_cached:int = 0
        self.__cache:dict[int, PseudoLoRAModel] = {}
        self.__cache_i:int = 0
        self.__loras:list[PseudoLoRAModel] = []
        self.update()

    def update(self:Self) -> None:
        """Discard the current entries and rebuild them from the settings."""
        # Clean first: clean_cache() walks the entries that are about to be
        # dropped so each of them can release its own cache.
        self.clean_cache()
        self.__loras.clear()
        # The current value is handed over as the fallback, so a missing
        # setting leaves the budget unchanged.
        self.__maximum_cache = self.anp.settings.get((
            "pseudoloras_maximum_cache_size", "maximum_cache_size"
        ), None, self.__maximum_cache)
        for key in (
            "default_pseudoloras_files", "pseudoloras_files",
            "default_pseudoloras", "pseudoloras"
        ):
            self.add(self.anp.settings.get(key))

    def __try_load_file(self:Self, path:str) -> bool:
        """Load ``path`` with ``Common.load_json`` and register its content.

        Args:
            path: Value handed to ``Common.load_json``.

        Returns:
            True when the loader returned non-empty data (which is then
            passed to ``add``), False otherwise.
        """
        data:list[dict[str, Any|None]|Sequence[Any|None]] = Common.load_json(path)
        # Truthiness instead of len(): also covers a loader that answers None.
        if not data:
            return False
        self.add(data)
        return True

    @staticmethod
    def __looks_like_model_arguments(candidate:Sequence[Any|None]) -> bool:
        """Tell whether ``candidate`` has the accepted model-argument shape.

        The accepted shape is 2 to 4 items: two strings, then optionally
        ``None`` or an array of strings, then optionally ``None`` or a boolean.
        """
        size:int = len(candidate)
        if not 2 <= size <= 4:
            return False
        if not (Check.is_string(candidate[0]) and Check.is_string(candidate[1])):
            return False
        if size >= 3 and candidate[2] is not None and not (
            Check.is_array(candidate[2]) and
            all(Check.is_string(entry) for entry in candidate[2])
        ):
            return False
        return size < 4 or candidate[3] is None or Check.is_boolean(candidate[3])

    def add(self:Self, inputs:Any|None) -> None:
        """Register one or more entries given in any supported shape.

        A ``PseudoLoRAModel`` is appended as is. Anything else goes through
        ``Common.load_json(inputs, False)`` and every element of the result is
        handled according to its shape:

        * a string is tried as a file through ``__try_load_file``;
        * an array holding only strings is tried as a list of files, in order;
          at the first one that does not load, the whole array is used as
          ``PseudoLoRAModel`` constructor arguments instead;
        * an array matching ``__looks_like_model_arguments`` is used as
          ``PseudoLoRAModel`` constructor arguments;
        * any other array has each of its items added recursively;
        * any other value is ignored.

        Args:
            inputs: The entry or entries to register.
        """
        if isinstance(inputs, PseudoLoRAModel):
            self.__loras.append(inputs)
            return
        subinputs:dict[str, Any|None]|Sequence[Any|None]
        for subinputs in Common.load_json(inputs, False):
            if Check.is_string(subinputs):
                self.__try_load_file(subinputs)
            elif not Check.is_array(subinputs):
                continue
            elif all(Check.is_string(entry) for entry in subinputs):
                # all() stops at the first file that fails to load.
                # NOTE(review): files loaded before that failure stay
                # registered AND the array is also turned into a model;
                # confirm whether that mix is intended.
                if not all(self.__try_load_file(path) for path in subinputs):
                    self.__loras.append(PseudoLoRAModel(*subinputs))
            elif self.__looks_like_model_arguments(subinputs):
                self.__loras.append(PseudoLoRAModel(*subinputs))
            else:
                for nested in subinputs:
                    self.add(nested)

    def clean_cache(self:Self) -> None:
        """Reset the cache bookkeeping and clear the cache of every entry."""
        self.__memory_cached = 0
        self.__cache.clear()
        for lora in self.__loras:
            lora.clean_cache()

    def get(self:Self,
        callback:Callable[[list[str]], list[bool]],
        loras:Optional[list[PseudoLoRAModel]] = None
    ) -> list[tuple[str, str]]:
        """Return the title and file content of every entry the callback picks.

        The callback receives the titles of one level of entries and answers
        with one flag per title. A picked entry that has a path contributes
        ``(title, content)``; a picked entry without a path has its nested
        entries collected, and those are offered to the callback in a further
        call. Results of a level come before the results of its nested levels.

        Args:
            callback: Called with the list of titles; returns the matching
                list of selection flags.
            loras: Entries to offer; defaults to every registered entry.

        Returns:
            ``(title, content)`` pairs, the content being whatever
            ``Common.load_file(path, "r")`` returns.
        """
        if loras is None:
            loras = self.__loras.copy()
        selected:list[bool] = callback([lora.title for lora in loras])
        results:list[tuple[str, str]] = []
        pending:list[PseudoLoRAModel] = []
        # zip() pairs entries with flags and stops at the shorter of the two,
        # so an answer longer than the offered list cannot index out of range.
        for lora, chosen in zip(loras, selected):
            if not chosen:
                continue
            if lora.path is not None:
                # TODO: content caching bounded by the maximum cache size is
                # not wired in; the file is read again on every call.
                results.append((lora.title, Common.load_file(lora.path, "r")))
            else:
                # "or ()" tolerates an entry whose nested list is None.
                pending.extend(lora.nested or ())
        if pending:
            results.extend(self.get(callback, pending))
        return results