from collections import OrderedDict from copy import deepcopy import datetime import json import importlib import os import tempfile from zipfile import ZipFile from django.contrib.sites.models import Site from django.core.serializers import serialize from django.db.models import Q from ishtar_common.version import get_version from . import models SERIALIZATION_VERSION = "1.0" def get_model_from_filename(filename): filename = filename.split(".")[0] # remove extension splitted = filename.split("__") if len(splitted) != 2: return module_name, model_name = splitted if module_name == "django": if model_name in ("Group", "Permission"): module = importlib.import_module("django.contrib.auth.models") elif model_name in ("ContentType",): module = importlib.import_module("django.contrib.contenttypes.models") else: return else: module = importlib.import_module(module_name + ".models") return getattr(module, model_name) def serialization_info(info=None): site = Site.objects.get_current() base_info = { "serialize-version": SERIALIZATION_VERSION, "ishtar-version": get_version(), "domain": site.domain, "name": site.name, "date": datetime.datetime.now().isoformat(), } if info: base_info.update(info) return base_info def archive_serialization( result, archive_dir=None, archive=False, return_empty_types=False, archive_name=None, info=None, ): """ Serialize all types models to JSON Used for import and export scripts :param result: serialization results :param archive_dir: directory inside the archive (default None) :param return_empty_types: if True instead of serialization return empty types (default False) :param archive: if True return a zip file containing all the file serialized (default False) :param archive_name: path to the archive if not provided a new archive is created :return: string containing the json serialization of types unless return_empty_types or archive is set to True """ if archive and return_empty_types: raise ValueError("archive and return_empty_types are incompatible") if return_empty_types: return [k for k in result if not result[k]] if not archive: return result archive_created = False if not archive_name: archive_created = True tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep archive_name = tmpdir + "ishtar-{}.zip".format( datetime.date.today().strftime("%Y-%m-%d") ) if not archive_name.endswith(".zip"): archive_name += ".zip" mode = "w" if archive_created else "a" with tempfile.TemporaryDirectory() as tmpdirname: if archive_dir: os.mkdir(tmpdirname + os.sep + archive_dir) with ZipFile(archive_name, mode) as current_zip: if archive_created: base_filename = "info.json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: json_file.write(json.dumps(serialization_info(info=info), indent=2)) current_zip.write(filename, arcname=base_filename) for dir_name, model_name in result: base_filename = model_name + ".json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: json_file.write(result[(dir_name, model_name)]) arcname = base_filename if dir_name: arcname = dir_name + os.sep + base_filename current_zip.write(filename, arcname=arcname) return archive_name GENERIC_QUERYSET_FILTER = { "Regexp": {"ImporterType": "columns__importer_type__pk__in"}, "ImporterModel": { "ImporterType": [ "importer_type_associated__pk__in", "importer_type_created__pk__in", ] }, "ValueFormater": {"ImporterType": "columns__importer_type__pk__in"}, "ImporterColumn": {"ImporterType": "importer_type__pk__in"}, "ImporterDefault": {"ImporterType": "importer_type__pk__in"}, "ImportTarget": {"ImporterType": "column__importer_type__pk__in"}, "FormaterType": {"ImporterType": "targets__column__importer_type__pk__in"}, "ImporterDefaultValues": {"ImporterType": "default_target__importer_type__pk__in"}, "ImporterDuplicateField": {"ImporterType": "column__importer_type__pk__in"}, } def generic_get_results( model_list, dirname, no_geo=True, result_queryset=None, serialization_include=None ): result = OrderedDict() for model in model_list: base_model_name = model.__name__ model_name = str(model.__module__).split(".")[0] + "__" + base_model_name base_q = model.objects if result_queryset: if result_queryset and base_model_name in result_queryset: base_q = result_queryset[base_model_name] elif base_model_name in GENERIC_QUERYSET_FILTER: alt_filter = GENERIC_QUERYSET_FILTER[base_model_name] for k in alt_filter: if k in result_queryset: terms = alt_filter[k] if not isinstance(terms, (list, tuple)): terms = [terms] ids = [r["pk"] for r in result_queryset[k].values("pk").all()] q = None for term in terms: if not q: q = Q(**{term: ids}) else: q |= Q(**{term: ids}) base_q = base_q.filter(q) break q = base_q recursion = None if hasattr(model, "parent"): recursion = "parent" elif hasattr(model, "inverse_relation"): recursion = "inverse_relation" elif hasattr(model, "children") and model.children.field.model == model: recursion = "children__id" elif hasattr(model, "child"): recursion = "child_id" if recursion: q = q.filter(**{recursion + "__isnull": True}) key = (dirname, model_name) result[key] = serialize( "json", q.distinct().all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True, ) if recursion: serialized = [item["id"] for item in q.values("id").all()] recursion_in = recursion if not recursion.endswith("_id"): recursion_in += "_id" recursion_in += "__in" q = base_q.filter(**{recursion_in: serialized}).exclude(id__in=serialized) while q.count(): v = serialize( "json", q.all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True, ) new_result = json.loads(result[key]) new_result += json.loads(v) result[key] = json.dumps(new_result, indent=2) serialized += [item["id"] for item in q.values("id").all()] q = base_q.filter(**{recursion_in: serialized}).exclude( id__in=serialized ) # managed circular q = base_q.exclude(id__in=serialized) if q.count(): v = serialize( "json", q.all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True, ) result_to_add = json.loads(v) result_cleaned = deepcopy(result_to_add) for res in result_cleaned: # first add with no recursion res["fields"][recursion] = None new_result = json.loads(result[key]) new_result += result_cleaned new_result += result_to_add result[key] = json.dumps(new_result, indent=2) excluded_fields = [ "history_modifier", "history_creator", "imports", "locked", "lock_user", ] if hasattr(model, "SERIALIZATION_EXCLUDE"): excluded_fields += list(model.SERIALIZATION_EXCLUDE) if no_geo: excluded_fields += ["center", "limit"] + [ field.name for field in models.GeoItem._meta.get_fields() ] if serialization_include and model.__name__ in serialization_include: for k in serialization_include[model.__name__]: if k in excluded_fields: excluded_fields.pop(excluded_fields.index(k)) if excluded_fields: new_result = json.loads(result[key]) for idx in range(len(new_result)): for excluded_field in excluded_fields: if excluded_field in new_result[idx]["fields"]: new_result[idx]["fields"].pop(excluded_field) result[key] = json.dumps(new_result, indent=2) return result def generic_archive_files(model_list, archive_name=None, result_queryset=None): if not result_queryset: result_queryset = {} result = [] for model in model_list: if model.__name__ in result_queryset.keys(): query = result_queryset[model.__name__] else: query = model.objects if hasattr(model, "SERIALIZATION_FILES"): for item in query.all(): for attr in model.SERIALIZATION_FILES: media = getattr(item, attr) try: result.append((media.path, media.name)) except ValueError: pass archive_created = False if not archive_name: archive_created = True tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep archive_name = tmpdir + "media.zip" if not archive_name.endswith(".zip"): archive_name += ".zip" mode = "w" if archive_created else "a" with ZipFile(archive_name, mode) as current_zip: for media_path, name in result: try: current_zip.write(media_path, arcname=name) except OSError: pass return archive_name