LibreTranslatePlus/Python/Models/LibreTranslatePlus.Models.JSON.py

360 lines
16 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from re import compile as re_compile
from json import dumps as json_encode
from json import loads as json_decode
from random import randint as random_integer
if "LibreTranslatePlus" not in globals():
class LibreTranslatePlus:pass
# KyMAN I18N JSON Files
class Anonymous(LibreTranslatePlus.Abstracts.Models): # LibreTranslatePlus.Models.JSON
re_encode = re_compile(r'[\']')
re_decode = re_compile(r'[`]')
def _build(self, _input):
self.__paths = self.settings("paths", _input)
self.__cache = []
self.__main_language = self.settings(("main", "main_language"), _input)
self.__print_variables["main_language"] = self.__main_language
self.__main_key_language = self.ltp.i18n.get_key(self.__main_language)[0]
self.__connection_key = self.settings(("connection_key", "connection"), _input)
self.__separators = self.settings(("models_kjson_separators", "separators"), _input)
self.__indent = self.settings(("models_kjson_indent", "indent"), _input)
self.__ascii = self.settings(("models_kjson_ascii", "ascii"), _input)
self.__cache_directory = self.ltp.get_root_directory() + "/" + self.settings(("models_kjson_cache_directory", "cache_directory"), _input)
protected_fragments = self.settings(("models_kjson_protected_fragments", "protected_fragments"), _input)
default_variables_pattern = self.settings(("models_kjson_default_variables_pattern", "default_variables_pattern"), _input)
self.__key_name = self.settings(("models_kjson_key_name", "key_name"), _input)
self.__splitted_sentences_length = self.settings(("models_kjson_splitted_sentences_minimum_length", "splitted_sentences_minimum_length"), _input)
self.__fragment_maximum_length = self.settings(("models_kjson_fragment_maximum_length", "fragment_maximum_length"), _input)
self.__default_language_key = self.settings(("models_kjson_default_language_key", "default_language_key"), _input)
self.__language_names_keys = {}
self.__re_variables = (
protected_fragments + r'|' if protected_fragments and isinstance(protected_fragments, str) else
r'|'.join(protected_fragments) + r'|' if isinstance(protected_fragments, (list, tuple)) and len(protected_fragments) else
r''
) + (
default_variables_pattern if default_variables_pattern and isinstance(default_variables_pattern, str) else
r'|'.join(default_variables_pattern) if isinstance(default_variables_pattern, (list, tuple)) and len(default_variables_pattern) else
r''
)
self.__key_name_length = self.settings(("models_kjson_key_name_length", "key_name_length"), _input)
self.__key_name_characters = self.settings(("models_kjson_key_name_characters", "key_name_characters"), _input)
if self.__re_variables[-1] == "|":
self.__re_variables = self.__re_variables[:-1]
self.__re_variables = re_compile(self.__re_variables)
self.__allow_blocks = self.settings(("models_kjson_allow_blocks", "allow_blocks"), _input)
self.__show_load_cache_exception = self.settings(("models_kjson_show_load_cache_exception_message", "show_exception_message"))
self.__show_load_cache_error = self.settings(("models_kjson_show_load_cache_error_message", "show_error_message"))
self.__show_load_cache_ok = self.settings(("models_kjson_show_load_cache_ok_message", "show_ok_message"))
self.__show_load_cache_info = self.settings(("models_kjson_show_load_cache_info_message", "show_info_message"))
self.__show_key_processing = self.settings(("models_kjson_show_key_processing_message", "show_info_message"))
self.__show_only_key_with_changes = self.settings("models_kjson_show_only_key_with_changes")
self.__show_are_changes = self.settings(("models_kjson_show_are_changes_message", "show_info_message"))
self.__show_saving_language = self.settings(("models_kjson_show_saving_language_message", "show_info_message"))
self.__show_saving_cache = self.settings(("models_kjson_show_saving_cache_message", "show_info_message"))
(self.__paths, error) = self.ltp.models.validate_paths(self.__paths, self.__main_key_language, False)[0:2]
error |= (
(self.ltp.connections.has(self.__connection_key)[1] << 4) |
((
1 << 0 if self.__separators == None else
1 << 1 if not isinstance(self.__separators, (list, tuple)) else
1 << 2 if not len(self.__separators) else
1 << 3 if len(self.__separators) != 2 else
((
((
1 << 0 if self.__separators[0] == None else
1 << 1 if not isinstance(self.__separators[0], str) else
1 << 2 if not self.__separators[0] else
0) << 0) |
((
1 << 0 if self.__separators[1] == None else
1 << 1 if not isinstance(self.__separators[1], str) else
1 << 2 if not self.__separators[1] else
0) << 6) |
0) << 4) |
0) << 9) |
((
1 << 0 if self.__indent == None else
1 << 1 if not isinstance(self.__indent, int) else
1 << 2 if self.__indent < 0 else
0) << 19) |
((
1 << 0 if self.__ascii == None else
1 << 1 if not isinstance(self.__ascii, bool) else
0) << 22) |
((
1 << 0 if self.__main_language == None else
1 << 1 if not isinstance(self.__main_language, str) else
1 << 2 if not self.__main_language else
0) << 24) |
((
1 << 0 if self.__cache_directory == None else
1 << 1 if not isinstance(self.__cache_directory, str) else
1 << 2 if not self.__cache_directory else
# 1 << 3 if not LibreTranslatePlus.path_exists(self.__cache_directory) else
0) << 27) |
0)
self.__error |= error << 9
self.__error_messages += [
"paths_null",
"paths_bad_type",
"paths_empty",
"paths_with_errors",
"connection_key_null",
"connection_key_not_string",
"connection_key_empty",
"connection_key_unknown",
"connection_key_deleted",
"separators_null",
"separators_not_list",
"separators_empty",
"separators_not_2",
"separator_item_null",
"separator_item_not_string",
"separator_item_empty",
"separator_key_null",
"separator_key_not_string",
"separator_key_empty",
"indent_null",
"indent_not_integer",
"indent_lower_0",
"ascii_null",
"ascii_not_boolean",
"main_language_null",
"main_language_not_string",
"main_language_empty",
"cache_directory_null",
"cache_directory_not_string",
"cache_directory_empty",
"cache_directory_not_exists"
]
if not self.__error:
for i, path in enumerate(self.__paths):
path = self.__cache_directory + "/" + self.__key + "." + str(i) + "." + self.__main_key_language + ".json"
if LibreTranslatePlus.path_exists(path):
main_data = self.ltp.load_file(path)[0]
self.__cache.append({
"main_data" : main_data,
self.__main_key_language : json_decode(main_data)[self.__main_key_language]
})
self.__main_paths = []
if not self.__error:
for path in self.__paths:
self.__main_paths.append(LibreTranslatePlus.string_variables(path, {"key" : self.__main_key_language}))
self.__load_caches()
def __load_caches(self):
self.__show_load_cache_info and self._print("info", "ltp_models_kjson_loading_caches", self.__print_variables)
if self.__error:
self.__show_load_cache_error and self._print("warn", "ltp_models_kjson_loading_caches_error", self.__print_variables)
return
languages = self.ltp.i18n.get_keys()
for i, path in enumerate(self.__paths):
full_path = self.__cache_directory + "/" + self.__key + "." + str(i) + "." + self.__main_key_language + ".json"
error = (
1 << 0 if full_path == None else
1 << 1 if not isinstance(full_path, str) else
1 << 2 if not full_path else
1 << 3 if not LibreTranslatePlus.path_exists(full_path) else
0) << 1
if not error:
try:
main_data = self.ltp.load_file(full_path)[0]
self.__cache.append({
"main_data" : main_data,
self.__main_key_language : json_decode(main_data)[self.__main_key_language]
})
for language in languages:
if language != self.__main_key_language:
language_path = LibreTranslatePlus.string_variables(path, {"key" : language})
if LibreTranslatePlus.path_exists(language_path):
self.__show_load_cache_info and self._print("info", "ltp_models_kjson_load_cache_language", {
**self.__print_variables,
"language_path" : language_path,
"key" : language
})
self.__cache[i][language] = self.ltp.load_json(language_path)[0][language]
except Exception as exception:
error |= 1 << 0
self.exception(exception, self.__show_load_cache_exception and "ltp_models_kjson_load_cache_exception", {
**self.__print_variables,
"path" : full_path
})
self.validate(
error,
(
"exception",
"cache_path_null",
"cache_path_not_string",
"cache_path_empty",
"cache_path_not_exists"
),
{
**self.__print_variables,
"path" : full_path
},
self.__show_load_cache_error and "ltp_models_kjson_load_cache_error",
self.__show_load_cache_ok and "ltp_models_kjson_load_cache_ok"
)
self.__show_load_cache_ok and self._print("ok", "ltp_models_kjson_loaded_caches", self.__print_variables)
def __working(self, i):
thread = self.ltp.threads.get(i)
return thread and thread["working"]
def execute(self, thread_i):
connection = self.ltp.connections.get(self.__connection_key)[0]
for i, path in enumerate(self.__main_paths):
if not self.__working(thread_i):
break
if len(self.__cache) <= i:
self.__cache.append({"main_data" : None})
data = self.ltp.load_file(path)[0]
if not data or self.__cache[i]["main_data"] == data:
continue
json = self.ltp.json_decode(data, False)[0]
self.__cache[i]["main_data"] = data
if not json:
continue
self.__show_are_changes and self._print("info", "ltp_models_kjson_are_changes", self.__print_variables)
created = self.__main_key_language in self.__cache[i]
if not created:
self.__cache[i][self.__main_key_language] = {}
if len(list(json[self.__main_key_language].keys())):
sentences = json[self.__main_key_language].items()
total = len(tuple(json[self.__main_key_language].keys()))
languages = [(language["code"], language["key"]) for language in connection.languages()[0] if language["code"] != self.__main_language]
new_state = {}
j = 0
for key, sentence in sentences:
if not self.__working(thread_i):
break
changes = (
key not in self.__cache[i][self.__main_key_language] or
self.__cache[i][self.__main_key_language][key] != sentence
)
self.__show_key_processing and self._print("info", "ltp_models_kjson_key_processing", {
**self.__print_variables,
"i18n_key" : key,
"length" : (
len(sentence) if isinstance(sentence, str) else
len("".join(sentence)) if isinstance(sentence, (list, tuple)) else
None),
"total" : total,
"i" : j,
"percentaje" : round(100.0 * j / float(total), 2)
})
j += 1
for language_code, language_key in languages:
if not self.__working(thread_i):
break
if language_key not in new_state:
new_state[language_key] = {}
if language_key not in self.__language_names_keys:
self.__language_names_keys[language_key] = connection.translate(self.__key_name, self.__main_language, language_code)[0]
new_state[language_key][key] = (
sentence if not isinstance(sentence, (str, list, tuple)) else
self.__translate(connection, sentence, language_code, language_key) if (
changes or
language_key not in self.__cache[i] or
key not in self.__cache[i][language_key]
) else
self.__cache[i][language_key][key]
)
for language_code, language_key in languages:
if not self.__working(thread_i):
break
if self.__show_saving_language and (changes or not self.__show_only_key_with_changes):
self._print("info", "ltp_models_kjson_saving_language", {
**self.__print_variables,
"language_code" : language_code,
"language_key" : language_key
})
encoded = json_encode(
{language_key : new_state[language_key]},
separators = self.__separators,
indent = self.__indent,
ensure_ascii = self.__ascii
)
self.__cache[i][language_key] = new_state[language_key]
if self.__allow_blocks:
encoded = re_compile(r'("' + language_key + '" \: \{|_end" : null(, *)?)').sub(r'\1\n', encoded)
self.ltp.save_file(LibreTranslatePlus.string_variables(self.__paths[i], {
"key" : language_key
}), encoded)
self.__show_key_processing and self._print("ok", "ltp_models_kjson_keys_processed", self.__print_variables)
if self.__working(thread_i):
self.__show_saving_cache and self._print("info", "ltp_models_kjson_saving_cache", self.__print_variables)
self.ltp.save_file(self.__cache_directory + "/" + self.__key + "." + str(i) + "." + self.__main_key_language + ".json", self.__cache[i]["main_data"])
self.__cache[i][self.__main_key_language] = json[self.__main_key_language]
if not hasattr(LibreTranslatePlus, "Models"):
class Subanonymous:pass
LibreTranslatePlus.Models = Subanonymous
del globals()["Subanonymous"]
LibreTranslatePlus.Models.JSON = Anonymous
del globals()["Anonymous"]