#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import random
from inspect import FrameInfo
from inspect import stack as get_stack
from io import TextIOWrapper
from json import loads as json_decode
from os.path import abspath as path_absolute
from os.path import dirname as directory_name
from os.path import exists as path_exists
from re import MULTILINE as RE_MULTILINE
from re import Match as REMatch
from re import Pattern as REPattern
from re import compile as RECompile
from signal import SIGINT as SIGNAL_INTERRUPTION
from signal import signal as signal_method
from threading import Thread
from time import sleep
from time import time as timestamp
from traceback import extract_tb as extract_traceback
from traceback import format_stack as trace_format_stack
from typing import Any, Optional


class PythonMapper:
    """Watch configured projects and regenerate a stub "map" file for each one.

    Projects are read from JSON files. A background thread periodically asks
    every project to reload its source files; when one changed, the project
    re-extracts its classes, methods and variables and writes a stub file.
    """

    class Settings:
        """Default timings and output formats.

        Every timing is in milliseconds: the values are either divided by
        1000 before being passed to ``sleep`` or compared against
        ``PythonMapper.time()``.
        """

        random_wait_time_in_use: int = 500
        project_process_timer: int = 30000
        process_subtimer: int = 100
        process_time: int = 1000
        print_format: str = "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss}.{nnn} [{line}]{file}({method}): {message}"
        exception_format: str = " '{file}({method})[{line}]'{lines}\n\n{exception_message}"
        rest_time_per_process_file: int = 10
        # NOTE(review): this literal may have lost whitespace when the file was
        # extracted (it could have been a tab or several spaces) - confirm.
        tabulation_character: str = " "

    class CodeBlock:
        """Extracted skeleton of one class (or of the module root)."""

        def __init__(self) -> None:
            self.class_data: str = ""  # class header without the colon; "" for the root
            self.class_code: list[PythonMapper.CodeBlock] = []  # nested classes
            self.methods: list[str] = []
            self.static_variables: list[str] = []
            self.object_variables: list[str] = []
            self.constructor: str = ""  # undecorated ``__init__`` header, if any

    class ClassData:
        """One entry of the nesting stack used while scanning a file."""

        def __init__(self, class_name: str, spaces: int) -> None:
            self.object_done: list[str] = []  # ``self.x`` names already recorded
            self.static_done: list[str] = []  # class-level names already recorded
            self.spaces: int = spaces
            self.class_name: str = class_name
            self.code: PythonMapper.CodeBlock = PythonMapper.CodeBlock()

    class SearchData:
        """Scan state: the stack of open levels and the index of the current one."""

        def __init__(self) -> None:
            self.levels: list[PythonMapper.ClassData] = [PythonMapper.ClassData("ROOT", 0)]
            self.i: int = 0

    class Project:
        """One mapped project: a set of source files and the stub file built from them."""

        # Each key is also the name of the classmethod that handles its matches.
        methods: dict[str, REPattern] = {
            "class_item": RECompile(r'^([ \t]*)class[ \t]+([a-zA-Z0-9_]+)(#[^\:]+)?(?=\:)', RE_MULTILINE),
            "method": RECompile(r'^([ \t]*\@[^\r\n]+(\n|\r\n))*([ \t]*)def[ \t]+(_?[^_][a-zA-Z0-9_]*|__init__)(#[^\:]+)(?=\:)', RE_MULTILINE),
            "object_variable": RECompile(r'^[ \t]+(self[ \t]*\.[ \t]*(_?[^_][a-zA-Z0-9_]*)([ \t]*\:[^\=\r\n]+)?)([ \t]*=|[\r\n]|$)', RE_MULTILINE),
            "static_variable": RECompile(r'^([ \t]*)(_?[^_][a-zA-Z0-9_]*)([ \t]*\:[^\r\n\=]+)?(?=[ \t]*\=)(?!\=)', RE_MULTILINE),
        }

        # Matches string literals and innermost (), [] and {} groups.
        re_unclean_code: REPattern = RECompile(r'(?<!\\)("{3}((?!"{3})(.|[\r\n]))*"{3}|\'{3}((?!\'{3})(.|[\r\n]))*\'{3})|"((?!")(.|\\\\.))*"|\'((?!\')(.|\\\\.))*\'|\[((?![\[\]])(.|[\r\n]))*\]|\(((?![\(\)])(.|[\r\n]))*\)|\{((?![\{\}])(.|[\r\n]))*\}')
        # Matches the placeholder that ``unclean_code`` leaves behind.
        re_clean_code: REPattern = RECompile(r'\#{3}UNCLEAN([0-9]+)KEY#{3}')

        def __init__(self, parent: Any, name: str, inputs: dict[str, Any]) -> None:
            """Build a project from one entry of a projects JSON file.

            Args:
                parent: Owning ``PythonMapper``; used for logging and file I/O.
                name: Project key.
                inputs: Project settings. ``path``, ``files`` and ``map`` are
                    required; ``timer``, ``rest_timer``/``rest``,
                    ``dependences``, ``definitions`` and ``tabulation`` are
                    optional.

            Raises:
                KeyError: If a required key is missing.
            """
            settings = PythonMapper.Settings

            self.parent: PythonMapper = parent
            self.name: str = name
            self.timer: int = inputs["timer"] if "timer" in inputs else settings.project_process_timer
            if "rest_timer" in inputs:
                self.rest_timer: int = inputs["rest_timer"]
            elif "rest" in inputs:
                self.rest_timer = inputs["rest"]
            else:
                self.rest_timer = settings.rest_time_per_process_file
            self.path: str = inputs["path"]
            self.files: tuple[str, ...] = tuple(inputs["files"])
            self.map: str = inputs["map"]
            self.__cache: dict[str, str] = {}
            self.in_use: bool = False
            self.__last_time: int = 0
            self.__data: list = []
            # Copied so the NewType entry added below never leaks into the
            # caller's configuration object.
            self.dependences: list = list(inputs["dependences"]) if "dependences" in inputs else []
            self.definitions: list[str] = list(inputs["definitions"]) if "definitions" in inputs else []
            self.tabulation: str = inputs["tabulation"] if "tabulation" in inputs else settings.tabulation_character

            # Bare definitions are emitted as ``X = NewType("X", Any)``, so the
            # generated file must import NewType.
            # NOTE(review): ``Any`` is used by that line too but is not added
            # here - confirm that projects are expected to declare it.
            if self.definitions and not any(
                isinstance(dependence, list)
                and len(dependence) > 1
                and dependence[0] == "typing"
                and (dependence[1] == "NewType" if isinstance(dependence[1], str) else "NewType" in dependence[1])
                for dependence in self.dependences
            ):
                self.dependences.append(["typing", "NewType"])

        def process(self) -> None:
            """Reload the project files if ``timer`` ms have passed; rewrite the map on change."""
            now = PythonMapper.time()

            if now - self.__last_time >= self.timer:
                if self.__update_cache():
                    self.__process_cache_file()
                self.__last_time = now

        def __process_cache_level(self, block: Any, level: int) -> str:
            """Render one ``CodeBlock`` and, recursively, its nested classes as stub code.

            Args:
                block: The ``PythonMapper.CodeBlock`` to render.
                level: Nesting depth; the root block is 0 and is indented like depth 1.

            Returns:
                The generated text, starting with blank lines.
            """
            has_class = block.class_data != ""
            indent = self.tabulation * (level - 1)  # a negative factor gives ""
            parts = []

            if has_class:
                parts.append("\n\n" + indent + block.class_data + ":")
                indent += self.tabulation

            for subblock in block.class_code:
                parts.append(self.__process_cache_level(subblock, level + 1))

            if block.static_variables:
                parts.append("\n")
                for variable in block.static_variables:
                    parts.append("\n" + indent + variable + " = None")

            if has_class and (block.object_variables or block.constructor):
                inner = indent + self.tabulation
                parts.append("\n\n" + indent + (block.constructor + ":" if block.constructor else "def __init__(self):"))
                if block.object_variables:
                    for variable in block.object_variables:
                        parts.append("\n" + inner + variable + " = None")
                else:
                    parts.append("\n" + inner + "pass")

            for method in block.methods:
                parts.append("\n\n" + indent + method + ":pass")

            return "".join(parts)

        def __process_cache_file(self) -> None:
            """Assemble the whole map file and save it through the parent mapper."""
            parts = ["#!/usr/bin/env python\n" + "# -*- coding: utf-8 -*-"]

            if self.dependences:
                parts.append("\n")

                for dependence in self.dependences:
                    if isinstance(dependence, str):
                        if " " in dependence:
                            # Already a complete import statement.
                            parts.append("\n" + dependence)
                            continue
                        dependence = [dependence]

                    count = len(dependence)

                    if count == 1:
                        parts.append("\nimport " + (dependence[0] if isinstance(dependence[0], str) else ", ".join(dependence[0])))
                    elif count == 2:
                        parts.append("\nfrom " + dependence[0] + " import " + (dependence[1] if isinstance(dependence[1], str) else ", ".join(dependence[1])))
                    elif count == 3:
                        parts.append("\nfrom " + dependence[0] + " import " + dependence[1] + " as " + dependence[2])

            if self.definitions:
                parts.append("\n")

                for definition in self.definitions:
                    parts.append("\n" + (definition if "=" in definition or " " in definition else definition + " = NewType(\"" + definition + "\", Any)"))

            for block in self.__data:
                # A slot stays None while its file has never loaded successfully.
                if block is not None:
                    parts.append(self.__process_cache_level(block, 0))

            self.parent.save_file(self.path + '/' + self.map, "".join(parts))

        def __update_cache(self) -> bool:
            """Reload every project file and re-scan the ones whose content changed.

            Returns:
                True if at least one file was new or different.
            """
            has_changes = False

            for i, file in enumerate(self.files):
                if not self.parent._working:
                    break

                path = self.path + '/' + file
                data, error = self.parent.load_file(path)

                if not error and (path not in self.__cache or data != self.__cache[path]):
                    self.__cache[path] = data
                    self.__process_code(path, data, i)
                    has_changes = True
                    sleep(self.rest_timer / 1000.0)

            return has_changes

        @staticmethod
        def unclean_code(code: str) -> tuple[str, tuple[str, ...]]:
            """Replace string literals and bracketed groups with numbered placeholders.

            Matching restarts from the beginning after every replacement, so an
            outer group is captured once its inner groups are placeholders.

            Returns:
                ``(code, blocks)`` where ``blocks[n]`` is the text that
                ``###UNCLEAN<n>KEY###`` stands for.
            """
            blocks = []

            while True:
                matches = PythonMapper.Project.re_unclean_code.search(code)

                if not matches:
                    break

                start, end = matches.span()

                code = code[:start] + "###UNCLEAN" + str(len(blocks)) + "KEY###" + code[end:]
                blocks.append(matches.group(0))

            return (code, tuple(blocks))

        @staticmethod
        def clean_code(code: str, blocks: tuple[str, ...]) -> str:
            """Expand every placeholder produced by ``unclean_code`` back to its text.

            Raises:
                IndexError: If a placeholder number is not an index of ``blocks``.
            """
            while True:
                matches = PythonMapper.Project.re_clean_code.search(code)

                if not matches:
                    break

                start, end = matches.span()

                code = code[:start] + blocks[int(matches.group(1))] + code[end:]

            return code

        @staticmethod
        def get_group(code: str, pattern: REPattern) -> list:
            """Search ``pattern`` in ``code``.

            Returns:
                A mutable ``[match, [start, end], ok]`` list; on failure the
                span is ``[-1, -1]`` and ``ok`` is False.
            """
            matches = pattern.search(code)
            position = list(matches.span()) if matches else [-1, -1]
            ok = matches is not None and position[0] >= 0

            return [matches, position, ok]

        def __process_code(self, path: str, code: str, k: int) -> None:
            """Scan one source file and store its skeleton at index ``k``.

            The earliest match among all patterns is handled, the code is cut
            right after it, and the loop repeats on the remainder.

            Args:
                path: File path, used for logging only.
                code: Full text of the file.
                k: Index of the file within ``self.files``.
            """
            length = len(code)
            sets = {}
            data = PythonMapper.SearchData()

            code, blocks = PythonMapper.Project.unclean_code(code)

            self.parent._print("info", "The code with '{length}' characters from file '{path}' was uncleaned with '{l}' items.", {
                "length": length,
                "path": path,
                "l": len(blocks)
            })

            while True:
                selected = ""
                i = -1
                has_next = False

                for key, pattern in PythonMapper.Project.methods.items():
                    if key not in sets:
                        sets[key] = PythonMapper.Project.get_group(code, pattern)

                    matches, span, ok = sets[key]

                    if not ok:
                        # The pattern no longer matches and the code only shrinks.
                        continue

                    if span[0] < 0:
                        # The cached match started in text that was already consumed.
                        matches, span, ok = sets[key] = PythonMapper.Project.get_group(code, pattern)

                    if ok and (i < 0 or span[0] < i):
                        i = span[0]
                        selected = key
                        has_next = True

                if not has_next:
                    break

                matches, span, ok = sets[selected]

                getattr(PythonMapper.Project, selected)(code, PythonMapper.Project.clean_code(code[span[0]:span[1]], blocks).strip(), data, matches)

                code = code[span[1]:]

                if not code:
                    break

                # Shift the cached spans so they stay relative to the shortened code.
                for entry in sets.values():
                    if entry[2]:
                        entry[1] = [offset - span[1] for offset in entry[1]]

            # Pad instead of appending once: an earlier file may have failed to
            # load, in which case ``k`` is beyond the end of the list.
            while len(self.__data) <= k:
                self.__data.append(None)

            # Attach every level that is still open to its parent.
            while data.i:
                data.i -= 1
                data.levels[data.i].code.class_code += [data.levels[data.i + 1].code]

            self.__data[k] = data.levels[0].code

        @staticmethod
        def set_level(data: Any, spaces: int) -> bool:
            """Move the level stack to the indentation of the current match.

            Args:
                data: The ``PythonMapper.SearchData`` being updated.
                spaces: Length of the leading whitespace of the match.

            Returns:
                True if the match sits exactly at the indentation of the
                (possibly updated) current level.
            """
            current = data.levels[data.i]

            if spaces > current.spaces:
                # A level pushed by ``class_item`` starts with its parent's
                # indentation; the first deeper match sets the real one.
                if data.i and current.spaces == data.levels[data.i - 1].spaces:
                    current.spaces = spaces
            else:
                keep = len(data.levels)

                # Close every level that is indented deeper than the match.
                while data.levels[data.i].spaces > spaces:
                    data.i -= 1
                    data.levels[data.i].code.class_code += [data.levels[data.i + 1].code]
                    keep -= 1

                data.levels = data.levels[:keep]

            return data.levels[data.i].spaces == spaces

        @classmethod
        def class_item(cls, code: str, fragment: str, data: Any, matches: REMatch) -> None:
            """Open a new level for a ``class`` header found at the current indentation."""
            spaces = len(matches.group(1))

            if cls.set_level(data, spaces):
                data.levels += [PythonMapper.ClassData(matches.group(2), spaces)]
                data.i += 1
                data.levels[data.i].code.class_data = fragment

        @classmethod
        def method(cls, code: str, fragment: str, data: Any, matches: REMatch) -> None:
            """Record a ``def`` header; the first undecorated ``__init__`` becomes the constructor."""
            name = matches.group(4)

            if cls.set_level(data, len(matches.group(3))):
                block = data.levels[data.i].code

                if name == "__init__" and not block.constructor and "@" not in fragment[:fragment.index("__init__")]:
                    block.constructor = fragment
                else:
                    block.methods += [fragment]

        @classmethod
        def object_variable(cls, code: str, fragment: str, data: Any, matches: REMatch) -> None:
            """Record a ``self.<name>`` attribute once per level."""
            name = matches.group(1)
            level = data.levels[data.i]

            if name in level.object_done:
                return
            level.object_done += [name]

            # The match may end on the assignment sign; drop it.
            fragment = (fragment[:-1] if fragment[-1] == "=" else fragment).strip()
            level.code.object_variables += [fragment]

        @classmethod
        def static_variable(cls, code: str, fragment: str, data: Any, matches: REMatch) -> None:
            """Record a class-level (or module-level) assignment target once per level."""
            name = matches.group(2)

            if not cls.set_level(data, len(matches.group(1))) or name in data.levels[data.i].static_done:
                return
            data.levels[data.i].static_done += [name]

            data.levels[data.i].code.static_variables += [fragment]

    re_string_variables: REPattern = RECompile(r'\{([^\{\}]+)\}')
    re_break_lines: REPattern = RECompile(r'\r\n|[\r\n]')
    re_exception_line: REPattern = RECompile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')

    def __init__(self, inputs: dict[str, Any]) -> None:
        """Load the projects and start the background processing thread.

        Args:
            inputs: Options. ``projects_files``, ``projects_file`` and
                ``project_file`` may each hold a path or a list/tuple of
                paths of extra project files.
        """
        self.__inputs: dict[str, Any] = inputs
        self.__projects: dict[str, PythonMapper.Project] = {}
        self.__thread: Thread = Thread(target=self.__process_projects)
        self.__root_path: str = path_absolute(directory_name(__file__))
        self.__root_path_l: int = len(self.__root_path) + 1
        self.__files_projects: tuple[str, ...] = (self.__root_path + "/../JSON/PythonMapper.py.projects.secrets.json",)
        self._working: bool = True
        self.__last_process_try: int = 0

        signal_method(SIGNAL_INTERRUPTION, self.signal_handler)

        for key in ("projects_files", "projects_file", "project_file"):
            if key in inputs:
                value = inputs[key]
                if isinstance(value, str):
                    self.__files_projects += (value,)
                elif isinstance(value, (list, tuple)):
                    self.__files_projects += tuple(value)

        self.update_projects()

        self.__thread.start()

    def signal_handler(self, signal_id: int, frame: Any) -> None:
        """SIGINT handler: stop the mapper and report it."""
        self.stop()
        print("The PythonMapper was stopped successfully.")

    def stop(self) -> None:
        """Clear the working flag; loops that poll ``_working`` exit on their next check."""
        if self._working:
            self._working = False

    @staticmethod
    def string_variables(string: str, variables: Optional[dict[str, Any]] = None) -> str:
        """Replace each ``{key}`` in ``string`` with ``str(variables[key])``.

        Placeholders whose key is missing are left untouched. ``None`` is
        treated as an empty mapping.
        """
        values = variables if variables is not None else {}

        def callback(matches: REMatch) -> str:
            key = matches.group(1)
            return str(values[key]) if key in values else matches.group(0)

        return PythonMapper.re_string_variables.sub(callback, string)

    def get_action_data(self, i: int = 1) -> dict[str, Any]:
        """Describe the stack frame ``i`` levels above the caller of this method.

        Returns:
            A dict with ``file`` (file name with the first
            ``len(root path) + 1`` characters removed), ``method`` and ``line``.
        """
        # context=0: only file name, function and line number are used, so
        # there is no need to read source lines for every frame.
        stack = get_stack(0)[1 + i]

        return {
            "file": stack.filename[self.__root_path_l:],
            "method": stack.function,
            "line": stack.lineno
        }

    def _print(self, _type: str, message: str, variables: Optional[dict[str, Any]] = None, i: Optional[int] = 1) -> None:
        """Print one log line built from ``Settings.print_format``.

        Args:
            _type: Log type; its first four letters, upper-cased, are shown.
            message: Message template with ``{key}`` placeholders.
            variables: Values for the placeholders; they override the
                built-in date, type and caller values.
            i: Frame offset passed to ``get_action_data``.
        """
        type_l = len(_type)
        date = datetime.datetime.now()
        # NOTE(review): the " " padding literals may have lost whitespace when
        # the file was extracted - confirm against the original.
        own = {
            "type": " "[int(type_l / 2):] + _type[0:4].upper() + " "[int((type_l + 1) / 2):],
            **self.get_action_data(i)
        }

        for key in ("year", "month", "day", "hour", "minute", "second"):
            # "i" is used for minutes because "m" is taken by the month.
            k = "i" if key == "minute" else key[0]

            own[key] = own[k] = getattr(date, key) % 100
            own[k + k] = ("00" + str(own[k]))[-2:]

        own["year"] = own["yyyy"] = date.year
        own["milliseconds"] = own["n"] = int(date.microsecond / 1000)
        own["nnn"] = ("000" + str(own["n"]))[-3:]

        variables = {**own, **(variables or {})}

        if variables["type"] == "EXCE":
            suffix = PythonMapper.Settings.exception_format
        elif variables["type"] == "ERRO":
            suffix = "{list}"
        else:
            suffix = ""
        variables["message"] = PythonMapper.string_variables(message + suffix, variables)

        print(PythonMapper.string_variables(PythonMapper.Settings.print_format, variables))

    def validate(self,
        error: int,
        messages: Optional[tuple] = (),
        variables: Optional[dict[str, Any]] = None,
        error_message: Optional[str] = None,
        ok_message: Optional[str] = None
    ) -> bool:
        """Log the outcome of an operation described by an error bit mask.

        Args:
            error: Bit mask; 0 means success.
            messages: ``messages[n]`` names bit ``n``.
            variables: Values for the message placeholders.
            error_message: Template printed when ``error`` is non-zero.
            ok_message: Template printed on success.

        Returns:
            True when ``error`` is 0, otherwise False.
        """
        variables = variables or {}
        messages = messages or ()

        if error:
            if error_message:
                list_messages = ""
                i = 0
                l = len(messages)

                while 1 << i <= error:
                    if error & 1 << i:
                        list_messages += PythonMapper.string_variables("\n - [{i}] {message}", {
                            "i": i,
                            "message": PythonMapper.string_variables(messages[i], variables) if i < l else "error_message_" + str(i)
                        })
                    i += 1

                self._print("error", error_message, {
                    **variables,
                    "list": list_messages,
                    "code": error
                })

            return False

        if ok_message:
            self._print("ok", ok_message, variables)

        return True

    def exception(self,
        exception: Exception,
        message: Optional[Any] = None,
        variables: Optional[Any] = None,
        i: Optional[int] = 1
    ) -> None:
        """Log an exception together with a compact stack listing.

        Args:
            exception: The caught exception.
            message: Message template; nothing is printed when it is empty.
            variables: A dict, or a list/tuple of dicts, of placeholder values.
            i: Frame offset forwarded (plus one) to ``_print``.
        """
        lines = extract_traceback(exception.__traceback__).format()
        # An exception that was never raised has no traceback lines at all.
        line_matches = self.re_exception_line.match(lines[-1]) if lines else None

        data = {}
        for subset in (variables if isinstance(variables, (list, tuple)) else (variables,)):
            if isinstance(subset, dict):
                data.update(subset)

        data.update(self.get_action_data(1))
        data["lines"] = ""
        data["exception_message"] = str(exception)

        # Prefer the frame that raised; otherwise keep the caller's location.
        if line_matches:
            data["method"] = line_matches.group(3)
            data["line"] = line_matches.group(2)
            data["file"] = line_matches.group(1)[self.__root_path_l:]

        for block in trace_format_stack()[:-2] + lines:
            if block:
                data["lines"] += "\n " + self.re_break_lines.split(block.strip())[0]

        data["end"] = PythonMapper.string_variables(PythonMapper.Settings.exception_format, data)

        if message:
            self._print("exception", message, data, i + 1)

    def _display_path(self, path: Any) -> Any:
        """Return ``path`` shortened for log output; non-string values pass through."""
        if isinstance(path, str) and self.__root_path in path:
            return path[self.__root_path_l:]
        return path

    def load_file(self, path: str) -> tuple[Optional[str], int]:
        """Read a text file as UTF-8.

        Returns:
            ``(data, error)``. ``error`` is a bit mask: bit 0 read exception,
            bit 1 path is None, bit 2 path is not a string, bit 3 path is
            empty, bit 4 path does not exist. ``data`` is None on failure.
        """
        # Shifted by one so bit 0 stays reserved for the read exception, as in
        # ``save_file`` and as the message table below expects.
        error = (
            1 << 0 if path is None else
            1 << 1 if not isinstance(path, str) else
            1 << 2 if not path else
            1 << 3 if not path_exists(path) else
            0) << 1
        data = None

        if not error:
            try:
                with open(path, 'r', encoding="utf-8") as opened:
                    data = opened.read()
            except Exception as exception:
                error |= 1 << 0
                self.exception(exception, "There was an exception trying load file '{path}'.", {
                    "path": self._display_path(path)
                })

        self.validate(
            error,
            (
                "exception",
                "path_null",
                "path_not_string",
                "path_empty",
                "path_not_exists"
            ),
            {
                "path": self._display_path(path)
            },
            "There was any error with code '{code}' trying load the file '{path}'.",
            "The file '{path}' was loaded successfully."
        )

        return data, error

    def save_file(self, path: str, content: str) -> int:
        """Write ``content`` to ``path`` as UTF-8.

        Returns:
            Error bit mask, 0 on success: bit 0 write exception, bits 1-3
            path is None / not a string / empty, bits 5-6 content is None /
            not a string. Bits 4 and 7 are named in the message table but
            never set.
        """
        path_error = (
            1 << 0 if path is None else
            1 << 1 if not isinstance(path, str) else
            1 << 2 if not path else
            0)
        content_error = (
            1 << 0 if content is None else
            1 << 1 if not isinstance(content, str) else
            0)
        error = (path_error | (content_error << 4)) << 1

        if not error:
            try:
                with open(path, "w", encoding="utf-8") as opened:
                    opened.write(content)
            except Exception as exception:
                error |= 1 << 0
                self.exception(exception, "There was an exception trying save file '{path}' with {bytes} bytes of String data.", {
                    "path": path,
                    "bytes": len(content)
                })

        self.validate(
            error,
            (
                "exception",
                "path_null",
                "path_not_string",
                "path_empty",
                "path_not_exists",
                "content_null",
                "content_not_string",
                "content_empty"
            ),
            {
                "path": path,
                "bytes": len(content) if isinstance(content, str) else -1
            },
            "There was any error with code '{code}' trying save file '{path}' with {bytes} bytes of String data.",
            "The file '{path}' was write with {bytes} bytes of String data successfully."
        )

        return error

    def __load_projects_file(self, path: str) -> int:
        """Load one projects JSON file and (re)build every project it defines.

        Returns:
            Error bit mask, 0 on success: bit 0 exception while decoding or
            iterating, bit 1 the mapper stopped, bit 2 a project failed to
            build, bits 3 and up the ``load_file`` code shifted by three.
        """
        data, error = self.load_file(path)

        error <<= 3

        if data:
            try:
                projects = json_decode(data)

                for project_key, project_data in projects.items():
                    if project_key in self.__projects:
                        # Wait until the existing project is idle before replacing it.
                        while self.__projects[project_key].in_use:
                            if not self._working:
                                error |= 1 << 1
                                break
                            sleep((random.random() * PythonMapper.Settings.random_wait_time_in_use) / 1000.0)
                        if self._working:
                            del self.__projects[project_key]
                    if not self._working:
                        error |= 1 << 1
                        break
                    try:
                        self.__projects[project_key] = PythonMapper.Project(self, project_key, project_data)
                    except Exception as exception:
                        error |= 1 << 2
                        self.exception(exception, "There was an exception trying load the project '{key}' from '{path}'.", {
                            "path": self._display_path(path),
                            "key": project_key
                        })
                        if project_key in self.__projects:
                            del self.__projects[project_key]

            except Exception as exception:
                error |= 1 << 0
                self.exception(exception, "There was an exception trying load the projects from file '{path}'.", {
                    "path": self._display_path(path)
                })

        self.validate(
            error,
            (
                "exception",
                "not_working",
                "project_building_exception",
                "load_exception",
                "path_null",
                "path_not_string",
                "path_empty",
                "path_not_exists"
            ),
            {
                "path": self._display_path(path)
            },
            "There was any error with code '{code}' trying loading the projects of file '{path}'.",
            "The projects of file '{path}' was loaded successfully."
        )

        return error

    def update_projects(self) -> int:
        """Reload every configured projects file.

        Returns:
            0 on success, or ``1 << 1`` if any file reported an error.
        """
        error = 0

        for file in self.__files_projects:
            if self.__load_projects_file(file):
                error |= 1 << 1

        self.validate(
            error,
            (
                "exception",
                "load_projects_errors"
            ),
            {},
            "There was any error with code '{code}' trying update the projects.",
            "The projects was updated successfully."
        )

        return error

    @staticmethod
    def time() -> int:
        """Return the current Unix time in whole milliseconds."""
        return int(timestamp() * 1000)

    def __process_projects(self) -> None:
        """Thread body: process all projects every ``Settings.process_time`` ms until stopped."""
        while self._working:
            now = PythonMapper.time()

            if now - self.__last_process_try >= PythonMapper.Settings.process_time:
                # Iterate over a snapshot: ``update_projects`` may replace
                # entries from another thread while this loop runs.
                for project in list(self.__projects.values()):
                    project.process()

                self.__last_process_try = now

            sleep(PythonMapper.Settings.process_subtimer / 1000.0)