#!/usr/bin/env python # -*- coding: utf-8 -*- from inspect import FrameInfo from typing import Any, Optional from os.path import dirname as directory_name from os.path import abspath as path_absolute from os.path import exists as path_exists from threading import Thread from json import loads as json_decode from time import sleep import random from re import compile as RECompile from re import Pattern as REPattern from re import Match as REMatch from re import MULTILINE as RE_MULTILINE from time import time as timestamp from signal import SIGINT as SIGNAL_INTERRUPTION from signal import signal as signal_method from inspect import stack as get_stack from traceback import format_stack as trace_format_stack from traceback import extract_tb as extract_traceback import datetime from io import TextIOWrapper class PythonMapper: class Settings: random_wait_time_in_use:int = 500 project_process_timer:int = 30000 process_subtimer:int = 100 process_time:int = 1000 print_format:str = "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss}.{nnn} [{line}]{file}({method}): {message}" exception_format:int = " '{file}({method})[{line}]'{lines}\n\n{exception_message}" rest_time_per_process_file:int = 10 tabulation_character:str = " " class CodeBlock: def __init__(self): self.class_data:str = "" self.class_code:list[PythonMapper.CodeBlock] = [] self.methods:list[str] = [] self.static_variables:list[str] = [] self.object_variables:list[str] = [] self.constructor:str = "" class ClassData: def __init__(self, class_name:str, spaces:int) -> None: self.object_done:list[str] = [] self.static_done:list[str] = [] self.spaces:int = spaces self.class_name:str = class_name self.code:PythonMapper.CodeBlock = PythonMapper.CodeBlock() class SearchData: def __init__(self) -> None: self.levels:list[PythonMapper.ClassData] = [PythonMapper.ClassData("ROOT", 0)] self.i:int = 0 class Project: methods:dict[str, REPattern] = { "class_item" : RECompile(r'^([ \t]*)class[ \t]+([a-zA-Z0-9_]+)(#[^\:]+)?(?=\:)', RE_MULTILINE), "method" : RECompile(r'^([ \t]*\@[^\r\n]+(\n|\r\n))*([ \t]*)def[ \t]+(_?[^_][a-zA-Z0-9_]*|__init__)(#[^\:]+)(?=\:)', RE_MULTILINE), "object_variable" : RECompile(r'^[ \t]+(self[ \t]*\.[ \t]*(_?[^_][a-zA-Z0-9_]*)([ \t]*\:[^\=\r\n]+)?)([ \t]*=|[\r\n]|$)', RE_MULTILINE), "static_variable" : RECompile(r'^([ \t]*)(_?[^_][a-zA-Z0-9]*)([ \t]*\:[^\r\n\=]+)?(?=[ \t]*\=)(?!\=)', RE_MULTILINE) } re_unclean_code:REPattern = RECompile(r'(? None: self.parent:PythonMapper = parent self.name:str = name self.timer:int = inputs["timer"] if "timer" in inputs else PythonMapper.Settings.project_process_timer self.rest_timer:int = ( inputs["rest_timer"] if "rest_timer" in inputs else inputs["rest"] if "rest" in inputs else PythonMapper.Settings.rest_time_per_process_file ) self.path:str = inputs["path"] self.files:tuple[str] = tuple(inputs["files"]) self.map:str = inputs["map"] self.__cache:dict[str, str] = {} self.in_use:bool = False self.__last_time:int = 0 self.__data:list[PythonMapper.CodeBlock|None] = [] self.dependences:list[str|list[str, str|list[str]]|list[str, str, str]|list[str]] = inputs["dependences"] if "dependences" in inputs else [] self.definitions:list[str] = inputs["definitions"] if "definitions" in inputs else [] self.tabulation:str = inputs["tabulation"] if "tabulation" in inputs else PythonMapper.Settings.tabulation_character if len(self.definitions): has_new_type:bool = False dependence:str|list[str, str|list[str]]|list[str, str, str]|list[str] for dependence in self.dependences: if isinstance(dependence, list) and len(dependence) > 1 and dependence[0] == "typing" and (dependence[1] == "NewType" if isinstance(dependence[1], str) else "NewType" in dependence[1]): has_new_type = True break if not has_new_type: self.dependences += [["typing", "NewType"]] def process(self) -> None: time:int = PythonMapper.time() if time - self.__last_time >= self.timer: self.__update_cache() and self.__process_cache_file() self.__last_time = time def __process_cache_level(self, block:Any, level:int) -> str: white_spaces:str = "" code:str = "" subblock:PythonMapper.CodeBlock method:str variable:str has_class:bool = block.class_data != "" block:PythonMapper.CodeBlock = block for _ in range(level - 1): white_spaces += self.tabulation if has_class: code += "\n\n" + white_spaces + block.class_data + ":" white_spaces += self.tabulation for subblock in block.class_code: code += self.__process_cache_level(subblock, level + 1) if len(block.static_variables): code += "\n" for variable in block.static_variables: code += "\n" + white_spaces + variable + " = None" if has_class: l:int = len(block.object_variables) if l or block.constructor: tabulation:str = white_spaces + self.tabulation code += "\n\n" + white_spaces + (block.constructor + ":" if block.constructor else "def __init__(self):") if l: for variable in block.object_variables: code += "\n" + tabulation + variable + " = None" else: code += "\n" + tabulation + "pass" for method in block.methods: code += "\n\n" + white_spaces + method + ":pass" return code def __process_cache_file(self) -> None: # b:PythonMapper.CodeBlock # for i, block in enumerate(self.__data): # print([i, [b.class_data for b in block.class_code]]) # return code:str = ( "#!/usr/bin/env python\n" + "# -*- coding: utf-8 -*-" ) block:PythonMapper.CodeBlock if len(self.dependences): dependence:str|list[str, str|list[str]]|list[str, str, str]|list[str] code += "\n" for dependence in self.dependences: if isinstance(dependence, str): if " " in dependence: code += "\n" + dependence continue dependence = [dependence] l:int = len(dependence) if l == 1: code += "\nimport " + (dependence[0] if isinstance(dependence[0], str) else ", ".join(dependence[0])) elif l == 2: code += "\nfrom " + dependence[0] + " import " + (dependence[1] if isinstance(dependence[1], str) else ", ".join(dependence[1])) elif l == 3: code += "\nfrom " + dependence[0] + " import " + dependence[1] + " as " + dependence[2] if len(self.definitions): definition:str code += "\n" for definition in self.definitions: code += "\n" + (definition if "=" in definition or " " in definition else definition + " = NewType(\"" + definition + "\", Any)") for block in self.__data: code += self.__process_cache_level(block, 0) self.parent.save_file(self.path + '/' + self.map, code) def __update_cache(self) -> bool: i:int file:str has_changes:bool = False for i, file in enumerate(self.files): if not self.parent._working: break error:int data:int path:str = self.path + '/' + file data, error = self.parent.load_file(path) if not error and (path not in self.__cache or data != self.__cache[path]): self.__cache[path] = data self.__process_code(path, data, i) if not has_changes: has_changes = True sleep(self.rest_timer / 1000.0) return has_changes @staticmethod def unclean_code(code:str) -> tuple[str, tuple[str]]: blocks:tuple[str] = tuple() i:int = 0 while True: matches:REMatch = PythonMapper.Project.re_unclean_code.search(code) if not matches: break position:tuple[int, int] = matches.span() blocks += (code[position[0]:position[1]],) code = code[:position[0]] + "###UNCLEAN"+ str(i) + "KEY###" + code[position[1]:] i += 1 return (code, blocks) @staticmethod def clean_code(code:str, blocks:tuple[str]) -> str: while True: matches:REMatch = PythonMapper.Project.re_clean_code.search(code) if not matches: break position:tuple[int, int] = matches.span() code = code[:position[0]] + blocks[int(matches.group(1))] + code[position[1]:] return code @staticmethod def get_group(code:str, pattern:REPattern) -> list[REMatch|None, list[int, int], bool]: matches:REMatch|None = pattern.search(code) position:int = list(matches.span()) if matches else [-1, -1] ok:bool = matches != None and position[0] >= 0 return [matches, position, ok] def __process_code(self, path:str, code:str, k:int) -> None: blocks:tuple[str] l:int = len(code) sets:dict[str, list[REMatch|None, int, bool]] = {} data:PythonMapper.SearchData = PythonMapper.SearchData() code, blocks = PythonMapper.Project.unclean_code(code) self.parent._print("info", "The code with '{length}' characters from file '{path}' was uncleaned with '{l}' items.", { "length" : l, "path" : path, "l" : len(blocks) }) while True: selected:str = "" i:int = -1 key:str pattern:REPattern has_next:bool = False matches:REMatch ok:bool span:list[int, int] position:list[int, int] for key, pattern in PythonMapper.Project.methods.items(): if key not in sets: sets[key] = PythonMapper.Project.get_group(code, pattern) matches, span, ok = sets[key] if not ok: continue if span[0] < 0: matches, span, ok = sets[key] = PythonMapper.Project.get_group(code, pattern) if ok and (i < 0 or span[0] < i): i = span[0] selected = key if not has_next: has_next = True if not has_next: break matches, span, ok = sets[selected] getattr(PythonMapper.Project, selected)(code, PythonMapper.Project.clean_code(code[span[0]:span[1]], blocks).strip(), data, matches) # getattr(PythonMapper.Project, selected)(code, code[span[0]:span[1]], data, matches) code = code[span[1]:] if not code: break for key, (matches, position, ok) in sets.items(): if ok: sets[key][1] = [i - span[1] for i in position] if k == len(self.__data): self.__data += [None] while data.i: data.i -= 1 data.levels[data.i].code.class_code += [data.levels[data.i + 1].code] self.__data[k] = data.levels[0].code @staticmethod def set_level(data:Any, spaces:int) -> bool: data:PythonMapper.SearchData = data if spaces > data.levels[data.i].spaces: if data.i and data.levels[data.i].spaces == data.levels[data.i - 1].spaces: data.levels[data.i].spaces = spaces else: i:int = len(data.levels) while data.levels[data.i].spaces > spaces: data.i -= 1 data.levels[data.i].code.class_code += [data.levels[data.i + 1].code] i -= 1 data.levels = data.levels[:i] return data.levels[data.i].spaces == spaces @classmethod def class_item(self, code:str, fragment:str, data:Any, matches:REMatch): data:PythonMapper.SearchData = data if self.set_level(data, len(matches.group(1))): data.levels += [PythonMapper.ClassData(matches.group(2), len(matches.group(1)))] data.i += 1 data.levels[data.i].code.class_data = fragment # print(["class_item", fragment]) @classmethod def method(self, code:str, fragment:str, data:Any, matches:REMatch): data:PythonMapper.SearchData = data name:str = matches.group(4) if self.set_level(data, len(matches.group(3))): if name == "__init__" and not data.levels[data.i].code.constructor and "@" not in fragment[:fragment.index("__init__")]: data.levels[data.i].code.constructor = fragment else: data.levels[data.i].code.methods += [fragment] # print(["method", fragment]) @classmethod def object_variable(self, code:str, fragment:str, data:Any, matches:REMatch): name:str = matches.group(1) data:PythonMapper.SearchData = data if name in data.levels[data.i].object_done: return data.levels[data.i].object_done += [name] fragment = (fragment[:-1] if fragment[-1] == "=" else fragment).strip() data.levels[data.i].code.object_variables += [fragment] # print(["object_variable", fragment]) @classmethod def static_variable(self, code:str, fragment:str, data:Any, matches:REMatch): name:str = matches.group(2) data:PythonMapper.SearchData = data if not self.set_level(data, len(matches.group(1))) or name in data.levels[data.i].static_done: return data.levels[data.i].static_done += [name] data.levels[data.i].code.static_variables += [fragment] # print(["static_variable", data.i, name, len(matches.group(1)), fragment]) # print(["static_variable", fragment]) re_string_variables:REPattern = RECompile(r'\{([^\{\}]+)\}') re_break_lines:REPattern = RECompile(r'\r\n|[\r\n]') re_exception_line:REPattern = RECompile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$') def __init__(self, inputs:dict[str, Any|None]) -> None: key:str self.__inputs:dict[str, Any|None] = inputs self.__projects:dict[str, PythonMapper.Project] = {} self.__thread:Thread = Thread(target = self.__process_projects) self.__root_path:str = path_absolute(directory_name(__file__)) self.__root_path_l:int = len(self.__root_path) + 1 self.__files_projects:tuple[str] = (self.__root_path + "/../JSON/PythonMapper.py.projects.secrets.json",) self._working:bool = True self.__last_process_try:int = 0 signal_method(SIGNAL_INTERRUPTION, self.signal_handler) for key in ("projects_files", "projects_file", "project_file"): if key in inputs: if isinstance(inputs[key], str): self.__files_projects += (inputs[key],) elif isinstance(inputs[key], list): self.__files_projects += tuple(inputs[key]) elif isinstance(inputs[key], tuple): self.__files_projects += inputs[key] self.update_projects() self.__thread.start() # print(self.__projects) def signal_handler(self, signal_id:int, frame:Any) -> None: self.stop() print("The PythonMapper was stopped successfully.") def stop(self) -> None: if self._working: self._working = False @staticmethod def string_variables(string:str, variables:Optional[dict[str, Any|None]] = {}) -> str: def callback(matches:REMatch) -> str: key:str = matches.group(1) return str(variables[key]) if key in variables else matches.group(0) return PythonMapper.re_string_variables.sub(callback, string) def get_action_data(self, i:int = 1) -> dict[str, str|int]: stack:FrameInfo = get_stack()[1 + i] return { "file" : stack.filename[self.__root_path_l:], "method" : stack.function, "line" : stack.lineno } def _print(self, _type:str, message:str, variables:Optional[dict[str, Any|None]] = None, i:Optional[int] = 1) -> None: type_l:int = len(_type) date:datetime.datetime = datetime.datetime.now() own:dict[str, Any|None] = { "type" : " "[int(type_l / 2):] + _type[0:4].upper() + " "[int((type_l + 1) / 2):], **self.get_action_data(i) } key:str for key in ("year", "month", "day", "hour", "minute", "second"): k:str = "i" if key == "minute" else key[0] own[key] = own[k] = getattr(date, key) % 100 own[k + k] = ("00" + str(own[k]))[-2:] own["year"] = own["yyyy"] = date.year own["milliseconds"] = own["n"] = int(date.microsecond / 1000) own["nnn"] = ("000" + str(own["n"]))[-3:] variables = {**own, **variables} variables["message"] = PythonMapper.string_variables(message + ( PythonMapper.Settings.exception_format if variables["type"] == "EXCE" else "{list}" if variables["type"] == "ERRO" else ""), variables) print(PythonMapper.string_variables(PythonMapper.Settings.print_format, variables)) def validate(self, error:int, messages:Optional[tuple[str]] = tuple(), variables:Optional[dict[str, Any|None]] = {}, error_message:Optional[str|None] = None, ok_message:Optional[str] = None ): if error: if error_message: list_messages:str = "" i:int = 0 l:int = len(messages) while 1 << i <= error: if error & 1 << i: list_messages += PythonMapper.string_variables("\n - [{i}] {message}", { "i" : i, "message" : PythonMapper.string_variables(messages[i], variables) if i < l else "error_message_" + str(i) }) i += 1 self._print("error", error_message, { **variables, "list" : list_messages, "code" : error }) return False ok_message and self._print("ok", ok_message, variables) return True def exception(self, exception:Exception, message:Optional[str|list|tuple] = None, variables:Optional[dict[str, Any]|list|tuple] = None, i:Optional[int] = 1 ) -> None: lines:list[str]|None = extract_traceback(exception.__traceback__).format() line_matches:REMatch[str]|None = self.re_exception_line.match(lines[-1]) key:str value:Any|None block:str|None data:dict[str, Any|None] = { **{key : value for subset in (variables if isinstance(variables, (list, tuple)) else (variables,)) for key, value in (subset if isinstance(subset, dict) else {}).items()}, **self.get_action_data(1), "lines" : "", "exception_message" : str(exception), "method" : line_matches.group(3), "line" : line_matches.group(2), "file" : line_matches.group(1)[self.__root_path_l:] } for block in trace_format_stack()[:-2] + lines: if block: data["lines"] += "\n " + self.re_break_lines.split(block.strip())[0] data["end"] = PythonMapper.string_variables(PythonMapper.Settings.exception_format, data) message and self._print("exception", message, data, i + 1) def load_file(self, path:str) -> tuple[str|None, int]: error:int = ( 1 << 0 if path == None else 1 << 1 if not isinstance(path, str) else 1 << 2 if not path else 1 << 3 if not path_exists(path) else 0) << 0 data:str|None = None if not error: opened:TextIOWrapper[Any] try: with open(path, 'r') as opened: data = opened.read() except Exception as exception: error |= 1 << 0 self.exception(exception, "There was an exception trying load file '{path}'.", { "path" : path[self.__root_path_l:] if self.__root_path in path else path }) self.validate( error, ( "exception", "path_null", "path_not_string", "path_empty", "path_not_exists" ), { "path" : path[self.__root_path_l:] if self.__root_path in path else path }, "There was any error with code '{code}' trying load the file '{path}'.", "The file '{path}' was loaded successfully." ) return data, error def save_file(self, path:str, content:str) -> int: error:int = ( (( 1 << 0 if path == None else 1 << 1 if not isinstance(path, str) else 1 << 2 if not path else # 1 << 3 if not path_exists(path) else 0) << 0) | (( 1 << 0 if content == None else 1 << 1 if not isinstance(content, str) else # 1 << 2 if not content else 0) << 4) | 0) << 1 if not error: opened:TextIOWrapper[Any] try: with open(path, "w") as opened: opened.write(content) except Exception as exception: error |= 1 << 0 self.exception(exception, "There was an exception trying save file '{path}' with {bytes} bytes of String data.", { "path" : path, "bytes" : len(content) }) self.validate( error, ( "exception", "path_null", "path_not_string", "path_empty", "path_not_exists" "content_null", "content_not_string", "content_empty" ), { "path" : path, "bytes" : len(content) if isinstance(content, str) else -1 }, "There was any error with code '{code}' trying save file '{path}' with {bytes} bytes of String data.", "The file '{path}' was write with {bytes} bytes of String data successfully." ) return error def __load_projects_file(self, path:str) -> int: error:int data:str|None data, error = self.load_file(path) error <<= 3 if data: try: project_key:str project_data:dict[str, str|list[str]] projects:dict[str, dict[str, str|list[str]]] = json_decode(data) for project_key, project_data in projects.items(): if project_key in self.__projects: while self.__projects[project_key]["in_use"]: if not self._working: error |= 1 << 1 break sleep((random.random() * PythonMapper.Settings.random_wait_time_in_use) / 1000.0) if self._working: del self.__projects[project_key] if not self._working: error |= 1 << 1 break try: self.__projects[project_key] = PythonMapper.Project(self, project_key, project_data) except Exception as exception: error |= 1 << 2 self.exception(exception, "There was an exception trying load the project '{key}' from '{path}'.", { "path" : path[self.__root_path_l:] if self.__root_path in path else path, "key" : project_key }) if project_key in self.__projects: del self.__projects[project_key] except Exception as exception: error |= 1 << 0 self.exception(exception, "There was an exception trying load the projects from file '{path}'.", { "path" : path[self.__root_path_l:] if self.__root_path in path else path }) self.validate( error, ( "exception", "not_working", "project_building_exception", "load_exception", "path_null", "path_not_string", "path_empty", "path_not_exists" ), { "path" : path[self.__root_path_l:] if self.__root_path in path else path }, "There was any error with code '{code}' trying loading the projects of file '{path}'.", "The projects of file '{path}' was loaded successfully." ) return error def update_projects(self) -> int: error:int = 0 for file in self.__files_projects: if self.__load_projects_file(file): error |= 1 << 1 self.validate( error, ( "exception", "load_projects_errors" ), {}, "There was any error with code '{code}' trying update the projects.", "The projects was updated successfully." ) return error @staticmethod def time() -> int: return int(timestamp() * 1000) def __process_projects(self) -> None: while self._working: time:int = PythonMapper.time() if time - self.__last_process_try >= PythonMapper.Settings.process_time: project:PythonMapper.Project for project in self.__projects.values(): project.process() self.__last_process_try = time sleep(PythonMapper.Settings.process_subtimer / 1000.0)