#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Any, Self
from Interfaces.Application.CXCVInterface import CXCVInterface
from Models.HashModel import HashModel
from Drivers.LocalFileDriver import LocalFileDriver
from Drivers.SFTPFileDriver import SFTPFileDriver
from Interfaces.FilesInterface import FilesInterface
from Utils.Utils import Utils
from Utils.Patterns import RE


class FileModel:
    """Track the byte size and running digests of a file identified by a path.

    Data is fed in chunk by chunk through ``update``; two models can then be
    compared with ``check_integrity``. The static helpers normalize driver
    inputs and pick a file driver for them.
    """

    def __init__(self: Self, cxcv: CXCVInterface, path: str, chunk_size: int = 4096) -> None:
        """Create an empty model for *path*.

        Args:
            cxcv: Application interface; stored and forwarded to ``HashModel``.
            path: Path of the file this model describes (stored as given).
            chunk_size: Forwarded to ``HashModel`` as its second argument.
        """
        self.cxcv: CXCVInterface = cxcv
        self.path: str = path
        # Number of bytes seen so far through ``update``.
        self.size: int = 0
        self.hash: HashModel = HashModel(self.cxcv, chunk_size)

    def update(self: Self, chunk: bytes) -> None:
        """Account for one more chunk: grow ``size`` and feed the hash model."""
        self.size += len(chunk)
        self.hash.update(chunk)

    def check_integrity(self: Self, other: Self) -> bool:
        """Return ``True`` when *other* has the same size and the same digests.

        The comparison covers ``size`` plus the ``md5``, ``sha1``, ``sha256``,
        ``blake2b`` and ``crc32`` attributes of both hash models.

        Args:
            other: Another ``FileModel`` instance to compare against.
        """
        return (
            self.size == other.size and
            self.hash.md5 == other.hash.md5 and
            self.hash.sha1 == other.hash.sha1 and
            self.hash.sha256 == other.hash.sha256 and
            self.hash.blake2b == other.hash.blake2b and
            self.hash.crc32 == other.hash.crc32
        )

    @staticmethod
    def get_inputs(
        *inputs: str | dict[str, Any | None] | list[Any | None] | tuple[Any | None, ...]
    ) -> dict[str, Any | None]:
        """Combine several input descriptions through ``Utils.get_dictionary``.

        A plain string argument is wrapped as ``{"path": <string>}``; every
        other argument is passed through untouched. How the resulting list is
        merged is decided by ``Utils.get_dictionary`` (not visible here).
        """
        return Utils.get_dictionary([
            {"path": subinputs} if isinstance(subinputs, str) else subinputs
            for subinputs in inputs
        ])

    @staticmethod
    def get_driver(
        cxcv: CXCVInterface,
        inputs: str | dict[str, Any | None] | list[Any | None] | tuple[Any | None, ...]
    ) -> FilesInterface:
        """Build the file driver that matches *inputs*.

        A string is parsed with ``RE.FILE_PATH``, whose five groups are read
        as user, password, host, port and path. ``SFTPFileDriver`` is used
        when the string carries a host; otherwise ``LocalFileDriver`` is used.
        A string the pattern does not match is treated as a plain local path.

        For non-string inputs the driver is chosen by whether ``"host"`` is
        contained in *inputs*.

        Args:
            cxcv: Application interface handed to the driver.
            inputs: Location string or already structured inputs.

        Returns:
            The driver instance, constructed with ``cxcv``, the value of
            ``Utils.get_value("path", inputs, ".")`` and the inputs.
        """
        if isinstance(inputs, str):
            matches = RE.FILE_PATH.match(inputs)
            if matches is None:
                # Not a recognizable location spec: use the raw string as the
                # path instead of failing on ``None.groups()``.
                inputs = {"path": inputs}
                is_remote = False
            else:
                user, password, host, port, path = matches.groups()
                inputs = {
                    "user": user,
                    "password": password,
                    "host": host,
                    "port": int(port) if port else None,
                    "path": path
                }
                # The "host" key is always present in the dict built above, so
                # key membership cannot tell local from remote; look at the
                # captured value instead.
                is_remote = bool(host)
        else:
            # NOTE(review): key presence only, so a "host" entry holding None
            # still selects SFTP here - confirm this is what callers expect.
            is_remote = "host" in inputs

        driver = SFTPFileDriver if is_remote else LocalFileDriver
        return driver(cxcv, Utils.get_value("path", inputs, "."), inputs)