70 lines
2.3 KiB
Python
70 lines
2.3 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Any, Self
|
|
from Interfaces.Application.CXCVInterface import CXCVInterface
|
|
from Models.HashModel import HashModel
|
|
from Drivers.LocalFileDriver import LocalFileDriver
|
|
from Drivers.SFTPFileDriver import SFTPFileDriver
|
|
from Interfaces.FilesInterface import FilesInterface
|
|
from Utils.Utils import Utils
|
|
from Utils.Patterns import RE
|
|
|
|
class FileModel:
|
|
|
|
def __init__(self:Self, cxcv:CXCVInterface, path:str, chunk_size:int = 4096) -> None:
|
|
self.cxcv:CXCVInterface = cxcv
|
|
self.path:str = path
|
|
self.size:int = 0
|
|
self.hash:HashModel = HashModel(self.cxcv, chunk_size)
|
|
|
|
def update(self:Self, chunk:bytes) -> None:
|
|
self.size += len(chunk)
|
|
self.hash.update(chunk)
|
|
|
|
def check_integrity(self:Self, other:type[Self]) -> bool:
|
|
|
|
other:FileModel = other
|
|
|
|
return (
|
|
self.size == other.size and
|
|
self.hash.md5 == other.hash.md5 and
|
|
self.hash.sha1 == other.hash.sha1 and
|
|
self.hash.sha256 == other.hash.sha256 and
|
|
self.hash.blake2b == other.hash.blake2b and
|
|
self.hash.crc32 == other.hash.crc32
|
|
)
|
|
|
|
@staticmethod
|
|
def get_inputs(*inputs:list[str|dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]]) -> dict[str, Any|None]:
|
|
return Utils.get_dictionary([(
|
|
{"path" : subinputs} if isinstance(subinputs, str) else
|
|
subinputs) for subinputs in inputs])
|
|
|
|
@staticmethod
|
|
def get_driver(
|
|
cxcv:CXCVInterface,
|
|
inputs:str|dict[str, Any|None]|list[Any|None]|tuple[Any|None, ...]
|
|
) -> FilesInterface:
|
|
|
|
if isinstance(inputs, str):
|
|
|
|
user:str|None = None
|
|
password:str|None = None
|
|
host:str|None = None
|
|
port:int|None = None
|
|
path:str = ""
|
|
|
|
user, password, host, port, path = RE.FILE_PATH.match(inputs).groups()
|
|
|
|
inputs = {
|
|
"user": user,
|
|
"password": password,
|
|
"host": host,
|
|
"port": int(port) if port else None,
|
|
"path": path
|
|
}
|
|
|
|
return (
|
|
SFTPFileDriver if "host" in inputs else
|
|
LocalFileDriver)(cxcv, Utils.get_value("path", inputs, "."), inputs) |