PythonMapper/Python/PythonMapper.py

780 lines
28 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from inspect import FrameInfo
from typing import Any, Optional
from os.path import dirname as directory_name
from os.path import abspath as path_absolute
from os.path import exists as path_exists
from threading import Thread
from json import loads as json_decode
from time import sleep
import random
from re import compile as RECompile
from re import Pattern as REPattern
from re import Match as REMatch
from re import MULTILINE as RE_MULTILINE
from time import time as timestamp
from signal import SIGINT as SIGNAL_INTERRUPTION
from signal import signal as signal_method
from inspect import stack as get_stack
from traceback import format_stack as trace_format_stack
from traceback import extract_tb as extract_traceback
import datetime
from io import TextIOWrapper
class PythonMapper:
class Settings:
random_wait_time_in_use:int = 500
project_process_timer:int = 30000
process_subtimer:int = 100
process_time:int = 1000
print_format:str = "[{type}] {yyyy}{mm}{dd} {hh}{ii}{ss}.{nnn} [{line}]{file}({method}): {message}"
exception_format:int = " '{file}({method})[{line}]'{lines}\n\n{exception_message}"
rest_time_per_process_file:int = 10
tabulation_character:str = " "
class CodeBlock:
def __init__(self):
self.class_data:str = ""
self.class_code:list[PythonMapper.CodeBlock] = []
self.methods:list[str] = []
self.static_variables:list[str] = []
self.object_variables:list[str] = []
self.constructor:str = ""
class ClassData:
def __init__(self, class_name:str, spaces:int) -> None:
self.object_done:list[str] = []
self.static_done:list[str] = []
self.spaces:int = spaces
self.class_name:str = class_name
self.code:PythonMapper.CodeBlock = PythonMapper.CodeBlock()
class SearchData:
def __init__(self) -> None:
self.levels:list[PythonMapper.ClassData] = [PythonMapper.ClassData("ROOT", 0)]
self.i:int = 0
class Project:
methods:dict[str, REPattern] = {
"class_item" : RECompile(r'^([ \t]*)class[ \t]+([a-zA-Z0-9_]+)(#[^\:]+)?(?=\:)', RE_MULTILINE),
"method" : RECompile(r'^([ \t]*\@[^\r\n]+(\n|\r\n))*([ \t]*)def[ \t]+(_?[^_][a-zA-Z0-9_]*|__init__)(#[^\:]+)(?=\:)', RE_MULTILINE),
"object_variable" : RECompile(r'^[ \t]+(self[ \t]*\.[ \t]*(_?[^_][a-zA-Z0-9_]*)([ \t]*\:[^\=\r\n]+)?)([ \t]*=|[\r\n]|$)', RE_MULTILINE),
"static_variable" : RECompile(r'^([ \t]*)(_?[^_][a-zA-Z0-9]*)([ \t]*\:[^\r\n\=]+)?(?=[ \t]*\=)(?!\=)', RE_MULTILINE)
}
re_unclean_code:REPattern = RECompile(r'(?<!\\)("{3}((?!"{3})(.|[\r\n]))*"{3}|\'{3}((?!\'{3})(.|[\r\n]))*\'{3})|"((?!")(.|\\\\.))*"|\'((?!\')(.|\\\\.))*\'|\[((?![\[\]])(.|[\r\n]))*\]|\(((?![\(\)])(.|[\r\n]))*\)|\{((?![\{\}])(.|[\r\n]))*\}')
re_clean_code:REPattern = RECompile(r'\#{3}UNCLEAN([0-9]+)KEY#{3}')
def __init__(self, parent:Any, name:str, inputs:dict[str, str|list[str]|int]) -> None:
self.parent:PythonMapper = parent
self.name:str = name
self.timer:int = inputs["timer"] if "timer" in inputs else PythonMapper.Settings.project_process_timer
self.rest_timer:int = (
inputs["rest_timer"] if "rest_timer" in inputs else
inputs["rest"] if "rest" in inputs else
PythonMapper.Settings.rest_time_per_process_file
)
self.path:str = inputs["path"]
self.files:tuple[str] = tuple(inputs["files"])
self.map:str = inputs["map"]
self.__cache:dict[str, str] = {}
self.in_use:bool = False
self.__last_time:int = 0
self.__data:list[PythonMapper.CodeBlock|None] = []
self.dependences:list[str|list[str, str|list[str]]|list[str, str, str]|list[str]] = inputs["dependences"] if "dependences" in inputs else []
self.definitions:list[str] = inputs["definitions"] if "definitions" in inputs else []
self.tabulation:str = inputs["tabulation"] if "tabulation" in inputs else PythonMapper.Settings.tabulation_character
if len(self.definitions):
has_new_type:bool = False
dependence:str|list[str, str|list[str]]|list[str, str, str]|list[str]
for dependence in self.dependences:
if isinstance(dependence, list) and len(dependence) > 1 and dependence[0] == "typing" and (dependence[1] == "NewType" if isinstance(dependence[1], str) else "NewType" in dependence[1]):
has_new_type = True
break
if not has_new_type:
self.dependences += [["typing", "NewType"]]
def process(self) -> None:
time:int = PythonMapper.time()
if time - self.__last_time >= self.timer:
self.__update_cache() and self.__process_cache_file()
self.__last_time = time
def __process_cache_level(self, block:Any, level:int) -> str:
white_spaces:str = ""
code:str = ""
subblock:PythonMapper.CodeBlock
method:str
variable:str
has_class:bool = block.class_data != ""
block:PythonMapper.CodeBlock = block
for _ in range(level - 1):
white_spaces += self.tabulation
if has_class:
code += "\n\n" + white_spaces + block.class_data + ":"
white_spaces += self.tabulation
for subblock in block.class_code:
code += self.__process_cache_level(subblock, level + 1)
if len(block.static_variables):
code += "\n"
for variable in block.static_variables:
code += "\n" + white_spaces + variable + " = None"
if has_class:
l:int = len(block.object_variables)
if l or block.constructor:
tabulation:str = white_spaces + self.tabulation
code += "\n\n" + white_spaces + (block.constructor + ":" if block.constructor else "def __init__(self):")
if l:
for variable in block.object_variables:
code += "\n" + tabulation + variable + " = None"
else:
code += "\n" + tabulation + "pass"
for method in block.methods:
code += "\n\n" + white_spaces + method + ":pass"
return code
def __process_cache_file(self) -> None:
# b:PythonMapper.CodeBlock
# for i, block in enumerate(self.__data):
# print([i, [b.class_data for b in block.class_code]])
# return
code:str = (
"#!/usr/bin/env python\n" +
"# -*- coding: utf-8 -*-"
)
block:PythonMapper.CodeBlock
if len(self.dependences):
dependence:str|list[str, str|list[str]]|list[str, str, str]|list[str]
code += "\n"
for dependence in self.dependences:
if isinstance(dependence, str):
if " " in dependence:
code += "\n" + dependence
continue
dependence = [dependence]
l:int = len(dependence)
if l == 1:
code += "\nimport " + (dependence[0] if isinstance(dependence[0], str) else ", ".join(dependence[0]))
elif l == 2:
code += "\nfrom " + dependence[0] + " import " + (dependence[1] if isinstance(dependence[1], str) else ", ".join(dependence[1]))
elif l == 3:
code += "\nfrom " + dependence[0] + " import " + dependence[1] + " as " + dependence[2]
if len(self.definitions):
definition:str
code += "\n"
for definition in self.definitions:
code += "\n" + (definition if "=" in definition or " " in definition else definition + " = NewType(\"" + definition + "\", Any)")
for block in self.__data:
code += self.__process_cache_level(block, 0)
self.parent.save_file(self.path + '/' + self.map, code)
def __update_cache(self) -> bool:
i:int
file:str
has_changes:bool = False
for i, file in enumerate(self.files):
if not self.parent._working:
break
error:int
data:int
path:str = self.path + '/' + file
data, error = self.parent.load_file(path)
if not error and (path not in self.__cache or data != self.__cache[path]):
self.__cache[path] = data
self.__process_code(path, data, i)
if not has_changes:
has_changes = True
sleep(self.rest_timer / 1000.0)
return has_changes
@staticmethod
def unclean_code(code:str) -> tuple[str, tuple[str]]:
blocks:tuple[str] = tuple()
i:int = 0
while True:
matches:REMatch = PythonMapper.Project.re_unclean_code.search(code)
if not matches:
break
position:tuple[int, int] = matches.span()
blocks += (code[position[0]:position[1]],)
code = code[:position[0]] + "###UNCLEAN"+ str(i) + "KEY###" + code[position[1]:]
i += 1
return (code, blocks)
@staticmethod
def clean_code(code:str, blocks:tuple[str]) -> str:
while True:
matches:REMatch = PythonMapper.Project.re_clean_code.search(code)
if not matches:
break
position:tuple[int, int] = matches.span()
code = code[:position[0]] + blocks[int(matches.group(1))] + code[position[1]:]
return code
@staticmethod
def get_group(code:str, pattern:REPattern) -> list[REMatch|None, list[int, int], bool]:
matches:REMatch|None = pattern.search(code)
position:int = list(matches.span()) if matches else [-1, -1]
ok:bool = matches != None and position[0] >= 0
return [matches, position, ok]
def __process_code(self, path:str, code:str, k:int) -> None:
blocks:tuple[str]
l:int = len(code)
sets:dict[str, list[REMatch|None, int, bool]] = {}
data:PythonMapper.SearchData = PythonMapper.SearchData()
code, blocks = PythonMapper.Project.unclean_code(code)
self.parent._print("info", "The code with '{length}' characters from file '{path}' was uncleaned with '{l}' items.", {
"length" : l,
"path" : path,
"l" : len(blocks)
})
while True:
selected:str = ""
i:int = -1
key:str
pattern:REPattern
has_next:bool = False
matches:REMatch
ok:bool
span:list[int, int]
position:list[int, int]
for key, pattern in PythonMapper.Project.methods.items():
if key not in sets:
sets[key] = PythonMapper.Project.get_group(code, pattern)
matches, span, ok = sets[key]
if not ok:
continue
if span[0] < 0:
matches, span, ok = sets[key] = PythonMapper.Project.get_group(code, pattern)
if ok and (i < 0 or span[0] < i):
i = span[0]
selected = key
if not has_next:
has_next = True
if not has_next:
break
matches, span, ok = sets[selected]
getattr(PythonMapper.Project, selected)(code, PythonMapper.Project.clean_code(code[span[0]:span[1]], blocks).strip(), data, matches)
# getattr(PythonMapper.Project, selected)(code, code[span[0]:span[1]], data, matches)
code = code[span[1]:]
if not code:
break
for key, (matches, position, ok) in sets.items():
if ok:
sets[key][1] = [i - span[1] for i in position]
if k == len(self.__data):
self.__data += [None]
while data.i:
data.i -= 1
data.levels[data.i].code.class_code += [data.levels[data.i + 1].code]
self.__data[k] = data.levels[0].code
@staticmethod
def set_level(data:Any, spaces:int) -> bool:
data:PythonMapper.SearchData = data
if spaces > data.levels[data.i].spaces:
if data.i and data.levels[data.i].spaces == data.levels[data.i - 1].spaces:
data.levels[data.i].spaces = spaces
else:
i:int = len(data.levels)
while data.levels[data.i].spaces > spaces:
data.i -= 1
data.levels[data.i].code.class_code += [data.levels[data.i + 1].code]
i -= 1
data.levels = data.levels[:i]
return data.levels[data.i].spaces == spaces
@classmethod
def class_item(self, code:str, fragment:str, data:Any, matches:REMatch):
data:PythonMapper.SearchData = data
if self.set_level(data, len(matches.group(1))):
data.levels += [PythonMapper.ClassData(matches.group(2), len(matches.group(1)))]
data.i += 1
data.levels[data.i].code.class_data = fragment
# print(["class_item", fragment])
@classmethod
def method(self, code:str, fragment:str, data:Any, matches:REMatch):
data:PythonMapper.SearchData = data
name:str = matches.group(4)
if self.set_level(data, len(matches.group(3))):
if name == "__init__" and not data.levels[data.i].code.constructor and "@" not in fragment[:fragment.index("__init__")]:
data.levels[data.i].code.constructor = fragment
else:
data.levels[data.i].code.methods += [fragment]
# print(["method", fragment])
@classmethod
def object_variable(self, code:str, fragment:str, data:Any, matches:REMatch):
name:str = matches.group(1)
data:PythonMapper.SearchData = data
if name in data.levels[data.i].object_done:
return
data.levels[data.i].object_done += [name]
fragment = (fragment[:-1] if fragment[-1] == "=" else fragment).strip()
data.levels[data.i].code.object_variables += [fragment]
# print(["object_variable", fragment])
@classmethod
def static_variable(self, code:str, fragment:str, data:Any, matches:REMatch):
name:str = matches.group(2)
data:PythonMapper.SearchData = data
if not self.set_level(data, len(matches.group(1))) or name in data.levels[data.i].static_done:
return
data.levels[data.i].static_done += [name]
data.levels[data.i].code.static_variables += [fragment]
# print(["static_variable", data.i, name, len(matches.group(1)), fragment])
# print(["static_variable", fragment])
re_string_variables:REPattern = RECompile(r'\{([^\{\}]+)\}')
re_break_lines:REPattern = RECompile(r'\r\n|[\r\n]')
re_exception_line:REPattern = RECompile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')
def __init__(self, inputs:dict[str, Any|None]) -> None:
key:str
self.__inputs:dict[str, Any|None] = inputs
self.__projects:dict[str, PythonMapper.Project] = {}
self.__thread:Thread = Thread(target = self.__process_projects)
self.__root_path:str = path_absolute(directory_name(__file__))
self.__root_path_l:int = len(self.__root_path) + 1
self.__files_projects:tuple[str] = (self.__root_path + "/../JSON/PythonMapper.py.projects.secrets.json",)
self._working:bool = True
self.__last_process_try:int = 0
signal_method(SIGNAL_INTERRUPTION, self.signal_handler)
for key in ("projects_files", "projects_file", "project_file"):
if key in inputs:
if isinstance(inputs[key], str):
self.__files_projects += (inputs[key],)
elif isinstance(inputs[key], list):
self.__files_projects += tuple(inputs[key])
elif isinstance(inputs[key], tuple):
self.__files_projects += inputs[key]
self.update_projects()
self.__thread.start()
# print(self.__projects)
def signal_handler(self, signal_id:int, frame:Any) -> None:
self.stop()
print("The PythonMapper was stopped successfully.")
def stop(self) -> None:
if self._working:
self._working = False
@staticmethod
def string_variables(string:str, variables:Optional[dict[str, Any|None]] = {}) -> str:
def callback(matches:REMatch) -> str:
key:str = matches.group(1)
return str(variables[key]) if key in variables else matches.group(0)
return PythonMapper.re_string_variables.sub(callback, string)
def get_action_data(self, i:int = 1) -> dict[str, str|int]:
stack:FrameInfo = get_stack()[1 + i]
return {
"file" : stack.filename[self.__root_path_l:],
"method" : stack.function,
"line" : stack.lineno
}
def _print(self, _type:str, message:str, variables:Optional[dict[str, Any|None]] = None, i:Optional[int] = 1) -> None:
type_l:int = len(_type)
date:datetime.datetime = datetime.datetime.now()
own:dict[str, Any|None] = {
"type" : " "[int(type_l / 2):] + _type[0:4].upper() + " "[int((type_l + 1) / 2):],
**self.get_action_data(i)
}
key:str
for key in ("year", "month", "day", "hour", "minute", "second"):
k:str = "i" if key == "minute" else key[0]
own[key] = own[k] = getattr(date, key) % 100
own[k + k] = ("00" + str(own[k]))[-2:]
own["year"] = own["yyyy"] = date.year
own["milliseconds"] = own["n"] = int(date.microsecond / 1000)
own["nnn"] = ("000" + str(own["n"]))[-3:]
variables = {**own, **variables}
variables["message"] = PythonMapper.string_variables(message + (
PythonMapper.Settings.exception_format if variables["type"] == "EXCE" else
"{list}" if variables["type"] == "ERRO" else
""), variables)
print(PythonMapper.string_variables(PythonMapper.Settings.print_format, variables))
def validate(self,
error:int,
messages:Optional[tuple[str]] = tuple(),
variables:Optional[dict[str, Any|None]] = {},
error_message:Optional[str|None] = None,
ok_message:Optional[str] = None
):
if error:
if error_message:
list_messages:str = ""
i:int = 0
l:int = len(messages)
while 1 << i <= error:
if error & 1 << i:
list_messages += PythonMapper.string_variables("\n - [{i}] {message}", {
"i" : i,
"message" : PythonMapper.string_variables(messages[i], variables) if i < l else "error_message_" + str(i)
})
i += 1
self._print("error", error_message, {
**variables,
"list" : list_messages,
"code" : error
})
return False
ok_message and self._print("ok", ok_message, variables)
return True
def exception(self,
exception:Exception,
message:Optional[str|list|tuple] = None,
variables:Optional[dict[str, Any]|list|tuple] = None,
i:Optional[int] = 1
) -> None:
lines:list[str]|None = extract_traceback(exception.__traceback__).format()
line_matches:REMatch[str]|None = self.re_exception_line.match(lines[-1])
key:str
value:Any|None
block:str|None
data:dict[str, Any|None] = {
**{key : value for subset in (variables if isinstance(variables, (list, tuple)) else (variables,)) for key, value in (subset if isinstance(subset, dict) else {}).items()},
**self.get_action_data(1),
"lines" : "",
"exception_message" : str(exception),
"method" : line_matches.group(3),
"line" : line_matches.group(2),
"file" : line_matches.group(1)[self.__root_path_l:]
}
for block in trace_format_stack()[:-2] + lines:
if block:
data["lines"] += "\n " + self.re_break_lines.split(block.strip())[0]
data["end"] = PythonMapper.string_variables(PythonMapper.Settings.exception_format, data)
message and self._print("exception", message, data, i + 1)
def load_file(self, path:str) -> tuple[str|None, int]:
error:int = (
1 << 0 if path == None else
1 << 1 if not isinstance(path, str) else
1 << 2 if not path else
1 << 3 if not path_exists(path) else
0) << 0
data:str|None = None
if not error:
opened:TextIOWrapper[Any]
try:
with open(path, 'r') as opened:
data = opened.read()
except Exception as exception:
error |= 1 << 0
self.exception(exception, "There was an exception trying load file '{path}'.", {
"path" : path[self.__root_path_l:] if self.__root_path in path else path
})
self.validate(
error,
(
"exception",
"path_null",
"path_not_string",
"path_empty",
"path_not_exists"
),
{
"path" : path[self.__root_path_l:] if self.__root_path in path else path
},
"There was any error with code '{code}' trying load the file '{path}'.",
"The file '{path}' was loaded successfully."
)
return data, error
def save_file(self, path:str, content:str) -> int:
error:int = (
((
1 << 0 if path == None else
1 << 1 if not isinstance(path, str) else
1 << 2 if not path else
# 1 << 3 if not path_exists(path) else
0) << 0) |
((
1 << 0 if content == None else
1 << 1 if not isinstance(content, str) else
# 1 << 2 if not content else
0) << 4) |
0) << 1
if not error:
opened:TextIOWrapper[Any]
try:
with open(path, "w") as opened:
opened.write(content)
except Exception as exception:
error |= 1 << 0
self.exception(exception, "There was an exception trying save file '{path}' with {bytes} bytes of String data.", {
"path" : path,
"bytes" : len(content)
})
self.validate(
error,
(
"exception",
"path_null",
"path_not_string",
"path_empty",
"path_not_exists"
"content_null",
"content_not_string",
"content_empty"
),
{
"path" : path,
"bytes" : len(content) if isinstance(content, str) else -1
},
"There was any error with code '{code}' trying save file '{path}' with {bytes} bytes of String data.",
"The file '{path}' was write with {bytes} bytes of String data successfully."
)
return error
def __load_projects_file(self, path:str) -> int:
error:int
data:str|None
data, error = self.load_file(path)
error <<= 3
if data:
try:
project_key:str
project_data:dict[str, str|list[str]]
projects:dict[str, dict[str, str|list[str]]] = json_decode(data)
for project_key, project_data in projects.items():
if project_key in self.__projects:
while self.__projects[project_key]["in_use"]:
if not self._working:
error |= 1 << 1
break
sleep((random.random() * PythonMapper.Settings.random_wait_time_in_use) / 1000.0)
if self._working:
del self.__projects[project_key]
if not self._working:
error |= 1 << 1
break
try:
self.__projects[project_key] = PythonMapper.Project(self, project_key, project_data)
except Exception as exception:
error |= 1 << 2
self.exception(exception, "There was an exception trying load the project '{key}' from '{path}'.", {
"path" : path[self.__root_path_l:] if self.__root_path in path else path,
"key" : project_key
})
if project_key in self.__projects:
del self.__projects[project_key]
except Exception as exception:
error |= 1 << 0
self.exception(exception, "There was an exception trying load the projects from file '{path}'.", {
"path" : path[self.__root_path_l:] if self.__root_path in path else path
})
self.validate(
error,
(
"exception",
"not_working",
"project_building_exception",
"load_exception",
"path_null",
"path_not_string",
"path_empty",
"path_not_exists"
),
{
"path" : path[self.__root_path_l:] if self.__root_path in path else path
},
"There was any error with code '{code}' trying loading the projects of file '{path}'.",
"The projects of file '{path}' was loaded successfully."
)
return error
def update_projects(self) -> int:
error:int = 0
for file in self.__files_projects:
if self.__load_projects_file(file):
error |= 1 << 1
self.validate(
error,
(
"exception",
"load_projects_errors"
),
{},
"There was any error with code '{code}' trying update the projects.",
"The projects was updated successfully."
)
return error
@staticmethod
def time() -> int:
return int(timestamp() * 1000)
def __process_projects(self) -> None:
while self._working:
time:int = PythonMapper.time()
if time - self.__last_process_try >= PythonMapper.Settings.process_time:
project:PythonMapper.Project
for project in self.__projects.values():
project.process()
self.__last_process_try = time
sleep(PythonMapper.Settings.process_subtimer / 1000.0)