from collections import OrderedDict
from copy import deepcopy
import datetime
import json
import importlib
import os
import tempfile
import zipfile
from rest_framework import serializers
from zipfile import ZipFile

from django.apps import apps
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.serializers import deserialize, serialize
from django.db.models import Q

from version import get_version
from . import models
from archaeological_warehouse import models as warehouse_models


class PublicSerializer(serializers.BaseSerializer):
    def to_representation(self, obj):
        return obj.public_representation()


SERIALIZATION_VERSION = "1.0"


def get_model_from_filename(filename):
    filename = filename.split(".")[0]  # remove extension
    module_name, model_name = filename.split("__")
    module = importlib.import_module(module_name + ".models")
    return getattr(module, model_name)


def serialization_info():
    site = Site.objects.get_current()
    return {
        "serialize-version": SERIALIZATION_VERSION,
        "ishtar-version": get_version(),
        "domain": site.domain,
        "name": site.name,
        "date": datetime.datetime.now().isoformat(),
    }


def archive_serialization(result, archive_dir=None, archive=False,
                          return_empty_types=False, archive_name=None):
    """
    Serialize all types models to JSON.
    Used for import and export scripts.

    :param result: serialization results
    :param archive_dir: directory inside the archive (default None)
    :param return_empty_types: if True, return the list of empty types instead
    of the serialization (default False)
    :param archive: if True, return a zip file containing all the serialized
    files (default False)
    :param archive_name: path to the archive; if not provided a new archive is
    created
    :return: string containing the json serialization of types unless
    return_empty_types or archive is set to True
    """
    if archive and return_empty_types:
        raise ValueError("archive and return_empty_types are incompatible")
    if return_empty_types:
        return [k for k in result if not result[k]]
    if not archive:
        return result
    archive_created = False
    if not archive_name:
        archive_created = True
        tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
        archive_name = tmpdir + "ishtar-{}.zip".format(
            datetime.date.today().strftime("%Y-%m-%d")
        )
    if not archive_name.endswith(".zip"):
        archive_name += ".zip"
    mode = "w" if archive_created else "a"
    with tempfile.TemporaryDirectory() as tmpdirname:
        if archive_dir:
            os.mkdir(tmpdirname + os.sep + archive_dir)
        with ZipFile(archive_name, mode) as current_zip:
            if archive_created:
                base_filename = "info.json"
                filename = tmpdirname + os.sep + base_filename
                with open(filename, "w") as json_file:
                    json_file.write(
                        json.dumps(serialization_info(), indent=2)
                    )
                current_zip.write(filename, arcname=base_filename)
            for dir_name, model_name in result:
                base_filename = model_name + ".json"
                filename = tmpdirname + os.sep + base_filename
                with open(filename, "w") as json_file:
                    json_file.write(result[(dir_name, model_name)])
                arcname = base_filename
                if dir_name:
                    arcname = dir_name + os.sep + base_filename
                current_zip.write(filename, arcname=arcname)
    return archive_name
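
# Usage sketch for archive_serialization (illustrative only: the key and JSON
# payload below are hypothetical, but the expected shape of "result" is
# {(dir_name, "module__ModelName"): <json string>}):
#
#     result = {("types", "my_app__MyType"): "[...]"}
#     path = archive_serialization(result, archive_dir="types", archive=True)
#     # -> new zip containing info.json and types/my_app__MyType.json
#
# With archive=False the result dict is returned unchanged; with
# return_empty_types=True only the keys whose payload is empty are returned.
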
def generic_get_results(model_list, dirname, no_geo=True, result_queryset=None):
    result = OrderedDict()
    for model in model_list:
        base_model_name = model.__name__
        model_name = str(model.__module__).split(".")[0] + "__" + \
            base_model_name
        if result_queryset and base_model_name in result_queryset:
            base_q = result_queryset[base_model_name]
        else:
            base_q = model.objects
        q = base_q
        # self-referencing models are serialized in dependency order: first
        # the items with no reference, then items referencing already
        # serialized ones
        recursion = None
        if hasattr(model, "parent"):
            recursion = "parent"
        elif hasattr(model, "inverse_relation"):
            recursion = "inverse_relation"
        elif hasattr(model, "children"):
            recursion = "children__id"
        if recursion:
            q = q.filter(**{recursion + "__isnull": True})
        key = (dirname, model_name)
        result[key] = serialize(
            "json", q.distinct().all(), indent=2,
            use_natural_foreign_keys=True, use_natural_primary_keys=True,
        )
        if recursion:
            serialized = [item["id"] for item in q.values("id").all()]
            recursion_in = recursion
            if not recursion.endswith("_id"):
                recursion_in += "_id"
            recursion_in += "__in"
            q = base_q.filter(**{recursion_in: serialized}
                              ).exclude(id__in=serialized)
            while q.count():
                v = serialize(
                    "json", q.all(), indent=2,
                    use_natural_foreign_keys=True,
                    use_natural_primary_keys=True)
                new_result = json.loads(result[key])
                new_result += json.loads(v)
                result[key] = json.dumps(new_result, indent=2)
                serialized += [item["id"] for item in q.values("id").all()]
                q = base_q.filter(**{recursion_in: serialized}
                                  ).exclude(id__in=serialized)
            # manage circular references
            q = base_q.exclude(id__in=serialized)
            if q.count():
                v = serialize(
                    "json", q.all(), indent=2,
                    use_natural_foreign_keys=True,
                    use_natural_primary_keys=True)
                result_to_add = json.loads(v)
                result_cleaned = deepcopy(result_to_add)
                for res in result_cleaned:
                    # first pass: add with the recursive field cleared
                    res["fields"][recursion] = None
                new_result = json.loads(result[key])
                new_result += result_cleaned
                new_result += result_to_add
                result[key] = json.dumps(new_result, indent=2)
        excluded_fields = ["history_modifier", "history_creator", "imports"]
        if hasattr(model, "SERIALIZATION_EXCLUDE"):
            excluded_fields = list(model.SERIALIZATION_EXCLUDE)
        if no_geo:
            excluded_fields += ["center", "limit"] + [
                field.name for field in models.GeoItem._meta.get_fields()
            ]
        if excluded_fields:
            new_result = json.loads(result[key])
            for idx in range(len(new_result)):
                for excluded_field in excluded_fields:
                    if excluded_field in new_result[idx]["fields"]:
                        new_result[idx]["fields"].pop(excluded_field)
            result[key] = json.dumps(new_result, indent=2)
    return result
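
# Sketch of the recursion pass above for a hypothetical self-referencing type
# (a "MyType" model with a nullable "parent" field); names are illustrative:
#
#     pass 0: MyType(parent=None)           serialized first
#     pass 1: MyType(parent=<pass-0 item>)  serialized next, and so on
#     leftover circular chains are appended twice: a copy with "parent" forced
#     to None, then the original record, so that deserialization with natural
#     foreign keys does not hit a reference to a not-yet-loaded object.
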
def generic_archive_files(model_list, archive_name=None):
    result = []
    for model in model_list:
        if hasattr(model, "SERIALIZATION_FILES"):
            for item in model.objects.all():
                for attr in model.SERIALIZATION_FILES:
                    media = getattr(item, attr)
                    result.append((media.path, media.name))
    archive_created = False
    if not archive_name:
        archive_created = True
        tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
        archive_name = tmpdir + "media.zip"
    if not archive_name.endswith(".zip"):
        archive_name += ".zip"
    mode = "w" if archive_created else "a"
    with ZipFile(archive_name, mode) as current_zip:
        for media_path, name in result:
            current_zip.write(media_path, arcname=name)
    return archive_name


TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld"]


def type_serialization(archive=False, return_empty_types=False,
                       archive_name=None):
    TYPE_MODEL_LIST = [
        model for model in apps.get_models()
        if isinstance(model(), models.GeneralType)
        and model.__name__ not in TYPE_MODEL_EXCLUDE
    ]
    result = generic_get_results(TYPE_MODEL_LIST, "types")
    return archive_serialization(
        result, archive_dir="types", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)


CONF_MODEL_LIST = [
    models.IshtarSiteProfile, models.GlobalVar, models.CustomForm,
    models.ExcludedField, models.JsonDataSection, models.JsonDataField,
    models.CustomFormJsonField, models.ImporterModel, models.DocumentTemplate
]


def conf_serialization(archive=False, return_empty_types=False,
                       archive_name=None):
    media_archive = None
    if archive:
        media_archive = generic_archive_files(CONF_MODEL_LIST)
    result = generic_get_results(CONF_MODEL_LIST, "common_configuration")
    full_archive = archive_serialization(
        result, archive_dir="common_configuration", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    with ZipFile(full_archive, 'a') as current_zip:
        current_zip.write(media_archive, arcname="media.zip")
    return full_archive


IMPORT_MODEL_LIST = [
    models.Regexp, models.ImporterModel, models.ImporterType,
    models.ValueFormater, models.ImporterColumn, models.FormaterType,
    models.ImporterDefault, models.ImporterDefaultValues, models.ImportTarget,
    models.ImporterDuplicateField
]


def importer_serialization(archive=False, return_empty_types=False,
                           archive_name=None):
    result = generic_get_results(IMPORT_MODEL_LIST, "common_imports")
    full_archive = archive_serialization(
        result, archive_dir="common_imports", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


GEO_MODEL_LIST = [
    models.State, models.Department, models.Town, models.Area
]


def geo_serialization(archive=False, return_empty_types=False,
                      archive_name=None, no_geo=True):
    result = generic_get_results(GEO_MODEL_LIST, "common_geo", no_geo=no_geo)
    full_archive = archive_serialization(
        result, archive_dir="common_geo", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


DIRECTORY_MODEL_LIST = [
    models.Organization, models.Person, models.Author
]


def directory_serialization(archive=False, return_empty_types=False,
                            archive_name=None):
    result = generic_get_results(DIRECTORY_MODEL_LIST, "common_directory")
    full_archive = archive_serialization(
        result, archive_dir="common_directory", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


def restore_serialized(archive_name, delete_existing=False):
    with zipfile.ZipFile(archive_name, "r") as zip_file:
        # check the dump version
        info = json.loads(zip_file.read("info.json").decode("utf-8"))
        if info["serialize-version"] != SERIALIZATION_VERSION:
            raise ValueError(
                "Dump version {} is not managed by this Ishtar "
                "installation".format(info["serialize-version"])
            )
        DIRS = (
            ("types", [None]),
            ("common_configuration", CONF_MODEL_LIST),
            ("common_imports", IMPORT_MODEL_LIST),
            ("common_geo", GEO_MODEL_LIST),
            ("common_directory", DIRECTORY_MODEL_LIST),
        )
        namelist = zip_file.namelist()
        for current_dir, model_list in DIRS:
            for current_model in model_list:
                for json_filename in namelist:
                    path = json_filename.split(os.sep)
                    if len(path) != 2 or path[0] != current_dir:
                        continue
                    model = get_model_from_filename(path[-1])
                    if current_model and current_model != model:
                        continue
                    if delete_existing:
                        model.objects.all().delete()
                    data = zip_file.read(json_filename).decode("utf-8")
                    for obj in deserialize("json", data):
                        obj.save()
        # restore media
        if "media.zip" in namelist:
            with tempfile.TemporaryDirectory() as tmp_dir_name:
                zip_file.extract("media.zip", tmp_dir_name)
                with zipfile.ZipFile(
                        tmp_dir_name + os.sep + "media.zip", 'r'
                ) as media_zip:
                    media_zip.extractall(settings.MEDIA_ROOT)
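
# End-to-end usage sketch (illustrative; assumes a configured Ishtar/Django
# environment, e.g. code run from "./manage.py shell" on both instances):
#
#     archive = conf_serialization(archive=True)
#     # -> zip with info.json, common_configuration/*.json and media.zip
#     # ...transfer the archive to the other instance...
#     restore_serialized(archive, delete_existing=False)
#     # -> re-imports the JSON dumps and unpacks media.zip into
#     #    settings.MEDIA_ROOT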