import json import os import tempfile import zipfile from rest_framework import serializers from zipfile import ZipFile from django.apps import apps from django.conf import settings from django.core.serializers import deserialize from django.db.models.deletion import ProtectedError from django.contrib.auth.models import Group, Permission from . import models from .models_common import State, Department from archaeological_operations.models import ActType from ishtar_common.serializers_utils import ( generic_get_results, archive_serialization, generic_archive_files, SERIALIZATION_VERSION, get_model_from_filename, ) from archaeological_operations.serializers import ( operation_serialization, OPERATION_MODEL_LIST, ) from archaeological_context_records.serializers import cr_serialization, CR_MODEL_LIST from archaeological_finds.serializers import find_serialization, FIND_MODEL_LIST from archaeological_warehouse.serializers import ( warehouse_serialization, WAREHOUSE_MODEL_LIST, ) from django.contrib.contenttypes.management import create_contenttypes class PublicSerializer(serializers.BaseSerializer): def to_representation(self, obj): return obj.public_representation() TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld", "ProfileTypeSummary"] def get_type_models(): return [Permission, Group] + [ model for model in apps.get_models() if isinstance(model(), models.GeneralType) and (model.__name__ not in TYPE_MODEL_EXCLUDE) ] def type_serialization( archive=False, return_empty_types=False, archive_name=None, info=None ): result = generic_get_results(get_type_models(), "types") return archive_serialization( result, archive_dir="types", archive=archive, return_empty_types=return_empty_types, archive_name=archive_name, info=info, ) CONF_MODEL_LIST = [ models.IshtarSiteProfile, models.GlobalVar, models.CustomForm, models.ExcludedField, models.JsonDataSection, models.JsonDataField, models.CustomFormJsonField, models.ImporterModel, models.DocumentTemplate, ActType, ] CONF_SERIALIZATION_INCLUDE = {ActType.__name__: ["associated_template"]} def conf_serialization(archive=False, return_empty_types=False, archive_name=None): media_archive = None if archive: media_archive = generic_archive_files(CONF_MODEL_LIST) result = generic_get_results( CONF_MODEL_LIST, "common_configuration", serialization_include=CONF_SERIALIZATION_INCLUDE, ) full_archive = archive_serialization( result, archive_dir="common_configuration", archive=archive, return_empty_types=return_empty_types, archive_name=archive_name, ) if not media_archive: return full_archive with ZipFile(full_archive, "a") as current_zip: current_zip.write(media_archive, arcname="media.zip") return full_archive IMPORT_MODEL_LIST = [ models.Regexp, models.ImporterModel, models.ImporterType, models.ValueFormater, models.ImporterColumn, models.FormaterType, models.ImporterDefault, models.ImporterDefaultValues, models.ImportTarget, models.ImporterDefaultValues, models.ImporterDuplicateField, models.ImporterGroup, models.ImporterGroupImporter, ] def importer_serialization(archive=False, return_empty_types=False, archive_name=None): result = generic_get_results(IMPORT_MODEL_LIST, "common_imports") full_archive = archive_serialization( result, archive_dir="common_imports", archive=archive, return_empty_types=return_empty_types, archive_name=archive_name, ) return full_archive GEO_MODEL_LIST = [State, Department, models.Town, models.Area] def geo_serialization( archive=False, return_empty_types=False, archive_name=None, no_geo=True ): result = generic_get_results(GEO_MODEL_LIST, "common_geo", no_geo=no_geo) full_archive = archive_serialization( result, archive_dir="common_geo", archive=archive, return_empty_types=return_empty_types, archive_name=archive_name, ) return full_archive DIRECTORY_MODEL_LIST = [models.Organization, models.Person, models.Author] def directory_serialization(archive=False, return_empty_types=False, archive_name=None): result = generic_get_results(DIRECTORY_MODEL_LIST, "common_directory") full_archive = archive_serialization( result, archive_dir="common_directory", archive=archive, return_empty_types=return_empty_types, archive_name=archive_name, ) return full_archive def document_serialization( archive=False, return_empty_types=False, archive_name=None, operation_queryset=None, site_queryset=None, cr_queryset=None, find_queryset=None, warehouse_queryset=None, put_locks=False, lock_user=None, ): result_queryset = {} get_queryset_attr = None if operation_queryset: get_queryset_attr = { "operation_queryset": operation_queryset, "get_queryset": True, } elif site_queryset: get_queryset_attr = {"site_queryset": site_queryset, "get_queryset": True} elif cr_queryset: get_queryset_attr = {"cr_queryset": cr_queryset, "get_queryset": True} elif find_queryset: get_queryset_attr = {"find_queryset": find_queryset, "get_queryset": True} elif warehouse_queryset: get_queryset_attr = { "warehouse_queryset": warehouse_queryset, "get_queryset": True, } if get_queryset_attr: queries = operation_serialization(**get_queryset_attr) queries.update(cr_serialization(**get_queryset_attr)) queries.update(find_serialization(**get_queryset_attr)) queries.update(warehouse_serialization(**get_queryset_attr)) document_ids = set() for model, attr in ( ("Operation", "operations"), ("ArchaeologicalSite", "sites"), ("ContextRecord", "context_records"), ("Find", "finds"), ("Warehouse", "warehouses"), ("Container", "containers"), ): values = list(queries[model].values_list("id", flat=True)) document_ids.update( models.Document.objects.filter( **{attr + "__id__in": values} ).values_list("id", flat=True) ) result_queryset["Document"] = models.Document.objects.filter( id__in=document_ids ) result = generic_get_results( [models.Document], "documents", result_queryset=result_queryset ) if put_locks: q = models.Document.objects if result_queryset: q = result_queryset["Document"] q.update(locked=True, lock_user=lock_user) media_archive = None if archive: media_archive = generic_archive_files( [models.Document], result_queryset=result_queryset ) full_archive = archive_serialization( result, archive_dir="documents", archive=archive, return_empty_types=return_empty_types, archive_name=archive_name, ) if not media_archive: return full_archive has_media = "media.zip" in ZipFile(full_archive, "r").namelist() if not has_media: with ZipFile(full_archive, "a") as current_zip: current_zip.write(media_archive, arcname="media.zip") os.remove(media_archive) return full_archive with tempfile.TemporaryDirectory() as tmp_dir_name: # extract the current archive current_zip = ZipFile(full_archive, "r") name_list = current_zip.namelist() for name in name_list: current_zip.extract(name, tmp_dir_name) current_zip.close() # extract the media and recreate a media.zip old_media_archive = ZipFile(os.path.join(tmp_dir_name, "media.zip"), "r") with ZipFile(media_archive, "a") as new_zip: for name in old_media_archive.namelist(): new_zip.writestr(name, old_media_archive.open(name).read()) # rewrite the archive with ZipFile(full_archive + "_new", "w") as new_zip: for name in name_list: if name == "media.zip": continue new_zip.write(os.path.join(tmp_dir_name, name), arcname=name) new_zip.write(media_archive, arcname="media.zip") os.remove(media_archive) os.remove(full_archive) os.rename(full_archive + "_new", full_archive) return full_archive def full_serialization( operation_queryset=None, site_queryset=None, cr_queryset=None, find_queryset=None, warehouse_queryset=None, archive=True, no_geo=True, info=None, export_types=True, export_conf=True, export_importers=True, export_geo=True, export_dir=True, export_docs=True, export_items=True, put_locks=False, lock_user=None, ): archive_name = None if export_types: # print("type") archive_name = type_serialization(archive=archive, info=info) if export_conf: # print("conf") archive_name = conf_serialization(archive=archive, archive_name=archive_name) if export_importers: # print("importer") archive_name = importer_serialization( archive=archive, archive_name=archive_name ) if export_geo: # print("geo") archive_name = geo_serialization( archive=archive, archive_name=archive_name, no_geo=no_geo ) if export_dir: # print("directory") archive_name = directory_serialization( archive=archive, archive_name=archive_name ) if export_docs: # print("document") archive_name = document_serialization( archive=archive, archive_name=archive_name, operation_queryset=operation_queryset, site_queryset=site_queryset, cr_queryset=cr_queryset, find_queryset=find_queryset, warehouse_queryset=warehouse_queryset, put_locks=put_locks, lock_user=lock_user, ) if export_items: # print("operation") archive_name = operation_serialization( archive=archive, archive_name=archive_name, operation_queryset=operation_queryset, site_queryset=site_queryset, cr_queryset=cr_queryset, find_queryset=find_queryset, warehouse_queryset=warehouse_queryset, no_geo=no_geo, put_locks=put_locks, lock_user=lock_user, ) # print("cr") cr_serialization( archive=archive, archive_name=archive_name, operation_queryset=operation_queryset, site_queryset=site_queryset, cr_queryset=cr_queryset, find_queryset=find_queryset, warehouse_queryset=warehouse_queryset, no_geo=no_geo, put_locks=put_locks, lock_user=lock_user, ) # print("find") find_serialization( archive=archive, archive_name=archive_name, operation_queryset=operation_queryset, site_queryset=site_queryset, cr_queryset=cr_queryset, find_queryset=find_queryset, warehouse_queryset=warehouse_queryset, no_geo=no_geo, put_locks=put_locks, lock_user=lock_user, ) # print("warehouse") warehouse_serialization( archive=archive, archive_name=archive_name, operation_queryset=operation_queryset, site_queryset=site_queryset, cr_queryset=cr_queryset, find_queryset=find_queryset, warehouse_queryset=warehouse_queryset, no_geo=no_geo, put_locks=put_locks, lock_user=lock_user, ) return archive_name def restore_serialized( archive_name, user=None, delete_existing=False, release_locks=False ): for app in apps.get_app_configs(): create_contenttypes(app, verbosity=1, interactive=False) with zipfile.ZipFile(archive_name, "r") as zip_file: # check version info = json.loads(zip_file.read("info.json").decode("utf-8")) if info["serialize-version"] != SERIALIZATION_VERSION: raise ValueError( "This dump version: {} is not managed by this Ishtar " "installation".format(info["serialize-version"]) ) DIRS = ( ("types", [None]), ("common_imports", IMPORT_MODEL_LIST), ("common_configuration", CONF_MODEL_LIST), ("common_geo", GEO_MODEL_LIST), ("common_directory", DIRECTORY_MODEL_LIST), ("documents", [models.Document]), ("operations", OPERATION_MODEL_LIST), ("context_records", CR_MODEL_LIST), ("warehouse", WAREHOUSE_MODEL_LIST), ("finds", FIND_MODEL_LIST), ) result = [] namelist = zip_file.namelist() # restore media if "media.zip" in namelist: with tempfile.TemporaryDirectory() as tmp_dir_name: zip_file.extract("media.zip", tmp_dir_name) with zipfile.ZipFile( tmp_dir_name + os.sep + "media.zip", "r" ) as media_zip: media_zip.extractall(settings.MEDIA_ROOT) for current_dir, model_list in DIRS: for current_model in model_list: for json_filename in namelist: path = json_filename.split(os.sep) if len(path) != 2 or path[0] != current_dir: continue model = get_model_from_filename(path[-1]) if not model or (current_model and current_model != model): continue if delete_existing: try: model.objects.all().delete() deleted = True except ProtectedError: deleted = False if not deleted: # only delete not referenced for item in model.objects.all(): try: item.delete() except ProtectedError: pass data = zip_file.read(json_filename).decode("utf-8") # regenerate labels, add a new version, etc. historized = hasattr(model, "history_modifier") and ( hasattr(model, "history_creator") ) releasing_locks = hasattr(model, "locked") and (release_locks) need_resave = ( hasattr(model, "CACHED_LABELS") or hasattr(model, "cached_label") or releasing_locks or (user and historized) ) idx = -1 for idx, obj in enumerate(deserialize("json", data)): extra_attrs = {} if historized or hasattr(model, "locked"): keys = obj.object.natural_key() old_obj = None try: old_obj = model.objects.get_by_natural_key(*keys) except model.DoesNotExist: pass if old_obj: if historized and ( old_obj.history_creator or old_obj.history_modifier ): extra_attrs = { "history_modifier_id": old_obj.history_modifier_id, "history_creator_id": old_obj.history_creator_id, } if hasattr(model, "locked") and old_obj.locked: extra_attrs.update( { "locked": old_obj.locked, "lock_user": old_obj.lock_user, } ) obj.save() if need_resave or extra_attrs: obj = model.objects.get(id=obj.object.id) if user: obj.history_modifier = user if extra_attrs and "history_creator_id" in extra_attrs: obj.history_creator_id = extra_attrs[ "history_creator_id" ] else: obj.history_creator = user if extra_attrs and "locked" in extra_attrs: obj.locked = extra_attrs["locked"] obj.lock_user = extra_attrs["lock_user"] elif extra_attrs: for k in extra_attrs: setattr(obj, k, extra_attrs[k]) obj.skip_history_when_saving = True if releasing_locks: obj.locked = False obj.lock_user = None obj._no_move = True obj.save() if idx >= 0: result.append((model.__name__, idx + 1)) return result