import json
import os
import tempfile
import zipfile

from rest_framework import serializers
from zipfile import ZipFile

from django.apps import apps
from django.conf import settings
from django.core.serializers import deserialize
from django.db.models import Q

from . import models
from ishtar_common.serializers_utils import generic_get_results, \
    archive_serialization, generic_archive_files, SERIALIZATION_VERSION, \
    get_model_from_filename

from archaeological_operations.serializers import operation_serialization
from archaeological_context_records.serializers import cr_serialization
from archaeological_finds.serializers import find_serialization
from archaeological_warehouse.serializers import warehouse_serialization


class PublicSerializer(serializers.BaseSerializer):
    def to_representation(self, obj):
        return obj.public_representation()


TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld"]


def type_serialization(archive=False, return_empty_types=False,
                       archive_name=None):
    # serialize every GeneralType subclass except explicitly excluded ones
    TYPE_MODEL_LIST = [
        model for model in apps.get_models()
        if isinstance(model(), models.GeneralType)
        and model.__name__ not in TYPE_MODEL_EXCLUDE
    ]
    result = generic_get_results(TYPE_MODEL_LIST, "types")
    return archive_serialization(result, archive_dir="types", archive=archive,
                                 return_empty_types=return_empty_types,
                                 archive_name=archive_name)


CONF_MODEL_LIST = [
    models.IshtarSiteProfile, models.GlobalVar, models.CustomForm,
    models.ExcludedField, models.JsonDataSection, models.JsonDataField,
    models.CustomFormJsonField, models.ImporterModel, models.DocumentTemplate
]


def conf_serialization(archive=False, return_empty_types=False,
                       archive_name=None):
    media_archive = None
    if archive:
        # collect the files referenced by configuration models
        media_archive = generic_archive_files(CONF_MODEL_LIST)
    result = generic_get_results(CONF_MODEL_LIST, "common_configuration")
    full_archive = archive_serialization(
        result, archive_dir="common_configuration", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    # append the media files to the main archive
    with ZipFile(full_archive, 'a') as current_zip:
        current_zip.write(media_archive, arcname="media.zip")
    return full_archive


IMPORT_MODEL_LIST = [
    models.Regexp, models.ImporterModel, models.ImporterType,
    models.ValueFormater, models.ImporterColumn, models.FormaterType,
    models.ImporterDefault, models.ImporterDefaultValues,
    models.ImportTarget, models.ImporterDuplicateField
]


def importer_serialization(archive=False, return_empty_types=False,
                           archive_name=None):
    result = generic_get_results(IMPORT_MODEL_LIST, "common_imports")
    full_archive = archive_serialization(
        result, archive_dir="common_imports", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


GEO_MODEL_LIST = [
    models.State, models.Department, models.Town, models.Area
]


def geo_serialization(archive=False, return_empty_types=False,
                      archive_name=None, no_geo=True):
    result = generic_get_results(GEO_MODEL_LIST, "common_geo", no_geo=no_geo)
    full_archive = archive_serialization(
        result, archive_dir="common_geo", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


DIRECTORY_MODEL_LIST = [
    models.Organization, models.Person, models.Author
]


def directory_serialization(archive=False, return_empty_types=False,
                            archive_name=None):
    result = generic_get_results(DIRECTORY_MODEL_LIST, "common_directory")
    full_archive = archive_serialization(
        result, archive_dir="common_directory", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


def document_serialization(archive=False, return_empty_types=False,
                           archive_name=None, operation_queryset=None,
                           site_queryset=None, cr_queryset=None,
                           find_queryset=None, warehouse_queryset=None):
    result_queryset = {}
    get_queryset_attr = None
    if operation_queryset:
        get_queryset_attr = {"operation_queryset": operation_queryset,
                             "get_queryset": True}
    elif site_queryset:
        get_queryset_attr = {"site_queryset": site_queryset,
                             "get_queryset": True}
    elif cr_queryset:
        get_queryset_attr = {"cr_queryset": cr_queryset,
                             "get_queryset": True}
    elif find_queryset:
        get_queryset_attr = {"find_queryset": find_queryset,
                             "get_queryset": True}
    elif warehouse_queryset:
        get_queryset_attr = {"warehouse_queryset": warehouse_queryset,
                             "get_queryset": True}
    if get_queryset_attr:
        # fetch the querysets of related items and keep only the documents
        # attached to at least one of them
        queries = operation_serialization(**get_queryset_attr)
        queries.update(cr_serialization(**get_queryset_attr))
        queries.update(find_serialization(**get_queryset_attr))
        queries.update(warehouse_serialization(**get_queryset_attr))
        q = None
        for model, attr in (
                ("Operation", "operations"),
                ("ArchaeologicalSite", "sites"),
                ("ContextRecord", "context_records"),
                ("Find", "finds"),
                ("Warehouse", "warehouses"),
                ("Container", "containers")):
            values = queries[model].values_list("id", flat=True)
            base_q = Q(**{attr + "__id__in": values})
            if not q:
                q = base_q
            else:
                q |= base_q
        result_queryset["Document"] = models.Document.objects.filter(q)

    result = generic_get_results([models.Document], "documents",
                                 result_queryset=result_queryset)
    media_archive = None
    if archive:
        media_archive = generic_archive_files(
            [models.Document], result_queryset=result_queryset)
    full_archive = archive_serialization(
        result, archive_dir="documents", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    with ZipFile(full_archive, 'a') as current_zip:
        current_zip.write(media_archive, arcname="media.zip")
    return full_archive


def restore_serialized(archive_name, delete_existing=False):
    """Restore a dump produced by the *_serialization functions."""
    with zipfile.ZipFile(archive_name, "r") as zip_file:
        # check version
        info = json.loads(zip_file.read("info.json").decode("utf-8"))
        if info["serialize-version"] != SERIALIZATION_VERSION:
            raise ValueError(
                "This dump version ({}) is not managed by this Ishtar "
                "installation".format(info["serialize-version"])
            )
        DIRS = (
            ("types", [None]),
            ("common_configuration", CONF_MODEL_LIST),
            ("common_imports", IMPORT_MODEL_LIST),
            ("common_geo", GEO_MODEL_LIST),
            ("common_directory", DIRECTORY_MODEL_LIST),
        )
        namelist = zip_file.namelist()
        for current_dir, model_list in DIRS:
            for current_model in model_list:
                for json_filename in namelist:
                    path = json_filename.split(os.sep)
                    if len(path) != 2 or path[0] != current_dir:
                        continue
                    model = get_model_from_filename(path[-1])
                    if current_model and current_model != model:
                        continue
                    if delete_existing:
                        model.objects.all().delete()
                    data = zip_file.read(json_filename).decode("utf-8")
                    for obj in deserialize("json", data):
                        obj.save()
        # restore media
        if "media.zip" in namelist:
            with tempfile.TemporaryDirectory() as tmp_dir_name:
                zip_file.extract("media.zip", tmp_dir_name)
                with zipfile.ZipFile(
                        tmp_dir_name + os.sep + "media.zip",
                        'r') as media_zip:
                    media_zip.extractall(settings.MEDIA_ROOT)
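

# --- Illustrative usage sketch (not part of the original module) ------------
# A minimal example of how these helpers could be chained, e.g. from a Django
# shell or a management command, to produce a full export. The helper name
# "export_all" and the archive file name below are assumptions made for this
# example only.
def export_all():
    """Write one zip archive per data family and return the archive paths."""
    return [
        serialize(archive=True)
        for serialize in (
            type_serialization, conf_serialization, importer_serialization,
            geo_serialization, directory_serialization,
            document_serialization)
    ]
# A matching import on another instance could then look like:
#     restore_serialized("ishtar-dump.zip", delete_existing=False)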