#!/usr/bin/env python3 # -*- coding: utf-8 -*- from typing import Self, Optional, Any from threading import Thread from time import time as timestamp from Interfaces.Application.AnPInterface import AnPInterface from Models.SessionModel import SessionModel class SessionsManager: def __init__(self:Self, anp:AnPInterface) -> None: self.anp:AnPInterface = anp self.__sessions:dict[str, SessionModel] = {} self.__thread:Thread = Thread(target = self.__autoclean) self.__timeout:float|int = self.anp.settings.get(("sessions_timeout", "timeout"), None, 3600) self.__clean_waiter:float|int = self.anp.settings.get(("sessions_clean_waiter", "clean_waiter"), None, 60) self.update() self.__thread.start() def update(self:Self) -> None:pass def reset(self:Self) -> None: self.__sessions = {} self.update() def __autoclean(self:Self) -> None: while self.anp.working(): id:str session:SessionModel time:float = timestamp() for id, session in tuple(self.__sessions.items()): if time - session.get("date_last", time) > self.__timeout: self.remove(id) self.anp.wait(self.__clean_waiter) def create(self:Self) -> SessionModel: id:str = self.anp.unique_keys.get() self.__sessions[id] = SessionModel(id) return id def _get_instance(self:Self, id:str) -> SessionModel|None: if id in self.__sessions and not self.__sessions[id].removed: return self.__sessions[id] return None def get(self:Self, id:str, key:str, default:Optional[Any] = None) -> Any|None: if id in self.__sessions and not self.__sessions[id].removed: return self.__sessions[id].get(key, default) return default def set(self:Self, id:str, key:str, value:Any|None) -> bool: if id in self.__sessions and not self.__sessions[id].removed: self.__sessions[id].set(key, value) return True return False def remove(self:Self, id:str) -> bool: if id in self.__sessions: self.__sessions[id].remove() del self.__sessions[id] return True return False