# MemWeb/Python/Application/MemWeb.py
#
# 807 lines
# 30 KiB
# Python
# Raw Permalink Normal View History
#
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from os.path import dirname as directory_name
from os.path import abspath as path_absolute
from os.path import exists as path_exists
from re import compile as RECompile
from json import loads as json_decode
from inspect import stack as get_stack
from traceback import format_stack as trace_format_stack
from traceback import extract_tb as extract_traceback
from time import time as timestamp
# 2024-05-04 09:13:39 +00:00
from base64 import b64encode as base64_encode
import datetime
# Define a placeholder `common` namespace when none has been injected into the
# module globals before this file is executed.
# NOTE(review): the placeholder is empty, yet MemWeb reads
# `common.DefaultData.settings` / `common.DefaultData.i18n`; confirm how the
# real `common` is expected to be provided.
if "common" not in globals():
    class common:pass
class MemWeb:
# Last-resort settings layer consulted by `settings()` after the constructor
# inputs and the loaded settings.
__default_settings = common.DefaultData.settings
# Whole string made of ASCII letters, digits and underscores (setting/i18n keys).
re_key = RECompile(r'^[a-zA-Z0-9_]+$')
# `{name}` placeholders without nested braces, used by `string_variables()`.
re_string_variables = RECompile(r'\{([^\{\}]+)\}')
# Lower-case print type optionally padded by one space on each side.
# Only used with `.match()`, so just the start of the string is anchored.
re_print_type = RECompile(r' ?[a-z0-9_]+ ?')
# Text that starts with `[`/`{` and ends with the matching `]`/`}`; used to
# tell inline JSON apart from a file path.
# NOTE(review): `(.+|[\r\n]+)*` is a nested quantifier - may backtrack heavily
# on long inputs that open a bracket but never close it.
re_json = RECompile(r'^(\[(.+|[\r\n]+)*\]|\{(.+|[\r\n]+)*\})$')
# Any run of forward slashes and/or backslashes.
re_slashes = RECompile(r'[\/\\\\]+')
# A single line break in any of the CRLF / CR / LF flavours.
re_break_lines = RECompile(r'\r\n|[\r\n]')
# Header of a formatted traceback entry: captures file, line number and function.
re_exception_line = RECompile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')
def __set_basic_data(self):
    """Cache the settings that the core helpers read on every call.

    Called twice by `__init__`: once before and once after the settings files
    are loaded, so the cached values follow the loaded configuration.
    """
    # These two calls pass explicit `nulls`/`default` values, so they do not
    # depend on the attributes they are about to define; every later
    # `settings()` call relies on both being set.
    self.__nulls = self.settings("nulls", None, False, False)
    self.__default_value = self.settings("default_value", None, None, True)
    self.__exception_format = self.settings("exception_format")
    # A tuple of keys means "first key found wins" (see `settings()`).
    self.__language = self.settings(("language", "default_language"))
    self.__default_language = self.settings(("default_language", "language"))
    self.__default_text = self.settings("default_text")
    self.__print_format = self.settings("print_format")
    self.__settings_overwrite = self.settings(("settings_overwrite", "overwrite"))
    self.__debug_print_types = self.settings("debug_print_types")
    # Per-feature message switches, each falling back to a generic switch.
    self.__show_settings_add_ok = self.settings(("show_settings_add_ok_message", "show_ok_message"))
    self.__show_settings_add_error = self.settings(("show_settings_add_error_message", "show_error_message"))
    self.__show_settings_add_exception = self.settings(("show_settings_add_exception_message", "show_exception_message"))
    self.__show_fix_path_exception = self.settings(("show_fix_path_exception_message", "show_exception_message"))
    self.__show_load_file_exception = self.settings(("show_load_file_exception_message", "show_exception_message"))
    self.__show_add_print_types_exception = self.settings(("show_add_print_types_exception_message", "show_exception_message"))
    # Register the built-in print type groups first, then the configured ones.
    for key in ("default_print_types", "print_types"):
        self.add_print_types(self.settings(key))
def __init__(self, inputs = None):
    """Build the application object: settings, i18n sentences and sub-services.

    :param inputs: optional dictionary of settings; it is the first stored
        layer `settings()` looks at, ahead of loaded and default settings.
    """
    self.__sentences = common.DefaultData.i18n
    self.__settings = {}
    self.__inputs = inputs
    self.__print_types = []
    # Search roots used by `get_absolute_path()`: this module's directory...
    self.__root_paths = [path_absolute(directory_name(__file__))]
    self.__slash = '/' if '/' in self.__root_paths[0] else '\\'
    # ...followed by its three nearest ancestors ("/..", "/../..", "/../../..")...
    for i in range(3):
        self.__root_paths += [self.__root_paths[0 + i] + "/.."]
    # ...and the empty root. Reversing puts the empty root first and the
    # module directory last.
    self.__root_paths += [""]
    self.__root_paths.reverse()
    # First pass: cache values from the constructor inputs and defaults only,
    # so `_print()` works before the settings files are read.
    self.__set_basic_data()
    self._print("info", "mem_web_building")
    # NOTE(review): `MemWeb.Threads` / `MemWeb.Terminal` are not visible in
    # this part of the file - presumably nested classes defined further down.
    self.threads = MemWeb.Threads(self, inputs)
    self.terminal = MemWeb.Terminal(self, inputs)
    self.__started = False
    self._print("info", "mem_web_settings_loading")
    for key in ("default_settings_files", "settings_files"):
        self.settings_add(self.settings(key), True)
    # Second pass: refresh the cached values now that settings are loaded.
    self.__set_basic_data()
    self._print("ok", "mem_web_settings_loaded")
    self.__i18n_overwrite = self.settings(("i18n_overwrite", "overwrite"))
    self.__show_i18n_add_ok = self.settings(("show_i18n_add_ok_message", "show_ok_message"))
    self.__show_i18n_add_error = self.settings(("show_i18n_add_error_message", "show_error_message"))
    self.__show_i18n_add_exception = self.settings(("show_i18n_add_exception_message", "show_exception_message"))
    self.__show_launch_asynchronous_ok = self.settings(("show_launch_asynchronous_ok_message", "show_ok_message"))
    self.__show_launch_asynchronous_error = self.settings(("show_launch_asynchronous_error_message", "show_error_message"))
    self.__show_launch_asynchronous_exception = self.settings(("show_launch_asynchronous_exception_message", "show_exception_message"))
    self.__show_launch_asynchronous_item_exception = self.settings(("show_launch_asynchronous_exception_item_message", "show_exception_message"))
    self._print("info", "mem_web_i18n_loading")
    for key in ("default_i18n_files", "i18n_files"):
        self.i18n_add(self.settings(key), True)
    self._print("ok", "mem_web_i18n_loaded")
    self._print("ok", "mem_web_built")
    # Start immediately when the "autostart" setting is truthy.
    self.settings("autostart") and self.start()
@staticmethod
def reduce(inputs, callback, value = None):
error = (
((
1 << 0 if inputs == None else
1 << 1 if not isinstance(inputs, (list, tuple)) else
0) << 0) |
((
1 << 0 if callback == None else
1 << 1 if not callable(callback) else
0) << 2) |
0) << 1
if not error:
try:
for i, item in enumerate(inputs):
value = callback(value, item, i, inputs)
except:
error |= 1 << 0
return (value, error)
@staticmethod
def is_object(item):
return not isinstance(item, str) and " object " in str(item)
@staticmethod
def is_class(item):
return not isinstance(item, str) and "<class '" == str(item)[0:8]
def __launch_asynchronous(self, object, items, method, end, i, errors):
if i < len(items):
next = lambda error, *_:self.__launch_asynchronous(object, items, method, end, i + 1, errors + [error])
error = 0 if hasattr(object, items[i]) else 1 << 2
if not error:
level = getattr(object, items[i])
error |= 0 if hasattr(level, method) else 1 << 3
if not error:
callback = getattr(level, method)
error |= (
1 << 0 if callback == None else
1 << 1 if not callable(callback) else
0) << 4
if not error:
try:
return callback(next)
except Exception as exception:
self.exception(exception, self.__show_launch_asynchronous_item_exception and "mem_web_launch_asynchronous_item_exception", {
"i" : i,
"item" : items[i],
"method" : method,
"error" : error
})
error |= 1 << 0
next(error)
else:
self.execute(end, errors)
def launch_asynchronous(self, object, items, method, end, show_errors = None):
    """Validate the arguments and call `method` on each named attribute in turn.

    :param object: instance whose attributes are visited.
    :param items: list/tuple of attribute names, visited in order.
    :param method: name of the method to call on each attribute.
    :param end: callable executed with the list of per-item error codes.
    :param show_errors: when a bool, overrides the error-message setting and
        gates the ok message.
    :return: error bit mask (0 when the chain could be launched). The bit
        positions match the indexes of the message tuple passed to
        `validate()` below.
    """
    # Flag layout after the final `<< 1`:
    #   bit 0        exception while launching
    #   bits 1-2     object   (null, not an object)
    #   bits 3-5     items    (null, not a list, empty)
    #   bits 6-9     per item (null, not string, empty, bad characters),
    #                OR-ed across all items
    #   bits 10-13   method   (null, not string, empty, bad characters)
    #   bits 14-15   end      (null, not callable)
    error = (
        ((
            1 << 0 if object == None else
            1 << 1 if not self.is_object(object) else
            0) << 0) |
        ((
            1 << 0 if items == None else
            1 << 1 if not isinstance(items, (list, tuple)) else
            1 << 2 if not len(items) else
            self.reduce(
                [(
                    1 << 0 if item == None else
                    1 << 1 if not isinstance(item, str) else
                    1 << 2 if not item else
                    1 << 3 if not self.re_key.match(item) else
                    0) for item in items],
                lambda a, b, *_: a | b,
                0)[0] << 3) << 2) |
        ((
            1 << 0 if method == None else
            1 << 1 if not isinstance(method, str) else
            1 << 2 if not method else
            1 << 3 if not self.re_key.match(method) else
            0) << 9) |
        ((
            1 << 0 if end == None else
            1 << 1 if not callable(end) else
            0) << 13) |
        0) << 1
    has_show_errors = isinstance(show_errors, bool)
    if not error:
        try:
            self.__launch_asynchronous(object, items, method, end, 0, [])
        except Exception as exception:
            # NOTE(review): these keys use the "web_mem_" prefix while the
            # per-item key in `__launch_asynchronous` uses "mem_web_" -
            # confirm against the i18n catalogue.
            self.exception(exception, self.__show_launch_asynchronous_exception and "web_mem_launch_asynchronous_exception", {
                "method" : method
            })
            error |= 1 << 0
    # Runs whether or not the launch happened, so argument errors get
    # reported; the result of validate() is not used here.
    self.validate(
        error,
        (
            "exception",
            "object_null",
            "object_not_class_object",
            "items_null",
            "items_not_list",
            "items_empty",
            "some_item_null",
            "some_item_not_string",
            "some_item_empty",
            "some_item_bad_characters",
            "method_null",
            "method_not_string",
            "method_empty",
            "method_bad_characters",
            "end_null",
            "end_not_function"
        ),
        {
            "method" : method
        },
        (show_errors if has_show_errors else self.__show_launch_asynchronous_error) and "web_mem_launch_asynchronous_error",
        (show_errors if has_show_errors else True) and self.__show_launch_asynchronous_ok and "web_mem_launch_asynchronous_ok"
    )
    return error
def __end_start(self, errors, callback):
message = ("ok", "mem_web_started")
ok = False
for error in errors:
if (
not error if isinstance(error, bool) else
error if isinstance(error, int) else
False):
message = ("warn", "mem_web_started_with_errors")
ok = False
break
self._print(*message)
MemWeb.execute(callback, ok, errors)
def start(self, callback = None):
self._print("info", "mem_web_starting")
if self.__started:
self._print("warn", "mem_web_already_started")
MemWeb.execute(callback, False)
return False
self.__started = True
self.launch_asynchronous(self, ("threads", "terminal"), "start", lambda errors:self.__end_start(errors, callback))
return True
def __end_close(self, errors, callback):
message = ("ok", "mem_web_closed")
ok = False
for error in errors:
if (
not error if isinstance(error, bool) else
error if isinstance(error, int) else
False):
message = ("warn", "mem_web_closed_with_errors")
ok = False
break
self._print(*message)
MemWeb.execute(callback, ok, errors)
def close(self, callback = None):
self._print("info", "mem_web_closing")
if not self.__started:
self._print("warn", "mem_web_already_closed")
MemWeb.execute(callback, False)
return False
self.__started = False
self.launch_asynchronous(self, ("terminal", "threads"), "close", lambda errors:self.__end_close(errors, callback))
return True
@classmethod
def get_keys(self, keys):
keys = [key.strip() for key in (keys if isinstance(keys, (list, tuple)) else (keys,)) if isinstance(key, str)]
return [key for i, key in enumerate(keys) if key and keys.index(key) == i and self.re_key.match(key)]
@staticmethod
def get_dictionaries(inputs):
return (
tuple([
subset for subset in inputs if isinstance(subset, dict)
]) if isinstance(inputs, (list, tuple)) else
(inputs,) if isinstance(inputs, dict) else
tuple())
@classmethod
def join_dictionaries(self, inputs):
return {key : value for subset in self.get_dictionaries(inputs) for key, value in subset.items()}
@staticmethod
def execute(callback, *parameters):
callback and callable(callback) and callback(*parameters)
@staticmethod
def get_texts(texts):
if not isinstance(texts, (list, tuple)):
texts = (texts,)
return [text for i, text in enumerate(texts) if isinstance(text, str) and texts.index(text) == i]
def nulls(self, nulls = None):
return nulls if isinstance(nulls, bool) else self.__nulls
def default_value(self, default = None, nulls = None):
return default if default != None or self.nulls(nulls) else self.__default_value
def settings(self, keys, inputs = None, default = None, nulls = None):
keys = self.get_keys(keys)
if len(keys):
nulls = self.nulls(nulls)
for subset in self.get_dictionaries(inputs) + (self.__inputs, self.__settings, self.__default_settings):
if isinstance(subset, dict):
for key in keys:
if key in subset and (nulls or subset[key] != None):
return subset[key]
return self.default_value(default, nulls)
@classmethod
def string_variables(self, string, variables = None, default = None):
variables = self.get_dictionaries(variables)
def callback(matches):
key = matches.group(1)
for subset in variables:
if key in subset:
return str(subset[key])
return matches.group(0) if default == None else str(default)
return self.re_string_variables.sub(callback, string)
def default_text(self, default = None):
return default if isinstance(default, str) else self.__default_text
def __i18n_get(self, texts, default):
texts = self.get_texts(texts)
if len(texts):
languages = (self.__language, self.__default_language) + tuple(self.__sentences.keys())
for i, language in enumerate(languages):
if language and isinstance(language, str) and language in self.__sentences and languages.index(language) == i:
for key in texts:
if key in self.__sentences[language]:
return self.__sentences[language][key]
return texts[0]
return self.default_text(default)
def i18n(self, texts, variables = None, default = None):
text = self.__i18n_get(texts, default)
return self.string_variables("".join(text) if isinstance(text, (list, tuple)) else str(text), variables, default)
def fix_path(self, path):
error = (
1 << 0 if path == None else
1 << 1 if not isinstance(path, str) else
0) << 1
if not error:
try:
path = self.re_slashes.sub(self.__slash, path)
except Exception as exception:
error |= 1 << 0
self.exception(exception, self.__show_fix_path_exception and "mem_web_fix_path_exception", {
"path" : path
})
return (path, error)
def get_absolute_path(self, path):
error = (
1 << 0 if path == None else
1 << 1 if not isinstance(path, str) else
0) << 1
absolute_path = None
if not error:
for root in self.__root_paths:
current_path = self.fix_path(root + self.__slash + path)[0]
if path_exists(current_path):
absolute_path = current_path
break
if absolute_path == None:
error |= 1 << 3
return (absolute_path, error)
def load_file(self, path, mode = "r"):
absolute_path, error = self.get_absolute_path(path)
error |= (
1 << 0 if mode == None else
1 << 1 if not isinstance(mode, str) else
1 << 2 if not mode else
1 << 3 if mode not in ("r", "rb") else
0) << 4
response = None
if not error:
try:
with open(absolute_path, mode) as data:
response = data.read()
except Exception as exception:
error |= 1 << 0
self.exception(exception, self.__show_load_file_exception and "mem_web_load_file_exception", {
"path" : path,
"mode" : mode
})
return (response, error)
def add_print_types(self, inputs, show_errors = None):
if isinstance(inputs, (list, tuple)):
has_show_errors = isinstance(show_errors, bool)
for subset in inputs:
error = (
1 << 0 if subset == None else
1 << 1 if not isinstance(subset, (list, tuple)) else
1 << 2 if not len(subset) else
0) << 1
if not error:
i = None
for _type in subset:
suberror = (
1 << 0 if _type == None else
1 << 1 if not isinstance(_type, str) else
1 << 2 if not _type else
1 << 3 if not self.re_print_type.match(_type) else
0) << 1
if suberror:
if not error & 1 << 4:
error |= 1 << 4
continue
if i == None:
for j, print_types in enumerate(self.__print_types):
if _type in print_types:
i = j
break
if i == None:
i = len(self.__print_types)
self.__print_types += [[_type]]
continue
self.__print_types[i] += [_type]
elif isinstance(inputs, str):
if self.re_json.match(inputs.strip()):
try:
self.add_print_types(json_decode(inputs), show_errors)
except Exception as exception:
self.exception(exception, self.__show_add_print_types_exception and "mem_web_add_print_types_exception")
else:
json = self.load_file(inputs)[0]
if json and self.re_json.match(json.strip()):
try:
self.add_print_types(json_decode(json), show_errors)
except Exception as exception:
self.exception(exception, self.__show_add_print_types_exception and "mem_web_add_print_types_exception")
def get_print_type(self, _type):
error = (
1 << 0 if _type == None else
1 << 1 if not isinstance(_type, str) else
1 << 2 if not _type else
0) << 1
if not error:
_type = _type.lower()
for print_types in self.__print_types:
if _type in print_types:
return print_types[0]
return self.__print_types[0][0]
@staticmethod
def get_action_data(i = 1):
stack = get_stack()[1 + i]
return {
"file" : stack.filename,
"method" : stack.function,
"line" : stack.lineno
}
# 2024-05-04 09:13:39 +00:00
@staticmethod
def base64_encode(data):
return base64_encode(bytes(
data.to_bytes((data.bit_length() + 7) // 8, byteorder = "big") if isinstance(data, (int, float)) else
data.encode() if isinstance(data, str) else
data.encode("ascii"))).decode()
def _print(self, _type, message, inputs = None, i = 1):
basic_type = self.get_print_type(_type)
if basic_type.strip() not in self.__debug_print_types:
return
own = {
"raw_type" : _type,
"basic_type" : self.get_print_type(_type),
"i18n" : message,
**self.get_action_data(i),
**self.join_dictionaries(inputs)
}
date = datetime.datetime.now()
own["type"] = own["basic_type"].upper()
for key in ("year", "month", "day", "hour", "minute", "second"):
k = "i" if key == "minute" else key[0]
own[key] = getattr(date, key)
own[k] = own[key] % 100
own[k + k] = ("00" + str(own[k]))[-2:]
own["yyyy"] = own["year"]
own["message"] = self.i18n(message, own) + (own["end"] if own["type"] in ("ERRO", "EXCE") and "end" in own else "")
print(self.string_variables(self.__print_format, own))
@staticmethod
def is_key(key):
return key and isinstance(key, str) and key[0:7] != "MemWeb_" and key[-4:] != "_end" and key[-6:] != "_start"
def settings_add(self, inputs, overwrite = None, show_errors = None):
if isinstance(inputs, (list, tuple)):
for subinputs in inputs:
self.settings_add(subinputs, overwrite)
elif isinstance(inputs, dict):
has_show_errors = isinstance(show_errors, bool)
if not isinstance(overwrite, bool):
overwrite = self.__settings_overwrite
for key, value in inputs.items():
if not self.is_key(key):
continue
error = (
1 << 0 if key == None else
1 << 1 if not isinstance(key, str) else
1 << 2 if not key else
1 << 3 if not self.re_key.match(key) else
1 << 4 if not overwrite and key in self.__settings else
0) << 1
if self.validate(
error,
(
"exception",
"key_null",
"key_not_string",
"key_empty",
"key_not_key",
"key_exists"
),
{
"key" : key,
"value" : value
},
(show_errors if has_show_errors else self.__show_settings_add_error) and "mem_web_settings_add_error",
(show_errors if has_show_errors else True) and self.__show_settings_add_ok and "mem_web_settings_add_ok"
):
self.__settings[key] = value
elif isinstance(inputs, str):
if self.re_json.match(inputs.strip()):
try:
self.settings_add(json_decode(inputs), overwrite, show_errors)
except Exception as exception:
self.__show_settings_add_exception and self.exception(exception, "mem_web_settings_add_exception")
else:
json, error = self.load_file(inputs)
if not error and json and self.re_json.match(json.strip()):
try:
self.settings_add(json_decode(json), overwrite, show_errors)
except Exception as exception:
self.__show_settings_add_exception and self.exception(exception, "mem_web_settings_add_exception")
def i18n_add(self, inputs, overwrite = None, show_errors = None):
if isinstance(inputs, (list, tuple)):
for subinputs in inputs:
self.i18n_add(subinputs, overwrite)
elif isinstance(inputs, dict):
has_show_errors = isinstance(show_errors, bool)
if not isinstance(overwrite, bool):
overwrite = self.__i18n_overwrite
for language, sentences in inputs.items():
error = (
((
1 << 0 if language == None else
1 << 1 if not isinstance(language, str) else
1 << 2 if not language else
1 << 3 if not self.re_key.match(language) else
0) << 0) |
((
1 << 0 if sentences == None else
1 << 1 if not isinstance(sentences, dict) else
1 << 2 if not len(sentences) else
0) << 4) |
0) << 1
if not error:
if language not in self.__sentences:
self.__sentences[language] = {}
for key, sentence in sentences.items():
if not self.is_key(key):
continue
suberror = (
((
1 << 0 if key == None else
1 << 1 if not isinstance(key, str) else
1 << 2 if not key else
1 << 3 if not self.re_key.match(key) else
1 << 5 if not overwrite and key in self.__sentences[language] else
0) << 0) |
((
1 << 0 if sentence == None else
1 << 1 if not isinstance(sentence, (str, list, tuple)) else
0) << 5) |
0) << 1
if self.validate(
suberror,
(
"exception",
"key_null",
"key_not_string",
"key_empty",
"key_not_key",
"key_exists",
"sentence_null",
"sentence_not_string"
),
{
"language" : language,
"key" : key,
"sentence" : sentence
},
(show_errors if has_show_errors else self.__show_i18n_add_error) and "mem_web_i18n_add_key_error",
(show_errors if has_show_errors else True) and self.__show_i18n_add_ok and "mem_web_i18n_add_key_ok"
):
self.__sentences[language][key] = sentence
elif error & 1 << 8:
error |= 1 << 8
self.validate(
error,
(
"exception",
"language_null",
"language_not_string",
"language_empty",
"language_not_key",
"sentences_null",
"sentences_not_dictionary",
"sentences_empty",
"sentences_with_errors"
),
{
"language" : language
},
(show_errors if has_show_errors else self.__show_i18n_add_error) and "mem_web_i18n_add_error",
(show_errors if has_show_errors else True) and self.__show_i18n_add_ok and "mem_web_i18n_add_ok"
)
elif isinstance(inputs, str):
if self.re_json.match(inputs.strip()):
try:
self.i18n_add(json_decode(inputs), overwrite, show_errors)
except Exception as exception:
self.__show_i18n_add_exception and self.exception(exception, "mem_web_i18n_add_json_exception")
else:
json, error = self.load_file(inputs)
if not error and json and self.re_json.match(json.strip()):
try:
self.i18n_add(json_decode(json), overwrite, show_errors)
except Exception as exception:
self.__show_i18n_add_exception and self.exception(exception, "mem_web_i18n_add_file_exception")
def validate(self, error, messages = [], variables = None, error_message = None, ok_message = None):
if error:
if error_message:
own = {
2024-05-04 09:13:39 +00:00
"code" : self.base64_encode(error),
"raw_code" : error,
"end" : "",
**{key : value for subset in (variables if isinstance(variables, (list, tuple)) else (variables,)) for key, value in (subset if isinstance(subset, dict) else {}).items()}
}
i = 0
l = len(messages)
while 1 << i <= error:
if (1 << i) & error:
own["end"] += "\n [" + str(i) + "] " + self.i18n(messages[i] if i < l and messages[i] else "error_message_" + str(i), own)
i += 1
self._print("error", error_message, own, 2)
return False
ok_message and self._print("ok", ok_message, variables)
return True
def exception(self, exception, message = None, variables = None, i = 1):
lines = extract_traceback(exception.__traceback__).format()
line_matches = self.re_exception_line.match(lines[-1])
data = {
**{key : value for subset in (variables if isinstance(variables, (list, tuple)) else (variables,)) for key, value in (subset if isinstance(subset, dict) else {}).items()},
**self.get_action_data(1),
"lines" : "",
"exception_message" : str(exception),
"method" : line_matches.group(3),
"line" : line_matches.group(2),
"file" : line_matches.group(1)
}
for block in trace_format_stack()[:-2] + lines:
if block:
data["lines"] += "\n " + self.re_break_lines.split(block.strip())[0]
data["end"] = self.string_variables(self.__exception_format, data)
message and self._print("exception", message, data, i + 1)
@classmethod
def get_value(self, keys, inputs, default = None):
keys = self.get_keys(keys)
if len(keys):
for subset in self.get_dictionaries(inputs):
for key in keys:
if key in subset:
return subset[key]
return default
@staticmethod
def timestamp():
return int(timestamp() * 1000)