from collections import OrderedDict from copy import deepcopy import datetime import json import importlib import os import tempfile from zipfile import ZipFile from django.contrib.sites.models import Site from django.core.serializers import serialize from version import get_version from . import models SERIALIZATION_VERSION = "1.0" def get_model_from_filename(filename): filename = filename.split(".")[0] # remove extension module_name, model_name = filename.split("__") module = importlib.import_module(module_name + ".models") return getattr(module, model_name) def serialization_info(info=None): site = Site.objects.get_current() base_info = { "serialize-version": SERIALIZATION_VERSION, "ishtar-version": get_version(), "domain": site.domain, "name": site.name, "date": datetime.datetime.now().isoformat() } if info: base_info.update(info) return base_info def archive_serialization(result, archive_dir=None, archive=False, return_empty_types=False, archive_name=None, info=None): """ Serialize all types models to JSON Used for import and export scripts :param result: serialization results :param archive_dir: directory inside the archive (default None) :param return_empty_types: if True instead of serialization return empty types (default False) :param archive: if True return a zip file containing all the file serialized (default False) :param archive_name: path to the archive if not provided a new archive is created :return: string containing the json serialization of types unless return_empty_types or archive is set to True """ if archive and return_empty_types: raise ValueError("archive and return_empty_types are incompatible") if return_empty_types: return [k for k in result if not result[k]] if not archive: return result archive_created = False if not archive_name: archive_created = True tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep archive_name = tmpdir + "ishtar-{}.zip".format( datetime.date.today().strftime("%Y-%m-%d") ) if not archive_name.endswith(".zip"): archive_name += ".zip" mode = "w" if archive_created else "a" with tempfile.TemporaryDirectory() as tmpdirname: if archive_dir: os.mkdir(tmpdirname + os.sep + archive_dir) with ZipFile(archive_name, mode) as current_zip: if archive_created: base_filename = "info.json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: json_file.write( json.dumps(serialization_info(info=info), indent=2) ) current_zip.write(filename, arcname=base_filename) for dir_name, model_name in result: base_filename = model_name + ".json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: json_file.write(result[(dir_name, model_name)]) arcname = base_filename if dir_name: arcname = dir_name + os.sep + base_filename current_zip.write(filename, arcname=arcname) return archive_name def generic_get_results(model_list, dirname, no_geo=True, result_queryset=None): result = OrderedDict() for model in model_list: base_model_name = model.__name__ model_name = str(model.__module__).split(".")[0] + "__" + \ base_model_name if result_queryset and base_model_name in result_queryset: base_q = result_queryset[base_model_name] else: base_q = model.objects q = base_q recursion = None if hasattr(model, "parent"): recursion = "parent" elif hasattr(model, "inverse_relation"): recursion = "inverse_relation" elif hasattr(model, "children"): recursion = "children__id" if recursion: q = q.filter(**{recursion + "__isnull": True}) key = (dirname, model_name) result[key] = serialize( "json", q.distinct().all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True, ) if recursion: serialized = [item["id"] for item in q.values("id").all()] recursion_in = recursion if not recursion.endswith("_id"): recursion_in += "_id" recursion_in += "__in" q = base_q.filter(**{recursion_in: serialized} ).exclude(id__in=serialized) while q.count(): v = serialize( "json", q.all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True) new_result = json.loads(result[key]) new_result += json.loads(v) result[key] = json.dumps(new_result, indent=2) serialized += [item["id"] for item in q.values("id").all()] q = base_q.filter(**{recursion_in: serialized} ).exclude(id__in=serialized) # managed circular q = base_q.exclude(id__in=serialized) if q.count(): v = serialize( "json", q.all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True) result_to_add = json.loads(v) result_cleaned = deepcopy(result_to_add) for res in result_cleaned: # first add with no recursion res["fields"][recursion] = None new_result = json.loads(result[key]) new_result += result_cleaned new_result += result_to_add result[key] = json.dumps(new_result, indent=2) excluded_fields = ["history_modifier", "history_creator", "imports", "locked", "lock_user"] if hasattr(model, "SERIALIZATION_EXCLUDE"): excluded_fields = list(model.SERIALIZATION_EXCLUDE) if no_geo: excluded_fields += ["center", "limit"] + [ field.name for field in models.GeoItem._meta.get_fields() ] if excluded_fields: new_result = json.loads(result[key]) for idx in range(len(new_result)): for excluded_field in excluded_fields: if excluded_field in new_result[idx]["fields"]: new_result[idx]["fields"].pop(excluded_field) result[key] = json.dumps(new_result, indent=2) return result def generic_archive_files(model_list, archive_name=None, result_queryset=None): if not result_queryset: result_queryset = {} result = [] for model in model_list: if model.__name__ in result_queryset.keys(): query = result_queryset[model.__name__] else: query = model.objects if hasattr(model, "SERIALIZATION_FILES"): for item in query.all(): for attr in model.SERIALIZATION_FILES: media = getattr(item, attr) try: result.append((media.path, media.name)) except ValueError: pass archive_created = False if not archive_name: archive_created = True tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep archive_name = tmpdir + "media.zip" if not archive_name.endswith(".zip"): archive_name += ".zip" mode = "w" if archive_created else "a" with ZipFile(archive_name, mode) as current_zip: for media_path, name in result: try: current_zip.write(media_path, arcname=name) except OSError: pass return archive_name