807 lines
30 KiB
Python
807 lines
30 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from os.path import dirname as directory_name
|
|
from os.path import abspath as path_absolute
|
|
from os.path import exists as path_exists
|
|
from re import compile as RECompile
|
|
from json import loads as json_decode
|
|
from inspect import stack as get_stack
|
|
from traceback import format_stack as trace_format_stack
|
|
from traceback import extract_tb as extract_traceback
|
|
from time import time as timestamp
|
|
from base64 import b64encode as base64_encode
|
|
import datetime
|
|
|
|
# Placeholder namespace, created only when no `common` name exists yet in this
# module. The class below reads `common.DefaultData`, which this empty stand-in
# does not provide - presumably the real `common` is injected by whatever loads
# this file (TODO confirm).
if "common" not in globals():
    class common:
        pass
|
|
|
|
class MemWeb:
|
|
|
|
# Built-in settings, read once when the class body is executed. `settings()`
# consults this object last, after the per-call inputs, the constructor inputs
# and the settings loaded at runtime.
__default_settings = common.DefaultData.settings
|
|
|
|
# Precompiled patterns shared by every instance.

# A valid key: one or more ASCII letters, digits or underscores.
re_key = RECompile(r'^[a-zA-Z0-9_]+$')
# A `{name}` placeholder; group 1 is the text between the braces.
re_string_variables = RECompile(r'\{([^\{\}]+)\}')
# A print-type name with an optional single space on each side.
# NOTE(review): the pattern is unanchored and only used with `.match()`, so
# just the beginning of the string is actually validated - confirm intent.
re_print_type = RECompile(r' ?[a-z0-9_]+ ?')
# Text that looks like inline JSON: starts with `[` and ends with `]`, or
# starts with `{` and ends with `}`, with anything (line breaks included) in
# between. Written with `[\s\S]*` instead of the former `(.+|[\r\n]+)*`, whose
# nested quantifiers backtracked exponentially on unterminated input.
re_json = RECompile(r'^(\[[\s\S]*\]|\{[\s\S]*\})$')
# One or more consecutive forward or backward slashes.
re_slashes = RECompile(r'[\/\\\\]+')
# A single line break: CRLF, CR or LF.
re_break_lines = RECompile(r'\r\n|[\r\n]')
# A formatted traceback entry; groups: 1 = file, 2 = line number, 3 = function.
re_exception_line = RECompile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')
|
|
|
|
def __set_basic_data(self):
    """Refresh the cached copies of the settings this class reads most often.

    Every value is looked up through `self.settings()`. Where a tuple of keys
    is given, the keys are tried in the listed order inside each settings
    source. Afterwards the print types stored under "default_print_types" and
    "print_types" are registered through `add_print_types()`.
    """

    read = self.settings

    # "nulls" and "default_value" are read with explicit `nulls` arguments, so
    # these two lookups do not depend on the cached values they are about to set.
    self.__nulls = read("nulls", None, False, False)
    self.__default_value = read("default_value", None, None, True)

    # Formatting and language options.
    self.__exception_format = read("exception_format")
    self.__language = read(("language", "default_language"))
    self.__default_language = read(("default_language", "language"))
    self.__default_text = read("default_text")
    self.__print_format = read("print_format")
    self.__settings_overwrite = read(("settings_overwrite", "overwrite"))
    self.__debug_print_types = read("debug_print_types")

    # Message switches; each specific key is followed by a generic one.
    self.__show_settings_add_ok = read(("show_settings_add_ok_message", "show_ok_message"))
    self.__show_settings_add_error = read(("show_settings_add_error_message", "show_error_message"))
    self.__show_settings_add_exception = read(("show_settings_add_exception_message", "show_exception_message"))
    self.__show_fix_path_exception = read(("show_fix_path_exception_message", "show_exception_message"))
    self.__show_load_file_exception = read(("show_load_file_exception_message", "show_exception_message"))
    self.__show_add_print_types_exception = read(("show_add_print_types_exception_message", "show_exception_message"))

    self.add_print_types(read("default_print_types"))
    self.add_print_types(read("print_types"))
|
|
|
|
def __init__(self, inputs = None):
    """Build the instance: search roots, settings, sub-objects and i18n.

    `inputs` is stored as-is and later consulted by `settings()`; it is also
    handed to the `MemWeb.Threads` and `MemWeb.Terminal` constructors. When
    the "autostart" setting is truthy, `start()` is called at the end.
    """

    self.__sentences = common.DefaultData.i18n
    self.__settings = {}
    self.__inputs = inputs
    self.__print_types = []

    # Search roots: this file's directory, its three nearest ancestors and the
    # empty root. After the reversal the empty root is tried first and this
    # file's own directory last.
    own_directory = path_absolute(directory_name(__file__))
    self.__root_paths = [own_directory]
    self.__slash = '/' if '/' in own_directory else '\\'
    for level in range(3):
        self.__root_paths.append(self.__root_paths[level] + "/..")
    self.__root_paths.append("")
    self.__root_paths.reverse()

    # First pass over the settings, so that printing works from here on.
    self.__set_basic_data()

    self._print("info", "mem_web_building")

    self.threads = MemWeb.Threads(self, inputs)
    self.terminal = MemWeb.Terminal(self, inputs)

    self.__started = False

    # Load the settings files, then refresh the cached values from them.
    self._print("info", "mem_web_settings_loading")
    self.settings_add(self.settings("default_settings_files"), True)
    self.settings_add(self.settings("settings_files"), True)
    self.__set_basic_data()
    self._print("ok", "mem_web_settings_loaded")

    read = self.settings
    self.__i18n_overwrite = read(("i18n_overwrite", "overwrite"))
    self.__show_i18n_add_ok = read(("show_i18n_add_ok_message", "show_ok_message"))
    self.__show_i18n_add_error = read(("show_i18n_add_error_message", "show_error_message"))
    self.__show_i18n_add_exception = read(("show_i18n_add_exception_message", "show_exception_message"))
    self.__show_launch_asynchronous_ok = read(("show_launch_asynchronous_ok_message", "show_ok_message"))
    self.__show_launch_asynchronous_error = read(("show_launch_asynchronous_error_message", "show_error_message"))
    self.__show_launch_asynchronous_exception = read(("show_launch_asynchronous_exception_message", "show_exception_message"))
    self.__show_launch_asynchronous_item_exception = read(("show_launch_asynchronous_exception_item_message", "show_exception_message"))

    self._print("info", "mem_web_i18n_loading")
    self.i18n_add(self.settings("default_i18n_files"), True)
    self.i18n_add(self.settings("i18n_files"), True)
    self._print("ok", "mem_web_i18n_loaded")

    self._print("ok", "mem_web_built")

    if self.settings("autostart"):
        self.start()
|
|
|
|
@staticmethod
def reduce(inputs, callback, value = None):
    """Fold `inputs` into a single value, left to right.

    `callback` is called as `callback(value, item, index, inputs)` for every
    item and its return value becomes the new accumulator.

    Returns a `(value, error)` tuple. `error` is a bit mask:
        1  - the callback raised an exception (folding stops at that item)
        2  - `inputs` is None
        4  - `inputs` is neither a list nor a tuple
        8  - `callback` is None
        16 - `callback` is not callable
    When any validation bit is set the callback is never invoked and `value`
    is returned unchanged.
    """

    if inputs is None:
        error = 1 << 1
    elif not isinstance(inputs, (list, tuple)):
        error = 1 << 2
    else:
        error = 0

    if callback is None:
        error |= 1 << 3
    elif not callable(callback):
        error |= 1 << 4

    if not error:
        try:
            for i, item in enumerate(inputs):
                value = callback(value, item, i, inputs)
        # Only ordinary errors are reported through the error code; a bare
        # `except` would also swallow KeyboardInterrupt and SystemExit.
        except Exception:
            error |= 1 << 0

    return (value, error)
|
|
|
|
@staticmethod
def is_object(item):
    """Tell whether `item` looks like a plain object instance.

    The check is textual: strings are never objects, anything else counts
    when its `str()` form contains the text " object ".
    """
    if isinstance(item, str):
        return False
    return " object " in str(item)
|
|
|
|
@staticmethod
def is_class(item):
    """Tell whether `item` looks like a class.

    The check is textual: strings are never classes, anything else counts
    when its `str()` form starts with "<class '".
    """
    if isinstance(item, str):
        return False
    return str(item).startswith("<class '")
|
|
|
|
def __launch_asynchronous(self, object, items, method, end, i, errors):
    """Run step `i` of a callback-driven chain over `items`.

    For the attribute of `object` named `items[i]`, its `method` is called
    with a continuation; calling that continuation with an error value moves
    on to step `i + 1` and appends the value to `errors`. Once every item has
    been visited, `end` is executed with the collected list.

    Error bits passed to the continuation when a step cannot be run:
        1  - the method raised an exception
        4  - `object` has no attribute `items[i]`
        8  - that attribute has no attribute `method`
        16 - the method attribute is None
        32 - the method attribute is not callable
    """
    if i >= len(items):
        self.execute(end, errors)
        return

    def proceed(error, *_):
        return self.__launch_asynchronous(object, items, method, end, i + 1, errors + [error])

    name = items[i]
    error = 0

    if not hasattr(object, name):
        error = 1 << 2
    else:
        level = getattr(object, name)
        if not hasattr(level, method):
            error |= 1 << 3
        else:
            callback = getattr(level, method)
            if callback is None:
                error |= 1 << 4
            elif not callable(callback):
                error |= 1 << 5
            else:
                try:
                    # On success the step itself is responsible for calling
                    # the continuation.
                    return callback(proceed)
                except Exception as exception:
                    self.exception(exception, self.__show_launch_asynchronous_item_exception and "mem_web_launch_asynchronous_item_exception", {
                        "i" : i,
                        "item" : name,
                        "method" : method,
                        "error" : error
                    })
                    error |= 1 << 0

    # The step could not be run (or failed): continue with its error code.
    proceed(error)
|
|
|
|
def launch_asynchronous(self, object, items, method, end, show_errors = None):
|
|
|
|
error = (
|
|
((
|
|
1 << 0 if object == None else
|
|
1 << 1 if not self.is_object(object) else
|
|
0) << 0) |
|
|
((
|
|
1 << 0 if items == None else
|
|
1 << 1 if not isinstance(items, (list, tuple)) else
|
|
1 << 2 if not len(items) else
|
|
self.reduce(
|
|
[(
|
|
1 << 0 if item == None else
|
|
1 << 1 if not isinstance(item, str) else
|
|
1 << 2 if not item else
|
|
1 << 3 if not self.re_key.match(item) else
|
|
0) for item in items],
|
|
lambda a, b, *_: a | b,
|
|
0)[0] << 3) << 2) |
|
|
((
|
|
1 << 0 if method == None else
|
|
1 << 1 if not isinstance(method, str) else
|
|
1 << 2 if not method else
|
|
1 << 3 if not self.re_key.match(method) else
|
|
0) << 9) |
|
|
((
|
|
1 << 0 if end == None else
|
|
1 << 1 if not callable(end) else
|
|
0) << 13) |
|
|
0) << 1
|
|
has_show_errors = isinstance(show_errors, bool)
|
|
|
|
if not error:
|
|
try:
|
|
self.__launch_asynchronous(object, items, method, end, 0, [])
|
|
except Exception as exception:
|
|
self.exception(exception, self.__show_launch_asynchronous_exception and "web_mem_launch_asynchronous_exception", {
|
|
"method" : method
|
|
})
|
|
error |= 1 << 0
|
|
|
|
self.validate(
|
|
error,
|
|
(
|
|
"exception",
|
|
"object_null",
|
|
"object_not_class_object",
|
|
"items_null",
|
|
"items_not_list",
|
|
"items_empty",
|
|
"some_item_null",
|
|
"some_item_not_string",
|
|
"some_item_empty",
|
|
"some_item_bad_characters",
|
|
"method_null",
|
|
"method_not_string",
|
|
"method_empty",
|
|
"method_bad_characters",
|
|
"end_null",
|
|
"end_not_function"
|
|
),
|
|
{
|
|
"method" : method
|
|
},
|
|
(show_errors if has_show_errors else self.__show_launch_asynchronous_error) and "web_mem_launch_asynchronous_error",
|
|
(show_errors if has_show_errors else True) and self.__show_launch_asynchronous_ok and "web_mem_launch_asynchronous_ok"
|
|
)
|
|
|
|
return error
|
|
|
|
def __end_start(self, errors, callback):
    """Report the outcome of `start()` and notify `callback`.

    `errors` holds one entry per started item. A boolean entry means failure
    when it is False; an integer entry means failure when it is non-zero;
    entries of any other type are ignored.

    `callback` (if callable) is invoked as `callback(ok, errors)`, where `ok`
    is True only when no entry signalled a failure. The original code
    initialised `ok` to False and never set it to True, so callers were always
    told the start had failed.
    """

    ok = True
    for error in errors:
        # bool must be tested first because it is a subclass of int.
        if isinstance(error, bool):
            failed = not error
        elif isinstance(error, int):
            failed = error != 0
        else:
            failed = False
        if failed:
            ok = False
            break

    if ok:
        self._print("ok", "mem_web_started")
    else:
        self._print("warn", "mem_web_started_with_errors")

    MemWeb.execute(callback, ok, errors)
|
|
|
|
def start(self, callback = None):
    """Start the "threads" and "terminal" parts, in that order.

    Returns False (after calling `callback(False)`) when the instance is
    already flagged as started. Otherwise the flag is set, the chain is
    launched through `launch_asynchronous()` and True is returned; the final
    result reaches `callback` through `__end_start()`.
    """

    self._print("info", "mem_web_starting")

    already_started = self.__started
    if already_started:
        self._print("warn", "mem_web_already_started")
        MemWeb.execute(callback, False)
        return False

    self.__started = True

    def finished(errors):
        return self.__end_start(errors, callback)

    self.launch_asynchronous(self, ("threads", "terminal"), "start", finished)

    return True
|
|
|
|
def __end_close(self, errors, callback):
    """Report the outcome of `close()` and notify `callback`.

    `errors` holds one entry per closed item. A boolean entry means failure
    when it is False; an integer entry means failure when it is non-zero;
    entries of any other type are ignored.

    `callback` (if callable) is invoked as `callback(ok, errors)`, where `ok`
    is True only when no entry signalled a failure. The original code
    initialised `ok` to False and never set it to True, so callers were always
    told the close had failed.
    """

    ok = True
    for error in errors:
        # bool must be tested first because it is a subclass of int.
        if isinstance(error, bool):
            failed = not error
        elif isinstance(error, int):
            failed = error != 0
        else:
            failed = False
        if failed:
            ok = False
            break

    if ok:
        self._print("ok", "mem_web_closed")
    else:
        self._print("warn", "mem_web_closed_with_errors")

    MemWeb.execute(callback, ok, errors)
|
|
|
|
def close(self, callback = None):
    """Close the "terminal" and "threads" parts, in that order.

    Returns False (after calling `callback(False)`) when the instance is not
    flagged as started. Otherwise the flag is cleared, the chain is launched
    through `launch_asynchronous()` and True is returned; the final result
    reaches `callback` through `__end_close()`.
    """

    self._print("info", "mem_web_closing")

    running = self.__started
    if not running:
        self._print("warn", "mem_web_already_closed")
        MemWeb.execute(callback, False)
        return False

    self.__started = False

    def finished(errors):
        return self.__end_close(errors, callback)

    self.launch_asynchronous(self, ("terminal", "threads"), "close", finished)

    return True
|
|
|
|
@classmethod
def get_keys(self, keys):
    """Normalise `keys` into a list of unique, valid key names.

    `keys` may be a single value or a list/tuple. Non-string entries are
    dropped, the remaining ones are stripped, and only the first occurrence
    of each non-empty name matching `re_key` is kept, in the original order.
    """

    candidates = keys if isinstance(keys, (list, tuple)) else (keys,)
    stripped = [key.strip() for key in candidates if isinstance(key, str)]

    # dict.fromkeys keeps the first occurrence of each name, in order.
    return [key for key in dict.fromkeys(stripped) if key and self.re_key.match(key)]
|
|
|
|
@staticmethod
def get_dictionaries(inputs):
    """Return the dictionaries found in `inputs` as a tuple.

    A dictionary yields a one-element tuple, a list/tuple yields its
    dictionary items (other items are dropped), anything else yields an
    empty tuple.
    """
    if isinstance(inputs, dict):
        return (inputs,)
    if isinstance(inputs, (list, tuple)):
        return tuple(subset for subset in inputs if isinstance(subset, dict))
    return ()
|
|
|
|
@classmethod
def join_dictionaries(self, inputs):
    """Merge every dictionary found in `inputs` into one new dictionary.

    The dictionaries are taken from `get_dictionaries(inputs)`; when a key
    appears more than once, the value from the later dictionary wins.
    """
    merged = {}
    for subset in self.get_dictionaries(inputs):
        merged.update(subset)
    return merged
|
|
|
|
@staticmethod
def execute(callback, *parameters):
    """Call `callback(*parameters)` when `callback` is truthy and callable.

    Nothing is returned, whatever the callback returns.
    """
    if callback and callable(callback):
        callback(*parameters)
|
|
|
|
@staticmethod
def get_texts(texts):
    """Return the distinct strings of `texts`, keeping their first position.

    `texts` may be a single value or a list/tuple; non-string entries are
    dropped.
    """

    if not isinstance(texts, (list, tuple)):
        texts = (texts,)

    unique = []
    for position, text in enumerate(texts):
        # A string is kept only at the position of its first occurrence.
        if isinstance(text, str) and texts.index(text) == position:
            unique.append(text)
    return unique
|
|
|
|
def nulls(self, nulls = None):
    """Return `nulls` when it is a boolean, otherwise the cached "nulls" setting."""
    if isinstance(nulls, bool):
        return nulls
    return self.__nulls
|
|
|
|
def default_value(self, default = None, nulls = None):
    """Pick the fallback value for a lookup.

    `default` is returned when it is not None, or when nulls are allowed
    (see `nulls()`); otherwise the cached "default_value" setting is used.
    """
    if default is not None:
        return default
    if self.nulls(nulls):
        return default
    return self.__default_value
|
|
|
|
def settings(self, keys, inputs = None, default = None, nulls = None):
    """Look up the first available setting among `keys`.

    Sources are searched in this order: the dictionaries found in `inputs`,
    the constructor inputs, the loaded settings and the default settings
    (sources that are not dictionaries are skipped). Inside each source the
    keys are tried in the given order. A None value only counts as found when
    nulls are allowed (see `nulls()`).

    When nothing is found, or no valid key was given, the result of
    `default_value(default, nulls)` is returned.
    """

    names = self.get_keys(keys)

    if len(names):

        nulls = self.nulls(nulls)
        sources = self.get_dictionaries(inputs) + (self.__inputs, self.__settings, self.__default_settings)

        for source in sources:
            if not isinstance(source, dict):
                continue
            for name in names:
                if name not in source:
                    continue
                if nulls or source[name] is not None:
                    return source[name]

    return self.default_value(default, nulls)
|
|
|
|
@classmethod
def string_variables(self, string, variables = None, default = None):
    """Replace every `{name}` placeholder of `string`.

    The value is taken from the first dictionary in `variables` containing
    `name` and converted with `str()`. Unknown placeholders are left intact
    when `default` is None, otherwise they are replaced with `str(default)`.
    """

    sources = self.get_dictionaries(variables)

    def replace(match):
        name = match.group(1)
        for source in sources:
            if name in source:
                return str(source[name])
        if default is None:
            return match.group(0)
        return str(default)

    return self.re_string_variables.sub(replace, string)
|
|
|
|
def default_text(self, default = None):
    """Return `default` when it is a string, otherwise the cached "default_text" setting."""
    if isinstance(default, str):
        return default
    return self.__default_text
|
|
|
|
def __i18n_get(self, texts, default):
    """Find the stored sentence for the first known key among `texts`.

    Languages are tried in this order: the current language, the default
    language, then every loaded language; repeated or unknown languages are
    skipped. When no language knows any of the keys, the first text itself is
    returned. When `texts` holds no string at all, `default_text(default)` is
    returned instead.
    """

    texts = self.get_texts(texts)

    if not len(texts):
        return self.default_text(default)

    languages = (self.__language, self.__default_language) + tuple(self.__sentences.keys())

    for position, language in enumerate(languages):
        if not (language and isinstance(language, str)):
            continue
        if language not in self.__sentences:
            continue
        # Visit each language only at its first position in the tuple.
        if languages.index(language) != position:
            continue
        sentences = self.__sentences[language]
        for key in texts:
            if key in sentences:
                return sentences[key]

    return texts[0]
|
|
|
|
def i18n(self, texts, variables = None, default = None):
    """Translate `texts` and fill in its `{name}` placeholders.

    The sentence comes from `__i18n_get(texts, default)`; a list or tuple
    sentence is concatenated, anything else is converted with `str()`. The
    result is then passed through `string_variables()` with `variables` and
    `default`.
    """

    sentence = self.__i18n_get(texts, default)

    if isinstance(sentence, (list, tuple)):
        sentence = "".join(sentence)
    else:
        sentence = str(sentence)

    return self.string_variables(sentence, variables, default)
|
|
|
|
def fix_path(self, path):
    """Replace every run of slashes/backslashes in `path` with the local separator.

    Returns a `(path, error)` tuple. `error` is a bit mask:
        1 - the replacement raised an exception (the path is returned as given)
        2 - `path` is None
        4 - `path` is not a string
    """

    if path is None:
        error = 1 << 1
    elif not isinstance(path, str):
        error = 1 << 2
    else:
        error = 0

    if not error:
        try:
            separator = self.__slash
            # The separator is supplied through a function because `re.sub`
            # processes backslash escapes in a replacement *string*: a lone
            # "\\" separator is a bad escape there and made every call fail.
            path = self.re_slashes.sub(lambda _match: separator, path)
        except Exception as exception:
            error |= 1 << 0
            self.exception(exception, self.__show_fix_path_exception and "mem_web_fix_path_exception", {
                "path" : path
            })

    return (path, error)
|
|
|
|
def get_absolute_path(self, path):
    """Find `path` under the known root paths.

    Each root is joined to `path` with the local separator, normalised with
    `fix_path()` and tested for existence; the first existing candidate wins.

    Returns an `(absolute_path, error)` tuple. `error` is a bit mask:
        2 - `path` is None
        4 - `path` is not a string
        8 - no root contains the path
    `absolute_path` is None whenever `error` is not 0.
    """

    if path is None:
        return (None, 1 << 1)
    if not isinstance(path, str):
        return (None, 1 << 2)

    for root in self.__root_paths:
        # NOTE(review): for the empty root this yields the separator followed
        # by `path`, i.e. a path anchored at the filesystem root - confirm
        # that this is the intended meaning of the empty entry.
        candidate = self.fix_path(root + self.__slash + path)[0]
        if path_exists(candidate):
            return (candidate, 0)

    return (None, 1 << 3)
|
|
|
|
def load_file(self, path, mode = "r", encoding = "utf-8"):
    """Read a whole file located through `get_absolute_path()`.

    `mode` must be "r" (text) or "rb" (bytes). Text files are decoded with
    `encoding` (UTF-8 by default) instead of the platform's default encoding,
    so the same file reads identically on every system; `encoding` is ignored
    in binary mode.

    Returns a `(content, error)` tuple. `error` carries the bits returned by
    `get_absolute_path()` plus:
        1   - opening or reading raised an exception
        16  - `mode` is None
        32  - `mode` is not a string
        64  - `mode` is empty
        128 - `mode` is neither "r" nor "rb"
    `content` is None whenever `error` is not 0.
    """

    absolute_path, error = self.get_absolute_path(path)

    if mode is None:
        error |= 1 << 4
    elif not isinstance(mode, str):
        error |= 1 << 5
    elif not mode:
        error |= 1 << 6
    elif mode not in ("r", "rb"):
        error |= 1 << 7

    response = None

    if not error:
        try:
            # Binary mode does not accept an encoding argument.
            with open(absolute_path, mode, encoding = None if mode == "rb" else encoding) as data:
                response = data.read()
        except Exception as exception:
            error |= 1 << 0
            self.exception(exception, self.__show_load_file_exception and "mem_web_load_file_exception", {
                "path" : path,
                "mode" : mode
            })

    return (response, error)
|
|
|
|
def add_print_types(self, inputs, show_errors = None):
    """Register groups of equivalent print-type names.

    `inputs` may be:
      * a list/tuple of groups, each group being a non-empty list/tuple of
        names; the first name of a stored group is the one returned by
        `get_print_type()`. Names that are not non-empty strings accepted by
        `re_print_type` are skipped. When the first valid name of a group is
        already known, the remaining names join that existing group;
        otherwise a new group is created.
      * a string holding inline JSON, or the path of a JSON file to load.
    Anything else is ignored.

    A name already present in its group is not stored again, so registering
    the same configuration repeatedly does not grow the groups. `show_errors`
    is only forwarded to the nested calls.
    """
    if isinstance(inputs, (list, tuple)):

        for subset in inputs:

            if not isinstance(subset, (list, tuple)) or not len(subset):
                continue

            group = None

            for _type in subset:

                if not (_type and isinstance(_type, str) and self.re_print_type.match(_type)):
                    continue

                if group is None:
                    # The first valid name decides which group receives the rest.
                    for known in self.__print_types:
                        if _type in known:
                            group = known
                            break
                    else:
                        group = []
                        self.__print_types.append(group)

                if _type not in group:
                    group.append(_type)

    elif isinstance(inputs, str):

        if self.re_json.match(inputs.strip()):
            text = inputs
        else:
            text = self.load_file(inputs)[0]
            if not (text and self.re_json.match(text.strip())):
                return

        try:
            self.add_print_types(json_decode(text), show_errors)
        except Exception as exception:
            self.exception(exception, self.__show_add_print_types_exception and "mem_web_add_print_types_exception")
|
|
|
|
def get_print_type(self, _type):
    """Return the main name of the print-type group containing `_type`.

    `_type` is lower-cased before the search. The first name of the first
    registered group is returned when `_type` is not a non-empty string or
    belongs to no group.
    """

    if isinstance(_type, str) and _type:
        wanted = _type.lower()
        for group in self.__print_types:
            if wanted in group:
                return group[0]

    return self.__print_types[0][0]
|
|
|
|
@staticmethod
def get_action_data(i = 1):
    """Describe a frame of the current call stack.

    With `i = 0` the frame is the direct caller of this method; each extra
    unit goes one caller further up. Returns a dictionary with the frame's
    "file", "method" and "line".
    """

    # Index 0 of the stack is this method itself, hence the extra 1.
    frame = get_stack()[1 + i]

    return dict(file = frame.filename, method = frame.function, line = frame.lineno)
|
|
|
|
@staticmethod
def base64_encode(data):
    """Return the Base64 text for `data`.

    * `bytes` / `bytearray` are encoded as they are.
    * `str` is encoded from its UTF-8 bytes.
    * A non-negative `int` is encoded from its shortest big-endian byte form
      (0 therefore yields an empty string).
    * Other objects exposing `encode` are encoded via `encode("ascii")`.
    * Anything else (floats, negative integers, ...) is encoded from the
      UTF-8 bytes of its `str()` form. The original sent floats through the
      integer branch and bytes through `.encode("ascii")`; both raised
      AttributeError.
    """
    # Imported locally: this method reuses the name of the module-level alias,
    # so the encoder is bound here under a name that cannot be shadowed.
    from base64 import b64encode

    if isinstance(data, (bytes, bytearray)):
        raw = bytes(data)
    elif isinstance(data, str):
        raw = data.encode()
    elif isinstance(data, int) and data >= 0:
        raw = data.to_bytes((data.bit_length() + 7) // 8, byteorder = "big")
    elif hasattr(data, "encode"):
        raw = bytes(data.encode("ascii"))
    else:
        raw = str(data).encode()

    return b64encode(raw).decode()
|
|
|
|
def _print(self, _type, message, inputs = None, i = 1):
    """Print a formatted, translated message of the given type.

    Nothing is printed unless the stripped main name of `_type` is listed in
    the "debug_print_types" setting. The line is built from the "print_format"
    setting, whose `{name}` placeholders are filled from: the type names, the
    caller data of the frame `i` levels above this method, the dictionaries in
    `inputs`, the current date and time, and the translated `message`.
    """

    basic_type = self.get_print_type(_type)

    if basic_type.strip() not in self.__debug_print_types:
        return

    own = {
        "raw_type" : _type,
        "basic_type" : self.get_print_type(_type),
        "i18n" : message
    }
    own.update(self.get_action_data(i))
    own.update(self.join_dictionaries(inputs))

    now = datetime.datetime.now()

    own["type"] = own["basic_type"].upper()

    # For every date part: the full value (e.g. "year"), its value modulo 100
    # under a one-letter key ("y") and a two-digit text under the doubled
    # letter ("yy"). Minutes use "i" because "m" belongs to the month.
    parts = (("year", "y"), ("month", "m"), ("day", "d"), ("hour", "h"), ("minute", "i"), ("second", "s"))
    for part, letter in parts:
        own[part] = getattr(now, part)
        own[letter] = own[part] % 100
        own[letter + letter] = str(own[letter]).zfill(2)

    own["yyyy"] = own["year"]

    translated = self.i18n(message, own)
    # Error and exception messages carry their details in "end".
    details = own["end"] if own["type"] in ("ERRO", "EXCE") and "end" in own else ""
    own["message"] = translated + details

    print(self.string_variables(self.__print_format, own))
|
|
|
|
@staticmethod
def is_key(key):
    """Tell whether `key` is a usable entry name.

    A usable name is a non-empty string that does not start with "MemWeb_"
    and does not end with "_end" or "_start". Falsy values are returned
    unchanged; every other case yields a boolean.
    """
    if not key:
        return key
    if not isinstance(key, str):
        return False
    return not key.startswith("MemWeb_") and not key.endswith(("_end", "_start"))
|
|
|
|
def settings_add(self, inputs, overwrite = None, show_errors = None):
    """Add entries to the loaded settings.

    `inputs` may be a dictionary of settings, a list/tuple of further inputs,
    or a string holding either inline JSON or the path of a JSON file;
    anything else is ignored. Keys rejected by `is_key()` are skipped
    silently.

    `overwrite` decides whether existing keys may be replaced; when it is not
    a boolean the "settings_overwrite" setting is used. A boolean
    `show_errors` overrides the configured switches for the error and ok
    messages; it is now also forwarded for the items of a list/tuple, which
    previously fell back to the configured switches.
    """
    if isinstance(inputs, (list, tuple)):
        for subinputs in inputs:
            self.settings_add(subinputs, overwrite, show_errors)

    elif isinstance(inputs, dict):

        has_show_errors = isinstance(show_errors, bool)

        if not isinstance(overwrite, bool):
            overwrite = self.__settings_overwrite

        for key, value in inputs.items():
            if not self.is_key(key):
                continue

            # Bit n of the mask corresponds to entry n of the message tuple
            # below; bit 0 is reserved for "exception".
            if key is None:
                error = 1 << 1
            elif not isinstance(key, str):
                error = 1 << 2
            elif not key:
                error = 1 << 3
            elif not self.re_key.match(key):
                error = 1 << 4
            elif not overwrite and key in self.__settings:
                error = 1 << 5
            else:
                error = 0

            if self.validate(
                error,
                (
                    "exception",
                    "key_null",
                    "key_not_string",
                    "key_empty",
                    "key_not_key",
                    "key_exists"
                ),
                {
                    "key" : key,
                    "value" : value
                },
                (show_errors if has_show_errors else self.__show_settings_add_error) and "mem_web_settings_add_error",
                (show_errors if has_show_errors else True) and self.__show_settings_add_ok and "mem_web_settings_add_ok"
            ):
                self.__settings[key] = value

    elif isinstance(inputs, str):

        if self.re_json.match(inputs.strip()):
            text = inputs
        else:
            text, error = self.load_file(inputs)
            if error or not text or not self.re_json.match(text.strip()):
                return

        try:
            self.settings_add(json_decode(text), overwrite, show_errors)
        except Exception as exception:
            self.__show_settings_add_exception and self.exception(exception, "mem_web_settings_add_exception")
|
|
|
|
def i18n_add(self, inputs, overwrite = None, show_errors = None):
    """Add sentences to the loaded translations.

    `inputs` may be a dictionary `{language: {key: sentence}}`, a list/tuple
    of further inputs, or a string holding either inline JSON or the path of
    a JSON file; anything else is ignored. Sentence keys rejected by
    `is_key()` are skipped silently.

    `overwrite` decides whether existing sentences may be replaced; when it
    is not a boolean the "i18n_overwrite" setting is used. A boolean
    `show_errors` overrides the configured switches for the error and ok
    messages.

    Fixes over the previous version:
      * an existing key now sets the "key_exists" bit (it used the bit of
        "sentence_null");
      * a language with rejected sentences is now flagged with
        "sentences_with_errors" (the condition was inverted, so the bit was
        never set);
      * `show_errors` is forwarded for the items of a list/tuple.
    """
    if isinstance(inputs, (list, tuple)):
        for subinputs in inputs:
            self.i18n_add(subinputs, overwrite, show_errors)

    elif isinstance(inputs, dict):

        has_show_errors = isinstance(show_errors, bool)

        if not isinstance(overwrite, bool):
            overwrite = self.__i18n_overwrite

        for language, sentences in inputs.items():

            # Bit n of each mask corresponds to entry n of the message tuple
            # given to validate(); bit 0 is reserved for "exception".
            if language is None:
                error = 1 << 1
            elif not isinstance(language, str):
                error = 1 << 2
            elif not language:
                error = 1 << 3
            elif not self.re_key.match(language):
                error = 1 << 4
            else:
                error = 0

            if sentences is None:
                error |= 1 << 5
            elif not isinstance(sentences, dict):
                error |= 1 << 6
            elif not len(sentences):
                error |= 1 << 7

            if not error:
                if language not in self.__sentences:
                    self.__sentences[language] = {}

                for key, sentence in sentences.items():
                    if not self.is_key(key):
                        continue

                    if key is None:
                        suberror = 1 << 1
                    elif not isinstance(key, str):
                        suberror = 1 << 2
                    elif not key:
                        suberror = 1 << 3
                    elif not self.re_key.match(key):
                        suberror = 1 << 4
                    elif not overwrite and key in self.__sentences[language]:
                        suberror = 1 << 5
                    else:
                        suberror = 0

                    if sentence is None:
                        suberror |= 1 << 6
                    elif not isinstance(sentence, (str, list, tuple)):
                        suberror |= 1 << 7

                    if self.validate(
                        suberror,
                        (
                            "exception",
                            "key_null",
                            "key_not_string",
                            "key_empty",
                            "key_not_key",
                            "key_exists",
                            "sentence_null",
                            "sentence_not_string"
                        ),
                        {
                            "language" : language,
                            "key" : key,
                            "sentence" : sentence
                        },
                        (show_errors if has_show_errors else self.__show_i18n_add_error) and "mem_web_i18n_add_key_error",
                        (show_errors if has_show_errors else True) and self.__show_i18n_add_ok and "mem_web_i18n_add_key_ok"
                    ):
                        self.__sentences[language][key] = sentence
                    else:
                        # At least one sentence of this language was rejected.
                        error |= 1 << 8

            self.validate(
                error,
                (
                    "exception",
                    "language_null",
                    "language_not_string",
                    "language_empty",
                    "language_not_key",
                    "sentences_null",
                    "sentences_not_dictionary",
                    "sentences_empty",
                    "sentences_with_errors"
                ),
                {
                    "language" : language
                },
                (show_errors if has_show_errors else self.__show_i18n_add_error) and "mem_web_i18n_add_error",
                (show_errors if has_show_errors else True) and self.__show_i18n_add_ok and "mem_web_i18n_add_ok"
            )

    elif isinstance(inputs, str):
        if self.re_json.match(inputs.strip()):
            try:
                self.i18n_add(json_decode(inputs), overwrite, show_errors)
            except Exception as exception:
                self.__show_i18n_add_exception and self.exception(exception, "mem_web_i18n_add_json_exception")
        else:

            json, error = self.load_file(inputs)

            if not error and json and self.re_json.match(json.strip()):
                try:
                    self.i18n_add(json_decode(json), overwrite, show_errors)
                except Exception as exception:
                    self.__show_i18n_add_exception and self.exception(exception, "mem_web_i18n_add_file_exception")
|
|
|
|
def validate(self, error, messages = (), variables = None, error_message = None, ok_message = None):
    """Report an error mask and tell whether it means success.

    Returns True when `error` is falsy, False otherwise.

    On failure, and only when `error_message` is truthy, an "error" message
    is printed. Its variables are the dictionaries in `variables` plus:
        "code"     - `error` encoded with `base64_encode()`
        "raw_code" - `error` itself
        "end"      - one translated line per set bit; bit `n` uses
                     `messages[n]`, or "error_message_n" when that entry is
                     missing or empty.
    On success, when `ok_message` is truthy, an "ok" message is printed with
    `variables`.

    Both messages are printed with a stack depth of 2, so they are attributed
    to the caller of `validate()`; the ok message used the default depth and
    was attributed to `validate()` itself. The default of `messages` is now an
    immutable tuple instead of a shared list.
    """

    if error:
        if error_message:

            own = {
                "code" : self.base64_encode(error),
                "raw_code" : error,
                "end" : "",
                **self.join_dictionaries(variables)
            }
            known = len(messages)
            details = []
            bit = 0

            while 1 << bit <= error:
                if (1 << bit) & error:
                    text = messages[bit] if bit < known and messages[bit] else "error_message_" + str(bit)
                    details.append("\n [" + str(bit) + "] " + self.i18n(text, own))
                bit += 1

            own["end"] += "".join(details)

            self._print("error", error_message, own, 2)

        return False

    if ok_message:
        self._print("ok", ok_message, variables, 2)

    return True
|
|
|
|
def exception(self, exception, message = None, variables = None, i = 1):
    """Print an "exception" message describing `exception`.

    The message variables are the dictionaries in `variables` plus:
        "file", "method", "line" - taken from the last traceback entry of the
            exception; when it has no traceback, or that entry cannot be
            parsed, the data of this method's caller is used instead (the
            original raised IndexError/AttributeError in that case)
        "exception_message" - `str(exception)`
        "lines" - the first line of every call-stack and traceback entry
        "end"   - the "exception_format" setting filled with all of the above
    Nothing is printed when `message` is falsy. `i` is the extra stack depth
    handed to `_print()`.
    """

    lines = extract_traceback(exception.__traceback__).format()
    # Current call stack without this method and its caller; taken in a
    # statement of its own so that no extra frame is involved.
    stack_lines = trace_format_stack()[:-2]

    data = {
        **self.join_dictionaries(variables),
        **self.get_action_data(1),
        "lines" : "",
        "exception_message" : str(exception)
    }

    line_matches = self.re_exception_line.match(lines[-1]) if lines else None
    if line_matches:
        data["method"] = line_matches.group(3)
        data["line"] = line_matches.group(2)
        data["file"] = line_matches.group(1)

    data["lines"] = "".join(
        "\n " + self.re_break_lines.split(block.strip())[0]
        for block in stack_lines + lines
        if block
    )

    data["end"] = self.string_variables(self.__exception_format, data)

    if message:
        self._print("exception", message, data, i + 1)
|
|
|
|
|
|
@classmethod
def get_value(self, keys, inputs, default = None):
    """Return the first value found for any of `keys` in `inputs`.

    The dictionaries of `inputs` are searched in order, and inside each one
    the valid keys (see `get_keys()`) are tried in order. `default` is
    returned when nothing matches.
    """

    names = self.get_keys(keys)

    if not len(names):
        return default

    for source in self.get_dictionaries(inputs):
        for name in names:
            if name in source:
                return source[name]

    return default
|
|
|
|
@staticmethod
def timestamp():
    """Return the current time as whole milliseconds.

    The `timestamp` called inside the body is the module-level alias of
    `time.time`, not this method.
    """
    milliseconds = timestamp() * 1000
    return int(milliseconds)