from collections import OrderedDict
from copy import deepcopy
import datetime
import importlib
import json
import os
import tempfile
from zipfile import ZipFile

from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site
from django.core.serializers import serialize
from django.db.models import Q
from django.utils import timezone

from rest_framework import serializers
from rest_framework.renderers import JSONRenderer

from ishtar_common.version import get_version

from . import models

SERIALIZATION_VERSION = "1.0"


def get_model_from_filename(filename):
    """
    Return the model class matching a serialized file name such as
    "ishtar_common__ItemKey.json", or None if the name cannot be resolved.
    """
    filename = filename.split(".")[0]  # remove extension
    parts = filename.split("__")
    if len(parts) != 2:
        return
    module_name, model_name = parts
    if module_name == "django":
        if model_name in ("Group", "Permission"):
            module = importlib.import_module("django.contrib.auth.models")
        elif model_name in ("ContentType",):
            module = importlib.import_module("django.contrib.contenttypes.models")
        else:
            return
    else:
        module = importlib.import_module(module_name + ".models")
    return getattr(module, model_name)


def serialization_info(info=None):
    """Base metadata written to info.json: versions, site and export date."""
    site = Site.objects.get_current()
    base_info = {
        "serialize-version": SERIALIZATION_VERSION,
        "ishtar-version": get_version(),
        "domain": site.domain,
        "name": site.name,
        "date": timezone.now().isoformat(),
    }
    if info:
        base_info.update(info)
    return base_info


def archive_serialization(
    result,
    archive_dir=None,
    archive=False,
    return_empty_types=False,
    archive_name=None,
    info=None,
):
    """
    Serialize all type models to JSON.

    Used by import and export scripts.

    :param result: serialization results
    :param archive_dir: directory inside the archive (default: None)
    :param archive: if True, return a zip file containing all the serialized
                    files (default: False)
    :param return_empty_types: if True, return the list of empty types instead
                               of the serialization (default: False)
    :param archive_name: path to the archive; if not provided, a new archive
                         is created
    :param info: extra metadata merged into info.json
    :return: the result mapping, unless return_empty_types or archive is set
             to True
    """
    if archive and return_empty_types:
        raise ValueError("archive and return_empty_types are incompatible")
    if return_empty_types:
        return [k for k in result if not result[k]]
    if not archive:
        return result

    archive_created = False
    if not archive_name:
        archive_created = True
        tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
        archive_name = tmpdir + "ishtar-{}.zip".format(
            datetime.date.today().strftime("%Y-%m-%d")
        )
    if not archive_name.endswith(".zip"):
        archive_name += ".zip"
    mode = "w" if archive_created else "a"
    with tempfile.TemporaryDirectory() as tmpdirname:
        if archive_dir:
            os.mkdir(tmpdirname + os.sep + archive_dir)
        with ZipFile(archive_name, mode) as current_zip:
            if archive_created:
                base_filename = "info.json"
                filename = tmpdirname + os.sep + base_filename
                with open(filename, "w") as json_file:
                    json_file.write(
                        json.dumps(serialization_info(info=info), indent=2)
                    )
                current_zip.write(filename, arcname=base_filename)
            for dir_name, model_name in result:
                base_filename = model_name + ".json"
                filename = tmpdirname + os.sep + base_filename
                with open(filename, "w") as json_file:
                    json_file.write(result[(dir_name, model_name)])
                arcname = base_filename
                if dir_name:
                    arcname = dir_name + os.sep + base_filename
                current_zip.write(filename, arcname=arcname)
    return archive_name
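
# Usage sketch (illustrative only, not called by the module): archiving a
# pre-computed result mapping. The key and the empty JSON payload below are
# hypothetical stand-ins for real generic_get_results() output.
#
#     result = OrderedDict([(("types", "ishtar_common__ItemKey"), "[]")])
#     archive_path = archive_serialization(
#         result, archive_dir="types", archive=True
#     )
#     # -> path to a fresh ishtar-<date>.zip containing info.json and
#     #    types/ishtar_common__ItemKey.json
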
class ItemKeySerializer(serializers.ModelSerializer):
    """
    Serialize ItemKey with natural references (app label, model name, content
    slug) so that keys can be recreated on another database.
    """

    app = serializers.SerializerMethodField()
    model = serializers.SerializerMethodField()
    content_slug = serializers.SerializerMethodField()
    importer_type = serializers.SerializerMethodField()

    class Meta:
        model = models.ItemKey
        fields = ["key", "app", "model", "content_slug", "importer_type"]

    def get_content_slug(self, obj):
        return obj.content_slug

    def get_app(self, obj):
        return obj.content_type.app_label

    def get_model(self, obj):
        return obj.content_type.model

    def get_importer_type(self, obj):
        return obj.importer_type.slug if obj.importer_type else ""

    def save(self, data):
        try:
            ct = ContentType.objects.get(
                app_label=data["app"],
                model=data["model"],
            )
        except ContentType.DoesNotExist:
            return
        model = ct.model_class()
        # resolve the natural key attribute identifying the target object
        if hasattr(model, "txt_idx"):
            slug = "txt_idx"
        elif hasattr(model, "slug"):
            slug = "slug"
        else:
            slug = "pk"
        try:
            value = model.objects.get(**{slug: data["content_slug"]})
        except model.DoesNotExist:
            return
        importer_type = None
        if data["importer_type"]:
            try:
                importer_type = models.ImporterType.objects.get(
                    slug=data["importer_type"]
                )
            except models.ImporterType.DoesNotExist:
                return
        obj, created = models.ItemKey.objects.get_or_create(
            key=data["key"],
            content_type=ct,
            importer_type=importer_type,
            ishtar_import=None,
            user=None,
            group=None,
            defaults={"object_id": value.pk},
        )
        if not created:
            obj.object_id = value.pk
            obj.save()
        return obj


# For each model, lookups used to restrict its queryset from the querysets
# provided for related models (see generic_get_results).
GENERIC_QUERYSET_FILTER = {
    "JsonDataSection": {"JsonDataField": "json_data_field__pk__in"},
    "Regexp": {
        "ImporterType": "columns__importer_type__pk__in",
        "ImporterGroup": "columns__importer_type__groups__group__pk__in",
    },
    "ImporterModel": {
        "ImporterType": [
            "importer_type_associated__pk__in",
            "importer_type_created__pk__in",
        ],
        "ImporterGroup": [
            "importer_type_associated__groups__group__pk__in",
            "importer_type_created__groups__group__pk__in",
        ],
    },
    "ValueFormater": {
        "ImporterType": "columns__importer_type__pk__in",
        "ImporterGroup": "columns__importer_type__groups__group__pk__in",
    },
    "ImporterColumn": {
        "ImporterType": "importer_type__pk__in",
        "ImporterGroup": "importer_type__groups__group__pk__in",
    },
    "ImporterDefault": {
        "ImporterType": "importer_type__pk__in",
        "ImporterGroup": "importer_type__groups__group__pk__in",
    },
    "ImportTarget": {
        "ImporterType": "column__importer_type__pk__in",
        "ImporterGroup": "column__importer_type__groups__group__pk__in",
    },
    "FormaterType": {
        "ImporterType": "targets__column__importer_type__pk__in",
        "ImporterGroup": "targets__column__importer_type__groups__group__pk__in",
    },
    "ImporterDefaultValues": {
        "ImporterType": "default_target__importer_type__pk__in",
        "ImporterGroup": "default_target__importer_type__groups__group__pk__in",
    },
    "ImporterDuplicateField": {
        "ImporterType": "column__importer_type__pk__in",
        "ImporterGroup": "column__importer_type__groups__group__pk__in",
    },
    "ImporterGroup": {
        "ImporterType": "importer_types__importer_type__pk__in",
        "ImporterGroup": "importer_types__importer_type__groups__group__pk__in",
    },
    "ImporterGroupImporter": {
        "ImporterType": "importer_type__pk__in",
        "ImporterGroup": "group__pk__in",
    },
    "ImporterType": {
        "ImporterGroup": "groups__group__pk__in",
    },
}

CUSTOM_SERIALIZERS = {"ishtar_common.itemkey": ItemKeySerializer}
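
# Usage sketch (illustrative only): round-tripping an ItemKey. The serializer
# emits natural references (app label, model name, content slug) and save()
# resolves them back to a concrete object on the importing database;
# "item_key" stands for any existing models.ItemKey instance.
#
#     data = ItemKeySerializer(item_key).data
#     # {"key": ..., "app": "ishtar_common", "model": ...,
#     #  "content_slug": ..., "importer_type": ...}
#     ItemKeySerializer().save(data)  # get_or_create on the target database
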
def generic_get_results(
    model_list, dirname, no_geo=True, result_queryset=None,
    serialization_include=None
):
    """
    Serialize each model of model_list to JSON.

    Return an OrderedDict indexed by (dirname, model_name) tuples, as
    expected by archive_serialization().
    """
    result = OrderedDict()
    for model in model_list:
        base_model_name = model.__name__
        model_name = str(model.__module__).split(".")[0] + "__" + base_model_name

        # restrict the queryset if an explicit queryset is provided for this
        # model or for a related model listed in GENERIC_QUERYSET_FILTER
        base_q = model.objects
        if result_queryset:
            if base_model_name in result_queryset:
                base_q = result_queryset[base_model_name]
            elif base_model_name in GENERIC_QUERYSET_FILTER:
                alt_filter = GENERIC_QUERYSET_FILTER[base_model_name]
                for k in alt_filter:
                    if k not in result_queryset:
                        continue
                    terms = alt_filter[k]
                    if not isinstance(terms, (list, tuple)):
                        terms = [terms]
                    ids = [r["pk"] for r in result_queryset[k].values("pk").all()]
                    q = None
                    for term in terms:
                        if not q:
                            q = Q(**{term: ids})
                        else:
                            q |= Q(**{term: ids})
                    base_q = base_q.filter(q)
                    break
        q = base_q

        # self-referencing models are serialized in dependency order: first
        # the roots, then each generation of children
        recursion = None
        if hasattr(model, "parent"):
            recursion = "parent"
        elif hasattr(model, "inverse_relation"):
            recursion = "inverse_relation"
        elif hasattr(model, "children") and model.children.field.model == model:
            recursion = "children__id"
        elif hasattr(model, "child"):
            recursion = "child_id"
        if recursion:
            q = q.filter(**{recursion + "__isnull": True})

        key = (dirname, model_name)
        model_label = f"{model._meta.app_label}.{model._meta.model_name}"
        is_custom = model_label in CUSTOM_SERIALIZERS
        if is_custom:
            current_serializer = CUSTOM_SERIALIZERS[model_label]
            result[key] = JSONRenderer().render(
                [
                    {"model": model_label, "fields": current_serializer(item).data}
                    for item in q.distinct().all()
                ]
            )
        else:
            result[key] = serialize(
                "json",
                q.distinct().all(),
                indent=2,
                use_natural_foreign_keys=True,
                use_natural_primary_keys=True,
            )
        if recursion and is_custom:
            # TODO
            raise NotImplementedError(
                "Recursion not managed for this custom serializer"
            )
        if recursion:
            serialized = [item["id"] for item in q.values("id").all()]
            recursion_in = recursion
            if not recursion.endswith("_id"):
                recursion_in += "_id"
            recursion_in += "__in"
            q = base_q.filter(**{recursion_in: serialized}).exclude(
                id__in=serialized
            )
            while q.count():
                v = serialize(
                    "json",
                    q.all(),
                    indent=2,
                    use_natural_foreign_keys=True,
                    use_natural_primary_keys=True,
                )
                new_result = json.loads(result[key])
                new_result += json.loads(v)
                result[key] = json.dumps(new_result, indent=2)
                serialized += [item["id"] for item in q.values("id").all()]
                q = base_q.filter(**{recursion_in: serialized}).exclude(
                    id__in=serialized
                )
            # manage circular references: serialize them twice, first with
            # the recursive field set to None, then with the real value
            q = base_q.exclude(id__in=serialized)
            if q.count():
                v = serialize(
                    "json",
                    q.all(),
                    indent=2,
                    use_natural_foreign_keys=True,
                    use_natural_primary_keys=True,
                )
                result_to_add = json.loads(v)
                result_cleaned = deepcopy(result_to_add)
                for res in result_cleaned:  # first add with no recursion
                    res["fields"][recursion] = None
                new_result = json.loads(result[key])
                new_result += result_cleaned
                new_result += result_to_add
                result[key] = json.dumps(new_result, indent=2)

        # strip fields that must not travel between databases
        excluded_fields = [
            "history_modifier",
            "history_creator",
            "imports",
            "locked",
            "lock_user",
        ]
        if hasattr(model, "SERIALIZATION_EXCLUDE"):
            excluded_fields += list(model.SERIALIZATION_EXCLUDE)
        if no_geo:
            excluded_fields += ["center", "limit"] + [
                field.name for field in models.GeoItem._meta.get_fields()
            ]
        if serialization_include and model.__name__ in serialization_include:
            for k in serialization_include[model.__name__]:
                if k in excluded_fields:
                    excluded_fields.remove(k)
        if excluded_fields:
            new_result = json.loads(result[key])
            for item in new_result:
                for excluded_field in excluded_fields:
                    if excluded_field in item["fields"]:
                        item["fields"].pop(excluded_field)
            result[key] = json.dumps(new_result, indent=2)
    return result
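
# Usage sketch (illustrative only): exporting the importer configuration
# restricted to a subset of importer types. The slug filter is a hypothetical
# example; related models are narrowed through GENERIC_QUERYSET_FILTER.
#
#     importer_types = models.ImporterType.objects.filter(
#         slug__startswith="example"
#     )
#     result = generic_get_results(
#         [models.ImporterType, models.ImporterColumn],
#         "importers",
#         result_queryset={"ImporterType": importer_types},
#     )
#     archive_serialization(result, archive_dir="importers", archive=True)
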
def generic_archive_files(model_list, archive_name=None, result_queryset=None):
    """
    Zip the files referenced by the models of model_list which declare a
    SERIALIZATION_FILES attribute. Return the archive path.
    """
    if not result_queryset:
        result_queryset = {}
    result = []
    for model in model_list:
        if model.__name__ in result_queryset:
            query = result_queryset[model.__name__]
        else:
            query = model.objects
        if not hasattr(model, "SERIALIZATION_FILES"):
            continue
        for item in query.all():
            for attr in model.SERIALIZATION_FILES:
                media = getattr(item, attr)
                try:
                    result.append((media.path, media.name))
                except ValueError:
                    # no file associated with this field
                    pass

    archive_created = False
    if not archive_name:
        archive_created = True
        tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
        archive_name = tmpdir + "media.zip"
    if not archive_name.endswith(".zip"):
        archive_name += ".zip"
    mode = "w" if archive_created else "a"
    with ZipFile(archive_name, mode) as current_zip:
        for media_path, name in result:
            try:
                current_zip.write(media_path, arcname=name)
            except OSError:
                # referenced file is missing on disk: skip it
                pass
    return archive_name
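
# Usage sketch (illustrative only, assuming ImporterType declares a
# SERIALIZATION_FILES attribute): bundle referenced media files in a zip of
# their own, or append them to an archive made by archive_serialization().
#
#     media_zip = generic_archive_files([models.ImporterType])
#     generic_archive_files([models.ImporterType], archive_name=archive_path)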