import json
import os
import tempfile
import zipfile
from zipfile import ZipFile

from rest_framework import serializers

from django.apps import apps
from django.conf import settings
from django.core.serializers import deserialize
from django.db.models import Q

from . import models
from ishtar_common.serializers_utils import generic_get_results, \
    archive_serialization, generic_archive_files, SERIALIZATION_VERSION, \
    get_model_from_filename
from archaeological_operations.serializers import operation_serialization, \
    OPERATION_MODEL_LIST
from archaeological_context_records.serializers import cr_serialization, \
    CR_MODEL_LIST
from archaeological_finds.serializers import find_serialization, \
    FIND_MODEL_LIST
from archaeological_warehouse.serializers import warehouse_serialization, \
    WAREHOUSE_MODEL_LIST


class PublicSerializer(serializers.BaseSerializer):
    """Read-only serializer that delegates rendering to the object itself.

    The serialized form is whatever the instance's
    ``public_representation()`` method returns.
    """

    def to_representation(self, obj):
        return obj.public_representation()


# GeneralType subclasses that must NOT be exported with the other types.
TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld"]


def get_type_models():
    """Return every GeneralType model class, minus the excluded ones.

    NOTE(review): the original code used ``isinstance(model(), ...)`` which
    needlessly instantiated each model; ``issubclass`` is equivalent for
    classes and side-effect free.
    """
    return [
        model for model in apps.get_models()
        if issubclass(model, models.GeneralType)
        and model.__name__ not in TYPE_MODEL_EXCLUDE
    ]


def type_serialization(archive=False, return_empty_types=False,
                       archive_name=None, info=None):
    """Serialize all type models into the "types" section/archive dir."""
    result = generic_get_results(get_type_models(), "types")
    return archive_serialization(result, archive_dir="types", archive=archive,
                                 return_empty_types=return_empty_types,
                                 archive_name=archive_name, info=info)


# Models making up the site configuration dump.
CONF_MODEL_LIST = [
    models.IshtarSiteProfile, models.GlobalVar, models.CustomForm,
    models.ExcludedField, models.JsonDataSection, models.JsonDataField,
    models.CustomFormJsonField, models.ImporterModel, models.DocumentTemplate
]


def conf_serialization(archive=False, return_empty_types=False,
                       archive_name=None):
    """Serialize the configuration models; bundle their media when archiving.

    When ``archive`` is set, associated files are collected into a nested
    ``media.zip`` appended to the main archive.
    """
    media_archive = None
    if archive:
        media_archive = generic_archive_files(CONF_MODEL_LIST)
    result = generic_get_results(CONF_MODEL_LIST, "common_configuration")
    full_archive = archive_serialization(
        result, archive_dir="common_configuration", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    with ZipFile(full_archive, 'a') as current_zip:
        current_zip.write(media_archive, arcname="media.zip")
    return full_archive


# Models making up the importer configuration dump.
# NOTE(review): ImporterDefaultValues was listed twice in the original;
# the duplicate has been removed.
IMPORT_MODEL_LIST = [
    models.Regexp, models.ImporterModel, models.ImporterType,
    models.ValueFormater, models.ImporterColumn, models.FormaterType,
    models.ImporterDefault, models.ImporterDefaultValues,
    models.ImportTarget, models.ImporterDuplicateField
]


def importer_serialization(archive=False, return_empty_types=False,
                           archive_name=None):
    """Serialize the importer configuration models."""
    result = generic_get_results(IMPORT_MODEL_LIST, "common_imports")
    full_archive = archive_serialization(
        result, archive_dir="common_imports", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


# Geographic reference models.
GEO_MODEL_LIST = [
    models.State, models.Department, models.Town, models.Area
]


def geo_serialization(archive=False, return_empty_types=False,
                      archive_name=None, no_geo=True):
    """Serialize geographic models; ``no_geo`` skips heavy geometry data."""
    result = generic_get_results(GEO_MODEL_LIST, "common_geo", no_geo=no_geo)
    full_archive = archive_serialization(
        result, archive_dir="common_geo", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


# People/organization directory models.
DIRECTORY_MODEL_LIST = [
    models.Organization, models.Person, models.Author
]


def directory_serialization(archive=False, return_empty_types=False,
                            archive_name=None):
    """Serialize the directory (organizations, persons, authors)."""
    result = generic_get_results(DIRECTORY_MODEL_LIST, "common_directory")
    full_archive = archive_serialization(
        result, archive_dir="common_directory", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


def document_serialization(archive=False, return_empty_types=False,
                           archive_name=None, operation_queryset=None,
                           site_queryset=None, cr_queryset=None,
                           find_queryset=None, warehouse_queryset=None):
    """Serialize documents, optionally restricted by a related queryset.

    When one of the ``*_queryset`` arguments is given (only the first
    non-empty one is used, in the order of the signature), the other
    serializers are queried to collect the related record ids and only
    documents attached to those records are exported.

    When ``archive`` is set, document files are collected into a nested
    ``media.zip``; if the archive already contains one (e.g. written by
    ``conf_serialization``), both are merged.
    """
    result_queryset = {}
    get_queryset_attr = None
    if operation_queryset:
        get_queryset_attr = {"operation_queryset": operation_queryset,
                             "get_queryset": True}
    elif site_queryset:
        get_queryset_attr = {"site_queryset": site_queryset,
                             "get_queryset": True}
    elif cr_queryset:
        get_queryset_attr = {"cr_queryset": cr_queryset,
                             "get_queryset": True}
    elif find_queryset:
        get_queryset_attr = {"find_queryset": find_queryset,
                             "get_queryset": True}
    elif warehouse_queryset:
        get_queryset_attr = {"warehouse_queryset": warehouse_queryset,
                             "get_queryset": True}
    if get_queryset_attr:
        # Gather the querysets of every related serializer, then collect
        # the ids of documents attached to any of those records.
        queries = operation_serialization(**get_queryset_attr)
        queries.update(cr_serialization(**get_queryset_attr))
        queries.update(find_serialization(**get_queryset_attr))
        queries.update(warehouse_serialization(**get_queryset_attr))
        document_ids = set()
        for model, attr in (
                ("Operation", "operations"),
                ("ArchaeologicalSite", "sites"),
                ("ContextRecord", "context_records"),
                ("Find", "finds"),
                ("Warehouse", "warehouses"),
                ("Container", "containers")):
            values = list(queries[model].values_list("id", flat=True))
            document_ids.update(
                models.Document.objects.filter(
                    **{attr + "__id__in": values}).values_list(
                    "id", flat=True))
        result_queryset["Document"] = models.Document.objects.filter(
            id__in=document_ids)
    result = generic_get_results([models.Document], "documents",
                                 result_queryset=result_queryset)
    media_archive = None
    if archive:
        media_archive = generic_archive_files(
            [models.Document], result_queryset=result_queryset)
    full_archive = archive_serialization(
        result, archive_dir="documents", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    # NOTE(review): the original leaked this ZipFile handle; closed via
    # a context manager now.
    with ZipFile(full_archive, 'r') as current_zip:
        has_media = "media.zip" in current_zip.namelist()
    if not has_media:
        with ZipFile(full_archive, 'a') as current_zip:
            current_zip.write(media_archive, arcname="media.zip")
        os.remove(media_archive)
        return full_archive
    # A media.zip is already present: merge its content with the new one,
    # then rewrite the whole archive.
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        # extract the current archive
        with ZipFile(full_archive, 'r') as current_zip:
            name_list = current_zip.namelist()
            for name in name_list:
                current_zip.extract(name, tmp_dir_name)
        # extract the media and recreate a media.zip
        with ZipFile(os.path.join(tmp_dir_name, "media.zip"),
                     "r") as old_media_archive, \
                ZipFile(media_archive, "a") as new_zip:
            for name in old_media_archive.namelist():
                new_zip.writestr(
                    name, old_media_archive.open(name).read())
        # rewrite the archive
        with ZipFile(full_archive + "_new", "w") as new_zip:
            for name in name_list:
                if name == "media.zip":
                    continue
                new_zip.write(os.path.join(tmp_dir_name, name),
                              arcname=name)
            new_zip.write(media_archive, arcname="media.zip")
        os.remove(media_archive)
        os.remove(full_archive)
        os.rename(full_archive + "_new", full_archive)
    return full_archive


def full_serialization(operation_queryset=None, site_queryset=None,
                       cr_queryset=None, find_queryset=None,
                       warehouse_queryset=None, archive=True, no_geo=True,
                       info=None):
    """Run every serializer in order, accumulating into a single archive.

    Returns the archive name produced by the first step and reused by all
    subsequent ones.
    """
    # types
    archive_name = type_serialization(archive=archive, info=info)
    # configuration
    conf_serialization(archive=archive, archive_name=archive_name)
    # importers
    importer_serialization(archive=archive, archive_name=archive_name)
    # geographic data
    geo_serialization(archive=archive, archive_name=archive_name,
                      no_geo=no_geo)
    # directory
    directory_serialization(archive=archive, archive_name=archive_name)
    # documents
    document_serialization(
        archive=archive, archive_name=archive_name,
        operation_queryset=operation_queryset, site_queryset=site_queryset,
        cr_queryset=cr_queryset, find_queryset=find_queryset,
        warehouse_queryset=warehouse_queryset)
    # operations
    operation_serialization(
        archive=archive, archive_name=archive_name,
        operation_queryset=operation_queryset, site_queryset=site_queryset,
        cr_queryset=cr_queryset, find_queryset=find_queryset,
        warehouse_queryset=warehouse_queryset, no_geo=no_geo)
    # context records
    cr_serialization(
        archive=archive, archive_name=archive_name,
        operation_queryset=operation_queryset, site_queryset=site_queryset,
        cr_queryset=cr_queryset, find_queryset=find_queryset,
        warehouse_queryset=warehouse_queryset, no_geo=no_geo)
    # finds
    find_serialization(
        archive=archive, archive_name=archive_name,
        operation_queryset=operation_queryset, site_queryset=site_queryset,
        cr_queryset=cr_queryset, find_queryset=find_queryset,
        warehouse_queryset=warehouse_queryset, no_geo=no_geo)
    # warehouses
    warehouse_serialization(
        archive=archive, archive_name=archive_name,
        operation_queryset=operation_queryset, site_queryset=site_queryset,
        cr_queryset=cr_queryset, find_queryset=find_queryset,
        warehouse_queryset=warehouse_queryset, no_geo=no_geo)
    return archive_name


def restore_serialized(archive_name, delete_existing=False):
    """Restore a serialization archive into the database (and media dir).

    :param archive_name: path to the zip archive produced by
        :func:`full_serialization`.
    :param delete_existing: when True, delete all existing rows of each
        restored model before loading.
    :raises ValueError: when the archive's serialization version does not
        match this installation's.
    """
    with zipfile.ZipFile(archive_name, "r") as zip_file:
        # check version
        info = json.loads(zip_file.read("info.json").decode("utf-8"))
        if info["serialize-version"] != SERIALIZATION_VERSION:
            raise ValueError(
                "This dump version: {} is not managed by this Ishtar "
                "installation".format(info["serialize-version"])
            )
        # Restore order matters: referenced models first.
        DIRS = (
            ("types", [None]),
            ("common_imports", IMPORT_MODEL_LIST),
            ("common_configuration", CONF_MODEL_LIST),
            ("common_geo", GEO_MODEL_LIST),
            ("common_directory", DIRECTORY_MODEL_LIST),
            ("documents", [models.Document]),
            ("operations", OPERATION_MODEL_LIST),
            ("context_records", CR_MODEL_LIST),
            ("warehouse", WAREHOUSE_MODEL_LIST),
            ("finds", FIND_MODEL_LIST),
        )
        namelist = zip_file.namelist()
        for current_dir, model_list in DIRS:
            for current_model in model_list:
                for json_filename in namelist:
                    # NOTE(review): zip member names always use "/" as the
                    # separator (ZIP spec), regardless of platform — the
                    # original split on os.sep, which breaks on Windows.
                    path = json_filename.split("/")
                    if len(path) != 2 or path[0] != current_dir:
                        continue
                    model = get_model_from_filename(path[-1])
                    if current_model and current_model != model:
                        continue
                    if delete_existing:
                        model.objects.all().delete()
                    data = zip_file.read(json_filename).decode("utf-8")
                    for obj in deserialize("json", data):
                        obj.save()
        # restore media
        if "media.zip" in namelist:
            with tempfile.TemporaryDirectory() as tmp_dir_name:
                zip_file.extract("media.zip", tmp_dir_name)
                with zipfile.ZipFile(
                        os.path.join(tmp_dir_name, "media.zip"),
                        'r') as media_zip:
                    media_zip.extractall(settings.MEDIA_ROOT)