Diffstat (limited to 'ishtar_common/serializers_utils.py')
-rw-r--r-- | ishtar_common/serializers_utils.py | 203 |
1 file changed, 203 insertions, 0 deletions
diff --git a/ishtar_common/serializers_utils.py b/ishtar_common/serializers_utils.py
new file mode 100644
index 000000000..4bd655269
--- /dev/null
+++ b/ishtar_common/serializers_utils.py
@@ -0,0 +1,203 @@
+from collections import OrderedDict
+from copy import deepcopy
+import datetime
+import json
+import importlib
+import os
+import tempfile
+from zipfile import ZipFile
+
+from django.contrib.sites.models import Site
+from django.core.serializers import serialize
+
+from version import get_version
+from . import models
+
+
+SERIALIZATION_VERSION = "1.0"
+
+
+def get_model_from_filename(filename):
+    filename = filename.split(".")[0]  # remove extension
+    module_name, model_name = filename.split("__")
+    module = importlib.import_module(module_name + ".models")
+    return getattr(module, model_name)
+
+
+def serialization_info():
+    site = Site.objects.get_current()
+    return {
+        "serialize-version": SERIALIZATION_VERSION,
+        "ishtar-version": get_version(),
+        "domain": site.domain,
+        "name": site.name,
+        "date": datetime.datetime.now().isoformat()
+    }
+
+
+def archive_serialization(result, archive_dir=None, archive=False,
+                          return_empty_types=False, archive_name=None):
+    """
+    Serialize all type models to JSON.
+    Used by import and export scripts.
+
+    :param result: serialization results
+    :param archive_dir: directory inside the archive (default None)
+    :param archive: if True, return a zip file containing all the serialized
+    files (default False)
+    :param return_empty_types: if True, return the list of empty types
+    instead of the serialization (default False)
+    :param archive_name: path to the archive; if not provided a new archive
+    is created
+    :return: string containing the JSON serialization of types unless
+    return_empty_types or archive is set to True
+    """
+    if archive and return_empty_types:
+        raise ValueError("archive and return_empty_types are incompatible")
+    if return_empty_types:
+        return [k for k in result if not result[k]]
+    if not archive:
+        return result
+    archive_created = False
+    if not archive_name:
+        archive_created = True
+        tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
+        archive_name = tmpdir + "ishtar-{}.zip".format(
+            datetime.date.today().strftime("%Y-%m-%d")
+        )
+    if not archive_name.endswith(".zip"):
+        archive_name += ".zip"
+    mode = "w" if archive_created else "a"
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        if archive_dir:
+            os.mkdir(tmpdirname + os.sep + archive_dir)
+
+        with ZipFile(archive_name, mode) as current_zip:
+            if archive_created:
+                base_filename = "info.json"
+                filename = tmpdirname + os.sep + base_filename
+                with open(filename, "w") as json_file:
+                    json_file.write(
+                        json.dumps(serialization_info(), indent=2)
+                    )
+                current_zip.write(filename, arcname=base_filename)
+
+            for dir_name, model_name in result:
+                base_filename = model_name + ".json"
+                filename = tmpdirname + os.sep + base_filename
+                with open(filename, "w") as json_file:
+                    json_file.write(result[(dir_name, model_name)])
+                arcname = base_filename
+                if dir_name:
+                    arcname = dir_name + os.sep + base_filename
+                current_zip.write(filename, arcname=arcname)
+    return archive_name
+
+
+def generic_get_results(model_list, dirname, no_geo=True,
+                        result_queryset=None):
+    result = OrderedDict()
+    for model in model_list:
+        base_model_name = model.__name__
+        model_name = str(model.__module__).split(".")[0] + "__" + \
+            base_model_name
+
+        if result_queryset and base_model_name in result_queryset:
+            base_q = result_queryset[base_model_name]
+        else:
+            base_q = model.objects
+        q = base_q
+        recursion = None
+        if hasattr(model, "parent"):
+            recursion = "parent"
+        elif hasattr(model, "inverse_relation"):
+            recursion = "inverse_relation"
+        elif hasattr(model, "children"):
+            recursion = "children__id"
+        if recursion:
+            q = q.filter(**{recursion + "__isnull": True})
+
+        key = (dirname, model_name)
+        result[key] = serialize(
+            "json", q.distinct().all(),
+            indent=2,
+            use_natural_foreign_keys=True, use_natural_primary_keys=True,
+        )
+
+        if recursion:
+            serialized = [item["id"] for item in q.values("id").all()]
+            recursion_in = recursion
+            if not recursion.endswith("_id"):
+                recursion_in += "_id"
+            recursion_in += "__in"
+            q = base_q.filter(**{recursion_in: serialized}
+                              ).exclude(id__in=serialized)
+            while q.count():
+                v = serialize(
+                    "json", q.all(), indent=2, use_natural_foreign_keys=True,
+                    use_natural_primary_keys=True)
+                new_result = json.loads(result[key])
+                new_result += json.loads(v)
+                result[key] = json.dumps(new_result, indent=2)
+                serialized += [item["id"] for item in q.values("id").all()]
+                q = base_q.filter(**{recursion_in: serialized}
+                                  ).exclude(id__in=serialized)
+            # managed circular
+            q = base_q.exclude(id__in=serialized)
+            if q.count():
+                v = serialize(
+                    "json", q.all(), indent=2, use_natural_foreign_keys=True,
+                    use_natural_primary_keys=True)
+                result_to_add = json.loads(v)
+                result_cleaned = deepcopy(result_to_add)
+                for res in result_cleaned:  # first add with no recursion
+                    res["fields"][recursion] = None
+                new_result = json.loads(result[key])
+                new_result += result_cleaned
+                new_result += result_to_add
+                result[key] = json.dumps(new_result, indent=2)
+
+        excluded_fields = ["history_modifier", "history_creator", "imports"]
+        if hasattr(model, "SERIALIZATION_EXCLUDE"):
+            excluded_fields = list(model.SERIALIZATION_EXCLUDE)
+        if no_geo:
+            excluded_fields += ["center", "limit"] + [
+                field.name for field in models.GeoItem._meta.get_fields()
+            ]
+        if excluded_fields:
+            new_result = json.loads(result[key])
+            for idx in range(len(new_result)):
+                for excluded_field in excluded_fields:
+                    if excluded_field in new_result[idx]["fields"]:
+                        new_result[idx]["fields"].pop(excluded_field)
+            result[key] = json.dumps(new_result, indent=2)
+    return result
+
+
+def generic_archive_files(model_list, archive_name=None, result_queryset=None):
+    if not result_queryset:
+        result_queryset = {}
+    result = []
+    for model in model_list:
+        if model.__name__ in result_queryset.keys():
+            query = result_queryset[model.__name__]
+        else:
+            query = model.objects
+        if hasattr(model, "SERIALIZATION_FILES"):
+            for item in query.all():
+                for attr in model.SERIALIZATION_FILES:
+                    media = getattr(item, attr)
+                    result.append((media.path, media.name))
+
+    archive_created = False
+    if not archive_name:
+        archive_created = True
+        tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
+        archive_name = tmpdir + "media.zip"
+    if not archive_name.endswith(".zip"):
+        archive_name += ".zip"
+    mode = "w" if archive_created else "a"
+    with ZipFile(archive_name, mode) as current_zip:
+        for media_path, name in result:
+            current_zip.write(media_path, arcname=name)
+    return archive_name
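
A minimal usage sketch of how these helpers can be chained in an export script. The model classes below are placeholders, not names taken from this changeset; substitute the concrete Ishtar type models you want to export.

    # Sketch only: OrganizationType/PersonType are assumed example models.
    from ishtar_common import models
    from ishtar_common.serializers_utils import (
        archive_serialization, generic_archive_files, generic_get_results,
    )

    model_list = [models.OrganizationType, models.PersonType]  # placeholders

    # Serialize each model to a JSON string keyed by (directory, model name)...
    results = generic_get_results(model_list, "common")

    # ...then bundle the JSON files into a dated "ishtar-YYYY-MM-DD.zip" and
    # get back the archive path.
    archive_path = archive_serialization(
        results, archive_dir="common", archive=True
    )

    # Files listed in a model's SERIALIZATION_FILES go into a separate media.zip.
    media_archive = generic_archive_files(model_list)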