"""
Serialization / restoration of an Ishtar installation.

Each ``*_serialization`` helper dumps a family of models (types,
configuration, importers, geo, directory, documents, items) with
``generic_get_results`` and packs the result into a zip archive with
``archive_serialization``.  ``full_serialization`` chains them all into a
single archive; ``restore_serialized`` reloads such an archive.
"""

import json
import os
import tempfile
import zipfile
from zipfile import ZipFile

from django.apps import apps
from django.conf import settings
from django.core.serializers import deserialize
from rest_framework import serializers

from . import models
from ishtar_common.serializers_utils import generic_get_results, \
    archive_serialization, generic_archive_files, SERIALIZATION_VERSION, \
    get_model_from_filename
from archaeological_operations.serializers import operation_serialization, \
    OPERATION_MODEL_LIST
from archaeological_context_records.serializers import cr_serialization, \
    CR_MODEL_LIST
from archaeological_finds.serializers import find_serialization, \
    FIND_MODEL_LIST
from archaeological_warehouse.serializers import warehouse_serialization, \
    WAREHOUSE_MODEL_LIST


class PublicSerializer(serializers.BaseSerializer):
    """Serializer delegating to the object's own public representation."""

    def to_representation(self, obj):
        return obj.public_representation()


# GeneralType subclasses that must not be dumped as plain "types"
# (Area is handled with the geo dump, OperationTypeOld is legacy).
TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld"]


def get_type_models():
    """Return all GeneralType models except the excluded ones."""
    return [
        model for model in apps.get_models()
        if isinstance(model(), models.GeneralType) and (
            model.__name__ not in TYPE_MODEL_EXCLUDE)
    ]


def type_serialization(archive=False, return_empty_types=False,
                       archive_name=None, info=None):
    """Serialize all type (GeneralType) models into the "types" dir."""
    result = generic_get_results(get_type_models(), "types")
    return archive_serialization(result, archive_dir="types", archive=archive,
                                 return_empty_types=return_empty_types,
                                 archive_name=archive_name, info=info)


CONF_MODEL_LIST = [
    models.IshtarSiteProfile, models.GlobalVar, models.CustomForm,
    models.ExcludedField, models.JsonDataSection, models.JsonDataField,
    models.CustomFormJsonField, models.ImporterModel, models.DocumentTemplate
]


def conf_serialization(archive=False, return_empty_types=False,
                       archive_name=None):
    """
    Serialize the site configuration models.

    When ``archive`` is set, associated media files (e.g. document
    templates) are archived too and appended as "media.zip".
    """
    media_archive = None
    if archive:
        media_archive = generic_archive_files(CONF_MODEL_LIST)
    result = generic_get_results(CONF_MODEL_LIST, "common_configuration")
    full_archive = archive_serialization(
        result, archive_dir="common_configuration", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    with ZipFile(full_archive, 'a') as current_zip:
        current_zip.write(media_archive, arcname="media.zip")
    return full_archive


# NOTE: ImporterDefaultValues was previously listed twice; the duplicate
# entry has been removed.
IMPORT_MODEL_LIST = [
    models.Regexp, models.ImporterModel, models.ImporterType,
    models.ValueFormater, models.ImporterColumn, models.FormaterType,
    models.ImporterDefault, models.ImporterDefaultValues,
    models.ImportTarget, models.ImporterDuplicateField
]


def importer_serialization(archive=False, return_empty_types=False,
                           archive_name=None):
    """Serialize importer configuration models."""
    result = generic_get_results(IMPORT_MODEL_LIST, "common_imports")
    full_archive = archive_serialization(
        result, archive_dir="common_imports", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


GEO_MODEL_LIST = [
    models.State, models.Department, models.Town, models.Area
]


def geo_serialization(archive=False, return_empty_types=False,
                      archive_name=None, no_geo=True):
    """Serialize geographic models (geometries skipped when no_geo)."""
    result = generic_get_results(GEO_MODEL_LIST, "common_geo", no_geo=no_geo)
    full_archive = archive_serialization(
        result, archive_dir="common_geo", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


DIRECTORY_MODEL_LIST = [
    models.Organization, models.Person, models.Author
]


def directory_serialization(archive=False, return_empty_types=False,
                            archive_name=None):
    """Serialize directory models (organizations, persons, authors)."""
    result = generic_get_results(DIRECTORY_MODEL_LIST, "common_directory")
    full_archive = archive_serialization(
        result, archive_dir="common_directory", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    return full_archive


def document_serialization(archive=False, return_empty_types=False,
                           archive_name=None, operation_queryset=None,
                           site_queryset=None, cr_queryset=None,
                           find_queryset=None, warehouse_queryset=None,
                           put_locks=False, lock_user=None):
    """
    Serialize Document objects, optionally restricted to the documents
    attached to the items selected by one of the provided querysets.

    When ``archive`` is set, associated media files are archived and
    merged into the "media.zip" member of the resulting archive.  When
    ``put_locks`` is set, exported documents are locked for ``lock_user``.
    """
    result_queryset = {}
    get_queryset_attr = None
    # only the first non-empty queryset is used to scope the export
    if operation_queryset:
        get_queryset_attr = {"operation_queryset": operation_queryset,
                             "get_queryset": True}
    elif site_queryset:
        get_queryset_attr = {"site_queryset": site_queryset,
                             "get_queryset": True}
    elif cr_queryset:
        get_queryset_attr = {"cr_queryset": cr_queryset,
                             "get_queryset": True}
    elif find_queryset:
        get_queryset_attr = {"find_queryset": find_queryset,
                             "get_queryset": True}
    elif warehouse_queryset:
        get_queryset_attr = {"warehouse_queryset": warehouse_queryset,
                             "get_queryset": True}
    if get_queryset_attr:
        # collect the querysets of every related item type, then gather
        # documents attached to any of them
        queries = operation_serialization(**get_queryset_attr)
        queries.update(cr_serialization(**get_queryset_attr))
        queries.update(find_serialization(**get_queryset_attr))
        queries.update(warehouse_serialization(**get_queryset_attr))
        document_ids = set()
        for model, attr in (
                ("Operation", "operations"),
                ("ArchaeologicalSite", "sites"),
                ("ContextRecord", "context_records"),
                ("Find", "finds"),
                ("Warehouse", "warehouses"),
                ("Container", "containers")):
            values = list(queries[model].values_list("id", flat=True))
            document_ids.update(
                models.Document.objects.filter(
                    **{attr + "__id__in": values}).values_list(
                    "id", flat=True))
        result_queryset["Document"] = models.Document.objects.filter(
            id__in=document_ids)
    result = generic_get_results([models.Document], "documents",
                                 result_queryset=result_queryset)
    if put_locks:
        q = models.Document.objects
        if result_queryset:
            q = result_queryset["Document"]
        q.update(locked=True, lock_user=lock_user)
    media_archive = None
    if archive:
        media_archive = generic_archive_files(
            [models.Document], result_queryset=result_queryset)
    full_archive = archive_serialization(
        result, archive_dir="documents", archive=archive,
        return_empty_types=return_empty_types, archive_name=archive_name)
    if not media_archive:
        return full_archive
    with ZipFile(full_archive, 'r') as current_zip:
        has_media = "media.zip" in current_zip.namelist()
    if not has_media:
        # no previous media: simply append ours
        with ZipFile(full_archive, 'a') as current_zip:
            current_zip.write(media_archive, arcname="media.zip")
        os.remove(media_archive)
        return full_archive
    # a media.zip already exists in the archive: merge it with ours and
    # rewrite the whole archive
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        # extract the current archive
        with ZipFile(full_archive, 'r') as current_zip:
            name_list = current_zip.namelist()
            for name in name_list:
                current_zip.extract(name, tmp_dir_name)
        # extract the media and recreate a media.zip
        with ZipFile(os.path.join(tmp_dir_name, "media.zip"),
                     "r") as old_media_archive:
            with ZipFile(media_archive, "a") as new_zip:
                for name in old_media_archive.namelist():
                    new_zip.writestr(
                        name, old_media_archive.open(name).read())
        # rewrite the archive
        with ZipFile(full_archive + "_new", "w") as new_zip:
            for name in name_list:
                if name == "media.zip":
                    continue
                new_zip.write(os.path.join(tmp_dir_name, name),
                              arcname=name)
            new_zip.write(media_archive, arcname="media.zip")
    os.remove(media_archive)
    os.remove(full_archive)
    os.rename(full_archive + "_new", full_archive)
    return full_archive


def full_serialization(operation_queryset=None, site_queryset=None,
                       cr_queryset=None, find_queryset=None,
                       warehouse_queryset=None, archive=True, no_geo=True,
                       info=None, export_types=True, export_conf=True,
                       export_importers=True, export_geo=True,
                       export_dir=True, export_docs=True, export_items=True,
                       put_locks=False, lock_user=None):
    """
    Run every serialization step in order, accumulating into one archive.

    Returns the archive name produced by the last exporting step.
    """
    archive_name = None
    if export_types:
        archive_name = type_serialization(archive=archive, info=info)
    if export_conf:
        archive_name = conf_serialization(archive=archive,
                                          archive_name=archive_name)
    if export_importers:
        archive_name = importer_serialization(archive=archive,
                                              archive_name=archive_name)
    if export_geo:
        archive_name = geo_serialization(
            archive=archive, archive_name=archive_name, no_geo=no_geo)
    if export_dir:
        archive_name = directory_serialization(archive=archive,
                                               archive_name=archive_name)
    if export_docs:
        archive_name = document_serialization(
            archive=archive, archive_name=archive_name,
            operation_queryset=operation_queryset,
            site_queryset=site_queryset, cr_queryset=cr_queryset,
            find_queryset=find_queryset,
            warehouse_queryset=warehouse_queryset,
            put_locks=put_locks, lock_user=lock_user
        )
    if export_items:
        item_kwargs = {
            "operation_queryset": operation_queryset,
            "site_queryset": site_queryset,
            "cr_queryset": cr_queryset,
            "find_queryset": find_queryset,
            "warehouse_queryset": warehouse_queryset,
            "no_geo": no_geo,
            "put_locks": put_locks,
            "lock_user": lock_user,
        }
        # operation_serialization provides the archive name reused by the
        # subsequent item exports
        archive_name = operation_serialization(
            archive=archive, archive_name=archive_name, **item_kwargs)
        cr_serialization(
            archive=archive, archive_name=archive_name, **item_kwargs)
        find_serialization(
            archive=archive, archive_name=archive_name, **item_kwargs)
        warehouse_serialization(
            archive=archive, archive_name=archive_name, **item_kwargs)
    return archive_name


def restore_serialized(archive_name, user=None, delete_existing=False,
                       release_locks=False):
    """
    Restore a full serialization archive.

    :param archive_name: path of the zip archive to restore.
    :param user: if provided, recorded as history modifier/creator on
        restored historized objects.
    :param delete_existing: delete current objects of each restored model
        before loading.
    :param release_locks: unlock restored objects.
    :return: list of (model name, number of restored items) tuples.
    :raises ValueError: when the dump serialization version does not match
        this installation.
    """
    with zipfile.ZipFile(archive_name, "r") as zip_file:
        # check version
        info = json.loads(zip_file.read("info.json").decode("utf-8"))
        if info["serialize-version"] != SERIALIZATION_VERSION:
            raise ValueError(
                "This dump version: {} is not managed by this Ishtar "
                "installation".format(info["serialize-version"])
            )
        # restoration order matters: types and configuration first, items
        # last.  [None] means "any model found in the directory".
        DIRS = (
            ("types", [None]),
            ("common_imports", IMPORT_MODEL_LIST),
            ("common_configuration", CONF_MODEL_LIST),
            ("common_geo", GEO_MODEL_LIST),
            ("common_directory", DIRECTORY_MODEL_LIST),
            ("documents", [models.Document]),
            ("operations", OPERATION_MODEL_LIST),
            ("context_records", CR_MODEL_LIST),
            ("warehouse", WAREHOUSE_MODEL_LIST),
            ("finds", FIND_MODEL_LIST),
        )
        result = []
        namelist = zip_file.namelist()
        # restore media
        if "media.zip" in namelist:
            with tempfile.TemporaryDirectory() as tmp_dir_name:
                zip_file.extract("media.zip", tmp_dir_name)
                with zipfile.ZipFile(
                        tmp_dir_name + os.sep + "media.zip",
                        'r') as media_zip:
                    media_zip.extractall(settings.MEDIA_ROOT)
        for current_dir, model_list in DIRS:
            for current_model in model_list:
                for json_filename in namelist:
                    path = json_filename.split(os.sep)
                    if len(path) != 2 or path[0] != current_dir:
                        continue
                    model = get_model_from_filename(path[-1])
                    if current_model and current_model != model:
                        continue
                    if delete_existing:
                        model.objects.all().delete()
                    data = zip_file.read(json_filename).decode("utf-8")
                    # regenerate labels, add a new version, etc.
                    historized = hasattr(model, "history_modifier") and (
                        hasattr(model, "history_creator"))
                    releasing_locks = hasattr(model, "locked") and (
                        release_locks)
                    need_resave = hasattr(model, "CACHED_LABELS") or \
                        hasattr(model, "cached_label") or \
                        releasing_locks or (user and historized)
                    idx = -1
                    for idx, obj in enumerate(deserialize("json", data)):
                        extra_attrs = {}
                        if historized or hasattr(model, "locked"):
                            # preserve history/lock attributes of an
                            # already existing object with the same
                            # natural key
                            keys = obj.object.natural_key()
                            old_obj = None
                            try:
                                old_obj = model.objects.get_by_natural_key(
                                    *keys)
                            except model.DoesNotExist:
                                pass
                            if old_obj:
                                if historized and (
                                        old_obj.history_creator or
                                        old_obj.history_modifier):
                                    extra_attrs = {
                                        "history_modifier_id":
                                            old_obj.history_modifier_id,
                                        "history_creator_id":
                                            old_obj.history_creator_id
                                    }
                                if hasattr(model, "locked") and \
                                        old_obj.locked:
                                    extra_attrs.update({
                                        "locked": old_obj.locked,
                                        "lock_user": old_obj.lock_user,
                                    })
                        obj.save()
                        if need_resave or extra_attrs:
                            obj = model.objects.get(id=obj.object.id)
                            if user:
                                obj.history_modifier = user
                                if extra_attrs and \
                                        "history_creator_id" in extra_attrs:
                                    obj.history_creator_id = extra_attrs[
                                        "history_creator_id"]
                                else:
                                    obj.history_creator = user
                                if extra_attrs and \
                                        "locked" in extra_attrs:
                                    obj.locked = extra_attrs["locked"]
                                    obj.lock_user = extra_attrs["lock_user"]
                            elif extra_attrs:
                                for k in extra_attrs:
                                    setattr(obj, k, extra_attrs[k])
                            obj.skip_history_when_saving = True
                            if releasing_locks:
                                obj.locked = False
                                obj.lock_user = None
                            obj._no_move = True
                            obj.save()
                    if idx >= 0:
                        result.append((model.__name__, idx + 1))
    return result