#!/usr/bin/env python
# -*- coding: utf-8 -*-

from os.path import dirname as directory_name
from os.path import abspath as path_absolute
from os.path import exists as path_exists
from re import compile as RECompile
from json import loads as json_decode
from inspect import stack as get_stack
from traceback import format_stack as trace_format_stack
from traceback import extract_tb as extract_traceback
import datetime

if "common" not in globals():
    class common:
        """Stand-in namespace used when no ``common`` global is available.

        NOTE(review): the real ``common`` is presumably supplied by the
        surrounding project; the empty defaults below only keep this module
        importable on its own -- confirm against the project layout.
        """

        class DefaultData:
            settings = {}
            i18n = {}


class MemWeb:
    """Settings, i18n, logging and validation hub.

    Errors are reported throughout the class as integer bit masks: bit 0 is
    reserved for "an exception happened" and every following bit maps to the
    entry with the same index in the message tuple handed to ``validate``.
    """

    __default_settings = common.DefaultData.settings

    re_key = RECompile(r'^[a-zA-Z0-9_]+$')
    re_string_variables = RECompile(r'\{([^\{\}]+)\}')
    re_print_type = RECompile(r' ?[a-z0-9_]+ ?')
    # Accepts the same texts as the former ``(.+|[\r\n]+)*`` body, but without
    # the nested quantifier that backtracked exponentially on texts opening
    # with ``[``/``{`` and not closing with the matching bracket.
    re_json = RECompile(r'^(\[[\s\S]*\]|\{[\s\S]*\})$')
    re_slashes = RECompile(r'[\/\\\\]+')
    re_break_lines = RECompile(r'\r\n|[\r\n]')
    re_exception_line = RECompile(r'^\s*File "([^"]+)", line ([0-9]+), in ([^\n]+)(.*|[\r\n]*)*$')

    @staticmethod
    def _string_error(value, pattern=None):
        """Return the validation flag for a string-like value.

        ``1 << 0`` when ``value`` is ``None``, ``1 << 1`` when it is not a
        string, ``1 << 2`` when it is empty, ``1 << 3`` when ``pattern`` is
        given and does not match, ``0`` otherwise.
        """
        if value is None:
            return 1 << 0
        if not isinstance(value, str):
            return 1 << 1
        if not value:
            return 1 << 2
        if pattern is not None and not pattern.match(value):
            return 1 << 3
        return 0

    def __set_basic_data(self):
        """Cache the settings the core methods read on every call."""
        # ``nulls`` and ``default_value`` must be resolved first: every later
        # ``settings`` call falls back on them.
        self.__nulls = self.settings("nulls", None, False, False)
        self.__default_value = self.settings("default_value", None, None, True)
        self.__exception_format = self.settings("exception_format")
        self.__language = self.settings(("language", "default_language"))
        self.__default_language = self.settings(("default_language", "language"))
        self.__default_text = self.settings("default_text")
        self.__print_format = self.settings("print_format")
        self.__settings_overwrite = self.settings(("settings_overwrite", "overwrite"))
        self.__debug_print_types = self.settings("debug_print_types")
        self.__show_settings_add_ok = self.settings(("show_settings_add_ok_message", "show_ok_message"))
        self.__show_settings_add_error = self.settings(("show_settings_add_error_message", "show_error_message"))
        self.__show_settings_add_exception = self.settings(("show_settings_add_exception_message", "show_exception_message"))
        self.__show_fix_path_exception = self.settings(("show_fix_path_exception_message", "show_exception_message"))
        self.__show_load_file_exception = self.settings(("show_load_file_exception_message", "show_exception_message"))
        self.__show_add_print_types_exception = self.settings(("show_add_print_types_exception_message", "show_exception_message"))

        for key in ("default_print_types", "print_types"):
            self.add_print_types(self.settings(key))

    def __init__(self, inputs=None):
        """Build the instance, load settings and i18n files, then autostart.

        :param inputs: optional dictionary of settings that take precedence
            over loaded and default settings.
        """
        # NOTE(review): this is the shared default dictionary, not a copy;
        # ``i18n_add`` mutates it in place.
        self.__sentences = common.DefaultData.i18n
        self.__settings = {}
        self.__inputs = inputs
        self.__print_types = []

        # Search roots: this file's directory and its three ancestors, plus
        # the empty root; reversed so the empty root is probed first.
        self.__root_paths = [path_absolute(directory_name(__file__))]
        self.__slash = '/' if '/' in self.__root_paths[0] else '\\'
        for i in range(3):
            self.__root_paths += [self.__root_paths[0 + i] + "/.."]
        self.__root_paths += [""]
        self.__root_paths.reverse()

        self.__set_basic_data()

        self._print("info", "mem_web_building")

        # NOTE(review): ``MemWeb.Threads`` is not defined in this part of the
        # file; it is expected to be attached to the class elsewhere.
        self.threads = MemWeb.Threads(self, inputs)
        self.__started = False

        self._print("info", "mem_web_settings_loading")
        for key in ("default_settings_files", "settings_files"):
            self.settings_add(self.settings(key), True)
        # Refresh the cached values now that the settings files are loaded.
        self.__set_basic_data()
        self._print("ok", "mem_web_settings_loaded")

        self.__i18n_overwrite = self.settings(("i18n_overwrite", "overwrite"))
        self.__show_i18n_add_ok = self.settings(("show_i18n_add_ok_message", "show_ok_message"))
        self.__show_i18n_add_error = self.settings(("show_i18n_add_error_message", "show_error_message"))
        self.__show_i18n_add_exception = self.settings(("show_i18n_add_exception_message", "show_exception_message"))
        self.__show_launch_asynchronous_ok = self.settings(("show_launch_asynchronous_ok_message", "show_ok_message"))
        self.__show_launch_asynchronous_error = self.settings(("show_launch_asynchronous_error_message", "show_error_message"))
        self.__show_launch_asynchronous_exception = self.settings(("show_launch_asynchronous_exception_message", "show_exception_message"))
        self.__show_launch_asynchronous_item_exception = self.settings(("show_launch_asynchronous_exception_item_message", "show_exception_message"))

        self._print("info", "mem_web_i18n_loading")
        for key in ("default_i18n_files", "i18n_files"):
            self.i18n_add(self.settings(key), True)
        self._print("ok", "mem_web_i18n_loaded")

        self._print("ok", "mem_web_built")

        if self.settings("autostart"):
            self.start()

    @staticmethod
    def reduce(inputs, callback, value=None):
        """Fold ``inputs`` with ``callback(value, item, index, inputs)``.

        :return: ``(value, error)``. ``error`` flags: bit 0 the callback (or
            the iteration) raised, bit 1 ``inputs`` is ``None``, bit 2
            ``inputs`` is not a list/tuple, bit 3 ``callback`` is ``None``,
            bit 4 ``callback`` is not callable.
        """
        inputs_error = (
            1 << 0 if inputs is None else
            1 << 1 if not isinstance(inputs, (list, tuple)) else
            0)
        callback_error = (
            1 << 0 if callback is None else
            1 << 1 if not callable(callback) else
            0)
        error = (inputs_error | callback_error << 2) << 1

        if not error:
            try:
                for i, item in enumerate(inputs):
                    value = callback(value, item, i, inputs)
            except Exception:
                # Best effort: the partially folded value is still returned.
                error |= 1 << 0

        return (value, error)

    @staticmethod
    def is_object(item):
        """Tell whether ``str(item)`` looks like a default object repr."""
        return not isinstance(item, str) and " object " in str(item)

    def __launch_asynchronous(self, target, items, method, end, i, errors):
        """Run ``target.<items[i]>.<method>(advance)`` and chain to the next item.

        ``advance`` is handed to the called method as its continuation; it
        records that item's error code and moves on to item ``i + 1``. Once
        every item was visited ``end(errors)`` is executed.
        """
        if i >= len(items):
            self.execute(end, errors)
            return None

        def advance(error, *_):
            return self.__launch_asynchronous(target, items, method, end, i + 1, errors + [error])

        error = 0 if hasattr(target, items[i]) else 1 << 2
        if not error:
            level = getattr(target, items[i])
            error |= 0 if hasattr(level, method) else 1 << 3
            if not error:
                callback = getattr(level, method)
                error |= (
                    1 << 0 if callback is None else
                    1 << 1 if not callable(callback) else
                    0) << 4
                if not error:
                    try:
                        # The callee is responsible for calling ``advance``.
                        return callback(advance)
                    except Exception as exception:
                        self.exception(exception, self.__show_launch_asynchronous_item_exception and "mem_web_launch_asynchronous_item_exception", {
                            "i" : i,
                            "item" : items[i],
                            "method" : method,
                            "error" : error
                        })
                        error |= 1 << 0

        # The item could not be launched: continue the chain ourselves.
        advance(error)
        return None

    def launch_asynchronous(self, object, items, method, end, show_errors=None):
        """Call ``method`` on every attribute of ``object`` named in ``items``.

        :param object: instance holding the attributes.
        :param items: list/tuple of attribute names (must match ``re_key``).
        :param method: name of the method to call on each attribute.
        :param end: callable receiving the list of per-item error codes.
        :param show_errors: optional boolean overriding the configured
            error/ok message visibility.
        :return: error bit mask; bit ``n`` maps to entry ``n`` of the message
            tuple passed to ``validate`` below.
        """
        object_error = (
            1 << 0 if object is None else
            1 << 1 if not self.is_object(object) else
            0)

        if items is None:
            items_error = 1 << 0
        elif not isinstance(items, (list, tuple)):
            items_error = 1 << 1
        elif not len(items):
            items_error = 1 << 2
        else:
            # OR together the flags of every item, after the three
            # container-level flags.
            items_error = self.reduce(
                [self._string_error(item, self.re_key) for item in items],
                lambda a, b, *_: a | b,
                0)[0] << 3

        end_error = (
            1 << 0 if end is None else
            1 << 1 if not callable(end) else
            0)

        error = (
            object_error
            | items_error << 2
            | self._string_error(method, self.re_key) << 9
            | end_error << 13
        ) << 1
        has_show_errors = isinstance(show_errors, bool)

        if not error:
            try:
                self.__launch_asynchronous(object, items, method, end, 0, [])
            except Exception as exception:
                # NOTE(review): these keys start with "web_mem_" while the
                # rest of the class uses "mem_web_"; left untouched because
                # the i18n catalogue is not visible here.
                self.exception(exception, self.__show_launch_asynchronous_exception and "web_mem_launch_asynchronous_exception", {
                    "method" : method
                })
                error |= 1 << 0

        self.validate(
            error,
            (
                "exception",
                "object_null",
                "object_not_class_object",
                "items_null",
                "items_not_list",
                "items_empty",
                "some_item_null",
                "some_item_not_string",
                "some_item_empty",
                "some_item_bad_characters",
                "method_null",
                "method_not_string",
                "method_empty",
                "method_bad_characters",
                "end_null",
                "end_not_function"
            ),
            {
                "method" : method
            },
            (show_errors if has_show_errors else self.__show_launch_asynchronous_error) and "web_mem_launch_asynchronous_error",
            (show_errors if has_show_errors else True) and self.__show_launch_asynchronous_ok and "web_mem_launch_asynchronous_ok"
        )

        return error

    def __end_start(self, errors, callback):
        """Report the start-up outcome and execute ``callback(ok, errors)``.

        An entry of ``errors`` counts as a failure when it is the boolean
        ``False`` or a non-zero integer.
        """
        message = ("ok", "mem_web_started")
        # Success until a failing entry is found (this flag used to start,
        # and therefore always stay, False).
        ok = True

        for error in errors:
            if isinstance(error, bool):
                failed = not error
            elif isinstance(error, int):
                failed = bool(error)
            else:
                failed = False
            if failed:
                message = ("warn", "mem_web_started_with_errors")
                ok = False
                break

        self._print(*message)
        MemWeb.execute(callback, ok, errors)

    def start(self, callback=None):
        """Start the ``threads`` component once.

        :param callback: optional callable; executed with ``False`` when the
            instance was already started, otherwise with ``(ok, errors)``
            when the launch chain ends.
        :return: ``False`` when already started, ``True`` otherwise.
        """
        self._print("info", "mem_web_starting")

        if self.__started:
            self._print("warn", "mem_web_already_started")
            MemWeb.execute(callback, False)
            return False
        self.__started = True

        self.launch_asynchronous(self, ("threads",), "start", lambda errors: self.__end_start(errors, callback))

        return True

    @classmethod
    def get_keys(cls, keys):
        """Return the stripped, unique, ``re_key``-valid strings in ``keys``.

        ``keys`` may be a single value or a list/tuple; non-strings are
        ignored and the first-seen order is kept.
        """
        candidates = keys if isinstance(keys, (list, tuple)) else (keys,)
        unique = dict.fromkeys(key.strip() for key in candidates if isinstance(key, str))
        return [key for key in unique if key and cls.re_key.match(key)]

    @staticmethod
    def get_dictionaries(inputs):
        """Return a tuple with the dictionaries found in ``inputs``.

        ``inputs`` may be a dictionary, a list/tuple (non-dictionary entries
        are dropped) or anything else (empty tuple).
        """
        if isinstance(inputs, (list, tuple)):
            return tuple(subset for subset in inputs if isinstance(subset, dict))
        if isinstance(inputs, dict):
            return (inputs,)
        return tuple()

    @classmethod
    def join_dictionaries(cls, inputs):
        """Merge the dictionaries in ``inputs``; later ones win on clashes."""
        return {key : value for subset in cls.get_dictionaries(inputs) for key, value in subset.items()}

    @staticmethod
    def execute(callback, *parameters):
        """Call ``callback(*parameters)`` when it is a truthy callable."""
        if callback and callable(callback):
            callback(*parameters)

    @staticmethod
    def get_texts(texts):
        """Return the unique strings in ``texts`` keeping first-seen order."""
        if not isinstance(texts, (list, tuple)):
            texts = (texts,)
        return list(dict.fromkeys(text for text in texts if isinstance(text, str)))

    def nulls(self, nulls=None):
        """Return ``nulls`` when it is a boolean, else the configured value."""
        return nulls if isinstance(nulls, bool) else self.__nulls

    def default_value(self, default=None, nulls=None):
        """Return ``default`` unless it is ``None`` and nulls are disallowed."""
        return default if default is not None or self.nulls(nulls) else self.__default_value

    def settings(self, keys, inputs=None, default=None, nulls=None):
        """Look a setting up by one or several alternative keys.

        Sources are searched in order: ``inputs``, the constructor inputs,
        the loaded settings and the default settings. Within each source the
        keys are tried in the given order.

        :param nulls: when true, a stored ``None`` is a valid result.
        :return: the first value found, else ``default_value(default, nulls)``.
        """
        keys = self.get_keys(keys)

        if keys:
            nulls = self.nulls(nulls)
            sources = self.get_dictionaries(inputs) + (self.__inputs, self.__settings, self.__default_settings)
            for subset in sources:
                if not isinstance(subset, dict):
                    continue
                for key in keys:
                    if key in subset and (nulls or subset[key] is not None):
                        return subset[key]

        return self.default_value(default, nulls)

    @classmethod
    def string_variables(cls, string, variables=None, default=None):
        """Replace ``{name}`` placeholders in ``string``.

        Unknown placeholders are replaced by ``str(default)``, or left
        untouched when ``default`` is ``None``.
        """
        variables = cls.get_dictionaries(variables)

        def callback(matches):
            key = matches.group(1)
            for subset in variables:
                if key in subset:
                    return str(subset[key])
            return matches.group(0) if default is None else str(default)

        return cls.re_string_variables.sub(callback, string)

    def default_text(self, default=None):
        """Return ``default`` when it is a string, else the configured text."""
        return default if isinstance(default, str) else self.__default_text

    def __i18n_get(self, texts, default):
        """Return the raw sentence for the first known key in ``texts``.

        Languages are tried in order: current, default, then every loaded
        one. When no language knows any key, the first text is returned
        as-is; without any text the default text is returned.
        """
        texts = self.get_texts(texts)

        if not texts:
            return self.default_text(default)

        candidates = (self.__language, self.__default_language) + tuple(self.__sentences.keys())
        # Only non-empty strings are kept, so de-duplicating by hash is safe.
        languages = dict.fromkeys(
            language for language in candidates
            if language and isinstance(language, str))
        for language in languages:
            if language not in self.__sentences:
                continue
            for key in texts:
                if key in self.__sentences[language]:
                    return self.__sentences[language][key]

        return texts[0]

    def i18n(self, texts, variables=None, default=None):
        """Translate ``texts`` and fill in its ``{name}`` placeholders.

        List/tuple sentences are concatenated before substitution.
        """
        text = self.__i18n_get(texts, default)
        return self.string_variables("".join(text) if isinstance(text, (list, tuple)) else str(text), variables, default)

    def fix_path(self, path):
        """Collapse runs of slashes/backslashes into the platform separator.

        :return: ``(path, error)``; bit 0 exception, bit 1 ``path`` is
            ``None``, bit 2 ``path`` is not a string.
        """
        error = (
            1 << 0 if path is None else
            1 << 1 if not isinstance(path, str) else
            0) << 1

        if not error:
            try:
                path = self.re_slashes.sub(self.__slash, path)
            except Exception as exception:
                error |= 1 << 0
                self.exception(exception, self.__show_fix_path_exception and "mem_web_fix_path_exception", {
                    "path" : path
                })

        return (path, error)

    def get_absolute_path(self, path):
        """Resolve ``path`` against the known root paths.

        :return: ``(absolute_path, error)``; ``absolute_path`` is ``None``
            when nothing exists. Bit 1 ``path`` is ``None``, bit 2 not a
            string, bit 3 not found under any root.
        """
        error = (
            1 << 0 if path is None else
            1 << 1 if not isinstance(path, str) else
            0) << 1
        absolute_path = None

        if not error:
            for root in self.__root_paths:
                current_path = self.fix_path(root + self.__slash + path)[0]
                if path_exists(current_path):
                    absolute_path = current_path
                    break
            if absolute_path is None:
                error |= 1 << 3

        return (absolute_path, error)

    def load_file(self, path, mode="r"):
        """Read a whole file found through ``get_absolute_path``.

        :param mode: ``"r"`` or ``"rb"``.
        :return: ``(content, error)``; ``content`` is ``None`` on error. The
            path flags of ``get_absolute_path`` are kept; bits 4-7 flag a
            null, non-string, empty or unsupported ``mode``.
        """
        absolute_path, error = self.get_absolute_path(path)
        mode_error = self._string_error(mode)
        if not mode_error and mode not in ("r", "rb"):
            mode_error = 1 << 3
        error |= mode_error << 4
        response = None

        if not error:
            try:
                with open(absolute_path, mode) as data:
                    response = data.read()
            except Exception as exception:
                error |= 1 << 0
                self.exception(exception, self.__show_load_file_exception and "mem_web_load_file_exception", {
                    "path" : path,
                    "mode" : mode
                })

        return (response, error)

    def add_print_types(self, inputs, show_errors=None):
        """Register groups of print-type aliases.

        ``inputs`` is a list of alias lists, a JSON text encoding one, or the
        path of a file containing such JSON. The first alias of a group is
        its basic type. A group is merged into an existing one when its
        first valid alias already belongs to it.
        """
        if isinstance(inputs, (list, tuple)):
            for subset in inputs:
                if subset is None or not isinstance(subset, (list, tuple)) or not len(subset):
                    continue
                index = None
                for _type in subset:
                    if self._string_error(_type, self.re_print_type):
                        continue
                    if index is None:
                        index = next(
                            (j for j, print_types in enumerate(self.__print_types) if _type in print_types),
                            None)
                        if index is None:
                            # Unknown alias: it opens a new group.
                            index = len(self.__print_types)
                            self.__print_types.append([_type])
                            continue
                    # Registration runs more than once during construction;
                    # skip aliases the group already has so it cannot fill
                    # up with duplicates.
                    if _type not in self.__print_types[index]:
                        self.__print_types[index].append(_type)

        elif isinstance(inputs, str):
            if self.re_json.match(inputs.strip()):
                try:
                    self.add_print_types(json_decode(inputs), show_errors)
                except Exception as exception:
                    self.exception(exception, self.__show_add_print_types_exception and "mem_web_add_print_types_exception")
            else:
                json = self.load_file(inputs)[0]
                if json and self.re_json.match(json.strip()):
                    try:
                        self.add_print_types(json_decode(json), show_errors)
                    except Exception as exception:
                        self.exception(exception, self.__show_add_print_types_exception and "mem_web_add_print_types_exception")

    def get_print_type(self, _type):
        """Return the basic type of the group containing ``_type``.

        Falls back to the basic type of the first registered group.
        """
        if not self._string_error(_type):
            _type = _type.lower()
            for print_types in self.__print_types:
                if _type in print_types:
                    return print_types[0]

        return self.__print_types[0][0]

    @staticmethod
    def get_action_data(i=1):
        """Return file, function and line of the frame ``i`` levels above the caller."""
        # Index 0 is this function itself, hence the extra 1.
        stack = get_stack()[1 + i]
        return {
            "file" : stack.filename,
            "method" : stack.function,
            "line" : stack.lineno
        }

    def _print(self, _type, message, inputs=None, i=1):
        """Print an i18n message using the configured print format.

        Nothing is printed when the basic type of ``_type`` is not listed in
        the ``debug_print_types`` setting.

        :param inputs: extra placeholder values (dictionary or list of them).
        :param i: how many frames above this call the reported location is.
        """
        basic_type = self.get_print_type(_type)
        if basic_type.strip() not in self.__debug_print_types:
            return

        # ``get_action_data`` must be called from this frame: it indexes the
        # call stack relative to its caller.
        own = {
            "raw_type" : _type,
            "basic_type" : basic_type,
            "i18n" : message,
            **self.get_action_data(i),
            **self.join_dictionaries(inputs)
        }
        date = datetime.datetime.now()

        own["type"] = own["basic_type"].upper()
        for key in ("year", "month", "day", "hour", "minute", "second"):
            # One-letter placeholder; "i" for minute as "m" is the month.
            k = "i" if key == "minute" else key[0]
            own[key] = getattr(date, key)
            own[k] = own[key] % 100
            own[k + k] = ("00" + str(own[k]))[-2:]
        own["yyyy"] = own["year"]

        own["message"] = self.i18n(message, own) + (own["end"] if own["type"] in ("ERRO", "EXCE") and "end" in own else "")

        print(self.string_variables(self.__print_format, own))

    @staticmethod
    def is_key(key):
        """Tell whether ``key`` is a usable key.

        A usable key is a non-empty string that does not start with
        ``MemWeb_`` and does not end with ``_end`` or ``_start``.
        """
        return key and isinstance(key, str) and key[0:7] != "MemWeb_" and key[-4:] != "_end" and key[-6:] != "_start"

    def settings_add(self, inputs, overwrite=None, show_errors=None):
        """Add settings from a dictionary, a list, JSON text or a JSON file.

        :param overwrite: whether existing keys may be replaced; defaults to
            the ``settings_overwrite`` setting.
        :param show_errors: optional boolean overriding message visibility.
        """
        if isinstance(inputs, (list, tuple)):
            for subinputs in inputs:
                # ``show_errors`` is forwarded like in the JSON/file branches.
                self.settings_add(subinputs, overwrite, show_errors)

        elif isinstance(inputs, dict):
            has_show_errors = isinstance(show_errors, bool)
            if not isinstance(overwrite, bool):
                overwrite = self.__settings_overwrite

            for key, value in inputs.items():
                if not self.is_key(key):
                    continue

                key_error = self._string_error(key, self.re_key)
                if not key_error and not overwrite and key in self.__settings:
                    key_error = 1 << 4
                error = key_error << 1

                if self.validate(
                    error,
                    (
                        "exception",
                        "key_null",
                        "key_not_string",
                        "key_empty",
                        "key_not_key",
                        "key_exists"
                    ),
                    {
                        "key" : key,
                        "value" : value
                    },
                    (show_errors if has_show_errors else self.__show_settings_add_error) and "mem_web_settings_add_error",
                    (show_errors if has_show_errors else True) and self.__show_settings_add_ok and "mem_web_settings_add_ok"
                ):
                    self.__settings[key] = value

        elif isinstance(inputs, str):
            if self.re_json.match(inputs.strip()):
                try:
                    self.settings_add(json_decode(inputs), overwrite, show_errors)
                except Exception as exception:
                    if self.__show_settings_add_exception:
                        self.exception(exception, "mem_web_settings_add_exception")
            else:
                json, error = self.load_file(inputs)
                if not error and json and self.re_json.match(json.strip()):
                    try:
                        self.settings_add(json_decode(json), overwrite, show_errors)
                    except Exception as exception:
                        if self.__show_settings_add_exception:
                            self.exception(exception, "mem_web_settings_add_exception")

    def i18n_add(self, inputs, overwrite=None, show_errors=None):
        """Add sentences from a dictionary, a list, JSON text or a JSON file.

        The dictionary maps language codes to ``{key: sentence}``
        dictionaries; a sentence is a string or a list/tuple of strings.

        :param overwrite: whether existing sentences may be replaced;
            defaults to the ``i18n_overwrite`` setting.
        :param show_errors: optional boolean overriding message visibility.
        """
        if isinstance(inputs, (list, tuple)):
            for subinputs in inputs:
                # ``show_errors`` is forwarded like in the JSON/file branches.
                self.i18n_add(subinputs, overwrite, show_errors)

        elif isinstance(inputs, dict):
            has_show_errors = isinstance(show_errors, bool)
            if not isinstance(overwrite, bool):
                overwrite = self.__i18n_overwrite

            for language, sentences in inputs.items():
                sentences_error = (
                    1 << 0 if sentences is None else
                    1 << 1 if not isinstance(sentences, dict) else
                    1 << 2 if not len(sentences) else
                    0)
                error = (self._string_error(language, self.re_key) | sentences_error << 4) << 1

                if not error:
                    if language not in self.__sentences:
                        self.__sentences[language] = {}

                    for key, sentence in sentences.items():
                        if not self.is_key(key):
                            continue

                        key_error = self._string_error(key, self.re_key)
                        if not key_error and not overwrite and key in self.__sentences[language]:
                            # Fifth key flag ("key_exists"); the sentence
                            # flags start right after it.
                            key_error = 1 << 4
                        sentence_error = (
                            1 << 0 if sentence is None else
                            1 << 1 if not isinstance(sentence, (str, list, tuple)) else
                            0)
                        suberror = (key_error | sentence_error << 5) << 1

                        if self.validate(
                            suberror,
                            (
                                "exception",
                                "key_null",
                                "key_not_string",
                                "key_empty",
                                "key_not_key",
                                "key_exists",
                                "sentence_null",
                                "sentence_not_string"
                            ),
                            {
                                "language" : language,
                                "key" : key,
                                "sentence" : sentence
                            },
                            (show_errors if has_show_errors else self.__show_i18n_add_error) and "mem_web_i18n_add_key_error",
                            (show_errors if has_show_errors else True) and self.__show_i18n_add_ok and "mem_web_i18n_add_key_ok"
                        ):
                            self.__sentences[language][key] = sentence
                        else:
                            # Remember that a sentence was rejected
                            # ("sentences_with_errors").
                            error |= 1 << 8

                self.validate(
                    error,
                    (
                        "exception",
                        "language_null",
                        "language_not_string",
                        "language_empty",
                        "language_not_key",
                        "sentences_null",
                        "sentences_not_dictionary",
                        "sentences_empty",
                        "sentences_with_errors"
                    ),
                    {
                        "language" : language
                    },
                    (show_errors if has_show_errors else self.__show_i18n_add_error) and "mem_web_i18n_add_error",
                    (show_errors if has_show_errors else True) and self.__show_i18n_add_ok and "mem_web_i18n_add_ok"
                )

        elif isinstance(inputs, str):
            if self.re_json.match(inputs.strip()):
                try:
                    self.i18n_add(json_decode(inputs), overwrite, show_errors)
                except Exception as exception:
                    if self.__show_i18n_add_exception:
                        self.exception(exception, "mem_web_i18n_add_json_exception")
            else:
                json, error = self.load_file(inputs)
                if not error and json and self.re_json.match(json.strip()):
                    try:
                        self.i18n_add(json_decode(json), overwrite, show_errors)
                    except Exception as exception:
                        if self.__show_i18n_add_exception:
                            self.exception(exception, "mem_web_i18n_add_file_exception")

    def validate(self, error, messages=None, variables=None, error_message=None, ok_message=None):
        """Report an error bit mask and tell whether it is clean.

        :param error: bit mask; bit ``n`` is described by ``messages[n]``
            (``"error_message_<n>"`` when missing or empty).
        :param messages: sequence of i18n keys, one per bit.
        :param variables: placeholder values (dictionary or list of them).
        :param error_message: i18n key printed when ``error`` is non-zero.
        :param ok_message: i18n key printed when ``error`` is zero.
        :return: ``True`` when ``error`` is zero, ``False`` otherwise.
        """
        if messages is None:
            messages = ()

        if error:
            if error_message:
                own = {
                    "code" : error,
                    "end" : "",
                    **self.join_dictionaries(variables)
                }
                i = 0
                l = len(messages)

                while 1 << i <= error:
                    if 1 << i & error:
                        own["end"] += "\n [" + str(i) + "] " + self.i18n(messages[i] if i < l and messages[i] else "error_message_" + str(i), own)
                    # Without this step the loop never terminated.
                    i += 1

                # ``2`` makes the report point at the caller of ``validate``.
                self._print("error", error_message, own, 2)
            return False

        if ok_message:
            self._print("ok", ok_message, variables)
        return True

    def exception(self, exception, message=None, variables=None, i=1):
        """Print a caught exception with its origin and a condensed trace.

        :param exception: the caught exception instance.
        :param message: i18n key to print; nothing is printed when falsy.
        :param variables: placeholder values (dictionary or list of them).
        :param i: how many frames above this call the report is attributed.
        """
        lines = extract_traceback(exception.__traceback__).format()
        # An exception that was never raised has no traceback lines.
        line_matches = self.re_exception_line.match(lines[-1]) if lines else None
        # ``get_action_data`` and ``trace_format_stack`` must be called from
        # this frame: both index the call stack relative to it.
        data = {
            **self.join_dictionaries(variables),
            **self.get_action_data(1),
            "lines" : "",
            "exception_message" : str(exception)
        }
        if line_matches:
            # Prefer the place where the exception was raised over the place
            # where it was caught.
            data["method"] = line_matches.group(3)
            data["line"] = line_matches.group(2)
            data["file"] = line_matches.group(1)

        data["end"] = self.string_variables(self.__exception_format, data)

        # Keep only the first line ('File "...", line N, in f') of each entry.
        for block in trace_format_stack()[:-2] + lines:
            if block:
                data["lines"] += "\n " + self.re_break_lines.split(block.strip())[0]

        if message:
            self._print("exception", message, data, i + 1)

    @classmethod
    def get_value(cls, keys, inputs, default=None):
        """Return the first value stored under any of ``keys`` in ``inputs``.

        ``inputs`` is a dictionary or a list/tuple of dictionaries; they are
        searched in order and ``default`` is returned when nothing matches.
        """
        keys = cls.get_keys(keys)

        if keys:
            for subset in cls.get_dictionaries(inputs):
                for key in keys:
                    if key in subset:
                        return subset[key]
        return default