summaryrefslogtreecommitdiff
path: root/ishtar_common/serializers.py
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2021-03-19 11:05:22 +0100
committerÉtienne Loks <etienne.loks@iggdrasil.net>2021-03-19 11:05:22 +0100
commite2d6c50f231f636fed362be37e7bf3319fc5d6b8 (patch)
tree5d7fde3628825aebeeef3d85d2dfcf09a52116de /ishtar_common/serializers.py
parente6af0225df8f539308bc3fd8c9dbc967bba5a807 (diff)
downloadIshtar-e2d6c50f231f636fed362be37e7bf3319fc5d6b8.tar.bz2
Ishtar-e2d6c50f231f636fed362be37e7bf3319fc5d6b8.zip
Format - black: ishtar_common
Diffstat (limited to 'ishtar_common/serializers.py')
-rw-r--r--ishtar_common/serializers.py401
1 files changed, 249 insertions, 152 deletions
diff --git a/ishtar_common/serializers.py b/ishtar_common/serializers.py
index 55989adcb..507173642 100644
--- a/ishtar_common/serializers.py
+++ b/ishtar_common/serializers.py
@@ -9,25 +9,30 @@ from django.apps import apps
from django.conf import settings
from django.core.serializers import deserialize
-from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Group, Permission
from . import models
from .models_common import State, Department
from archaeological_operations.models import ActType
-from ishtar_common.serializers_utils import generic_get_results, \
- archive_serialization, generic_archive_files, SERIALIZATION_VERSION, \
- get_model_from_filename
-
-from archaeological_operations.serializers import operation_serialization, \
- OPERATION_MODEL_LIST
-from archaeological_context_records.serializers import cr_serialization, \
- CR_MODEL_LIST
-from archaeological_finds.serializers import find_serialization, \
- FIND_MODEL_LIST
-from archaeological_warehouse.serializers import warehouse_serialization, \
- WAREHOUSE_MODEL_LIST
+from ishtar_common.serializers_utils import (
+ generic_get_results,
+ archive_serialization,
+ generic_archive_files,
+ SERIALIZATION_VERSION,
+ get_model_from_filename,
+)
+
+from archaeological_operations.serializers import (
+ operation_serialization,
+ OPERATION_MODEL_LIST,
+)
+from archaeological_context_records.serializers import cr_serialization, CR_MODEL_LIST
+from archaeological_finds.serializers import find_serialization, FIND_MODEL_LIST
+from archaeological_warehouse.serializers import (
+ warehouse_serialization,
+ WAREHOUSE_MODEL_LIST,
+)
from django.contrib.contenttypes.management import create_contenttypes
@@ -42,116 +47,155 @@ TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld", "ProfileTypeSummary"]
def get_type_models():
return [Permission, Group] + [
- model for model in apps.get_models()
- if isinstance(model(), models.GeneralType) and (
- model.__name__ not in TYPE_MODEL_EXCLUDE)
+ model
+ for model in apps.get_models()
+ if isinstance(model(), models.GeneralType)
+ and (model.__name__ not in TYPE_MODEL_EXCLUDE)
]
-def type_serialization(archive=False, return_empty_types=False,
- archive_name=None, info=None):
+def type_serialization(
+ archive=False, return_empty_types=False, archive_name=None, info=None
+):
result = generic_get_results(get_type_models(), "types")
- return archive_serialization(result, archive_dir="types", archive=archive,
- return_empty_types=return_empty_types,
- archive_name=archive_name, info=info)
+ return archive_serialization(
+ result,
+ archive_dir="types",
+ archive=archive,
+ return_empty_types=return_empty_types,
+ archive_name=archive_name,
+ info=info,
+ )
CONF_MODEL_LIST = [
- models.IshtarSiteProfile, models.GlobalVar, models.CustomForm,
- models.ExcludedField, models.JsonDataSection, models.JsonDataField,
- models.CustomFormJsonField, models.ImporterModel,
- models.DocumentTemplate, ActType
+ models.IshtarSiteProfile,
+ models.GlobalVar,
+ models.CustomForm,
+ models.ExcludedField,
+ models.JsonDataSection,
+ models.JsonDataField,
+ models.CustomFormJsonField,
+ models.ImporterModel,
+ models.DocumentTemplate,
+ ActType,
]
CONF_SERIALIZATION_INCLUDE = {ActType.__name__: ["associated_template"]}
-def conf_serialization(archive=False, return_empty_types=False,
- archive_name=None):
+def conf_serialization(archive=False, return_empty_types=False, archive_name=None):
media_archive = None
if archive:
media_archive = generic_archive_files(CONF_MODEL_LIST)
result = generic_get_results(
- CONF_MODEL_LIST, "common_configuration",
- serialization_include=CONF_SERIALIZATION_INCLUDE)
+ CONF_MODEL_LIST,
+ "common_configuration",
+ serialization_include=CONF_SERIALIZATION_INCLUDE,
+ )
full_archive = archive_serialization(
- result, archive_dir="common_configuration", archive=archive,
- return_empty_types=return_empty_types, archive_name=archive_name)
+ result,
+ archive_dir="common_configuration",
+ archive=archive,
+ return_empty_types=return_empty_types,
+ archive_name=archive_name,
+ )
if not media_archive:
return full_archive
- with ZipFile(full_archive, 'a') as current_zip:
+ with ZipFile(full_archive, "a") as current_zip:
current_zip.write(media_archive, arcname="media.zip")
return full_archive
IMPORT_MODEL_LIST = [
- models.Regexp, models.ImporterModel, models.ImporterType,
- models.ValueFormater, models.ImporterColumn,
- models.FormaterType, models.ImporterDefault, models.ImporterDefaultValues,
- models.ImportTarget, models.ImporterDefaultValues,
- models.ImporterDuplicateField
+ models.Regexp,
+ models.ImporterModel,
+ models.ImporterType,
+ models.ValueFormater,
+ models.ImporterColumn,
+ models.FormaterType,
+ models.ImporterDefault,
+ models.ImporterDefaultValues,
+ models.ImportTarget,
+ models.ImporterDefaultValues,
+ models.ImporterDuplicateField,
]
-def importer_serialization(archive=False, return_empty_types=False,
- archive_name=None):
+def importer_serialization(archive=False, return_empty_types=False, archive_name=None):
result = generic_get_results(IMPORT_MODEL_LIST, "common_imports")
full_archive = archive_serialization(
- result, archive_dir="common_imports", archive=archive,
- return_empty_types=return_empty_types, archive_name=archive_name)
+ result,
+ archive_dir="common_imports",
+ archive=archive,
+ return_empty_types=return_empty_types,
+ archive_name=archive_name,
+ )
return full_archive
-GEO_MODEL_LIST = [
- State, Department, models.Town, models.Area
-]
+GEO_MODEL_LIST = [State, Department, models.Town, models.Area]
-def geo_serialization(archive=False, return_empty_types=False,
- archive_name=None, no_geo=True):
+def geo_serialization(
+ archive=False, return_empty_types=False, archive_name=None, no_geo=True
+):
result = generic_get_results(GEO_MODEL_LIST, "common_geo", no_geo=no_geo)
full_archive = archive_serialization(
- result, archive_dir="common_geo", archive=archive,
- return_empty_types=return_empty_types, archive_name=archive_name)
+ result,
+ archive_dir="common_geo",
+ archive=archive,
+ return_empty_types=return_empty_types,
+ archive_name=archive_name,
+ )
return full_archive
-DIRECTORY_MODEL_LIST = [
- models.Organization, models.Person, models.Author
-]
+DIRECTORY_MODEL_LIST = [models.Organization, models.Person, models.Author]
-def directory_serialization(archive=False, return_empty_types=False,
- archive_name=None):
+def directory_serialization(archive=False, return_empty_types=False, archive_name=None):
result = generic_get_results(DIRECTORY_MODEL_LIST, "common_directory")
full_archive = archive_serialization(
- result, archive_dir="common_directory", archive=archive,
- return_empty_types=return_empty_types, archive_name=archive_name)
+ result,
+ archive_dir="common_directory",
+ archive=archive,
+ return_empty_types=return_empty_types,
+ archive_name=archive_name,
+ )
return full_archive
-def document_serialization(archive=False, return_empty_types=False,
- archive_name=None, operation_queryset=None,
- site_queryset=None, cr_queryset=None,
- find_queryset=None, warehouse_queryset=None,
- put_locks=False, lock_user=None):
+def document_serialization(
+ archive=False,
+ return_empty_types=False,
+ archive_name=None,
+ operation_queryset=None,
+ site_queryset=None,
+ cr_queryset=None,
+ find_queryset=None,
+ warehouse_queryset=None,
+ put_locks=False,
+ lock_user=None,
+):
result_queryset = {}
get_queryset_attr = None
if operation_queryset:
- get_queryset_attr = {"operation_queryset": operation_queryset,
- "get_queryset": True}
+ get_queryset_attr = {
+ "operation_queryset": operation_queryset,
+ "get_queryset": True,
+ }
elif site_queryset:
- get_queryset_attr = {"site_queryset": site_queryset,
- "get_queryset": True}
+ get_queryset_attr = {"site_queryset": site_queryset, "get_queryset": True}
elif cr_queryset:
- get_queryset_attr = {"cr_queryset": cr_queryset,
- "get_queryset": True}
+ get_queryset_attr = {"cr_queryset": cr_queryset, "get_queryset": True}
elif find_queryset:
- get_queryset_attr = {"find_queryset": find_queryset,
- "get_queryset": True}
+ get_queryset_attr = {"find_queryset": find_queryset, "get_queryset": True}
elif warehouse_queryset:
- get_queryset_attr = {"warehouse_queryset": warehouse_queryset,
- "get_queryset": True}
+ get_queryset_attr = {
+ "warehouse_queryset": warehouse_queryset,
+ "get_queryset": True,
+ }
if get_queryset_attr:
queries = operation_serialization(**get_queryset_attr)
@@ -160,22 +204,26 @@ def document_serialization(archive=False, return_empty_types=False,
queries.update(warehouse_serialization(**get_queryset_attr))
document_ids = set()
for model, attr in (
- ("Operation", "operations"),
- ("ArchaeologicalSite", "sites"),
- ("ContextRecord", "context_records"),
- ("Find", "finds"),
- ("Warehouse", "warehouses"),
- ("Container", "containers")):
+ ("Operation", "operations"),
+ ("ArchaeologicalSite", "sites"),
+ ("ContextRecord", "context_records"),
+ ("Find", "finds"),
+ ("Warehouse", "warehouses"),
+ ("Container", "containers"),
+ ):
values = list(queries[model].values_list("id", flat=True))
document_ids.update(
models.Document.objects.filter(
- **{attr + "__id__in": values}).values_list(
- "id", flat=True))
+ **{attr + "__id__in": values}
+ ).values_list("id", flat=True)
+ )
result_queryset["Document"] = models.Document.objects.filter(
- id__in=document_ids)
+ id__in=document_ids
+ )
- result = generic_get_results([models.Document], "documents",
- result_queryset=result_queryset)
+ result = generic_get_results(
+ [models.Document], "documents", result_queryset=result_queryset
+ )
if put_locks:
q = models.Document.objects
if result_queryset:
@@ -184,36 +232,39 @@ def document_serialization(archive=False, return_empty_types=False,
media_archive = None
if archive:
- media_archive = generic_archive_files([models.Document],
- result_queryset=result_queryset)
+ media_archive = generic_archive_files(
+ [models.Document], result_queryset=result_queryset
+ )
full_archive = archive_serialization(
- result, archive_dir="documents", archive=archive,
- return_empty_types=return_empty_types, archive_name=archive_name)
+ result,
+ archive_dir="documents",
+ archive=archive,
+ return_empty_types=return_empty_types,
+ archive_name=archive_name,
+ )
if not media_archive:
return full_archive
- has_media = "media.zip" in ZipFile(full_archive, 'r').namelist()
+ has_media = "media.zip" in ZipFile(full_archive, "r").namelist()
if not has_media:
- with ZipFile(full_archive, 'a') as current_zip:
+ with ZipFile(full_archive, "a") as current_zip:
current_zip.write(media_archive, arcname="media.zip")
os.remove(media_archive)
return full_archive
with tempfile.TemporaryDirectory() as tmp_dir_name:
# extract the current archive
- current_zip = ZipFile(full_archive, 'r')
+ current_zip = ZipFile(full_archive, "r")
name_list = current_zip.namelist()
for name in name_list:
current_zip.extract(name, tmp_dir_name)
current_zip.close()
# extract the media and recreate a media.zip
- old_media_archive = ZipFile(
- os.path.join(tmp_dir_name, "media.zip"), "r")
+ old_media_archive = ZipFile(os.path.join(tmp_dir_name, "media.zip"), "r")
with ZipFile(media_archive, "a") as new_zip:
for name in old_media_archive.namelist():
- new_zip.writestr(
- name, old_media_archive.open(name).read())
+ new_zip.writestr(name, old_media_archive.open(name).read())
# rewrite the archive
with ZipFile(full_archive + "_new", "w") as new_zip:
@@ -228,76 +279,119 @@ def document_serialization(archive=False, return_empty_types=False,
return full_archive
-def full_serialization(operation_queryset=None, site_queryset=None,
- cr_queryset=None, find_queryset=None,
- warehouse_queryset=None, archive=True, no_geo=True,
- info=None, export_types=True, export_conf=True,
- export_importers=True, export_geo=True, export_dir=True,
- export_docs=True, export_items=True, put_locks=False,
- lock_user=None):
+def full_serialization(
+ operation_queryset=None,
+ site_queryset=None,
+ cr_queryset=None,
+ find_queryset=None,
+ warehouse_queryset=None,
+ archive=True,
+ no_geo=True,
+ info=None,
+ export_types=True,
+ export_conf=True,
+ export_importers=True,
+ export_geo=True,
+ export_dir=True,
+ export_docs=True,
+ export_items=True,
+ put_locks=False,
+ lock_user=None,
+):
archive_name = None
if export_types:
# print("type")
archive_name = type_serialization(archive=archive, info=info)
if export_conf:
# print("conf")
- archive_name = conf_serialization(archive=archive,
- archive_name=archive_name)
+ archive_name = conf_serialization(archive=archive, archive_name=archive_name)
if export_importers:
# print("importer")
- archive_name = importer_serialization(archive=archive,
- archive_name=archive_name)
+ archive_name = importer_serialization(
+ archive=archive, archive_name=archive_name
+ )
if export_geo:
# print("geo")
archive_name = geo_serialization(
- archive=archive, archive_name=archive_name, no_geo=no_geo)
+ archive=archive, archive_name=archive_name, no_geo=no_geo
+ )
if export_dir:
# print("directory")
- archive_name = directory_serialization(archive=archive,
- archive_name=archive_name)
+ archive_name = directory_serialization(
+ archive=archive, archive_name=archive_name
+ )
if export_docs:
# print("document")
archive_name = document_serialization(
- archive=archive, archive_name=archive_name,
- operation_queryset=operation_queryset, site_queryset=site_queryset,
- cr_queryset=cr_queryset, find_queryset=find_queryset,
+ archive=archive,
+ archive_name=archive_name,
+ operation_queryset=operation_queryset,
+ site_queryset=site_queryset,
+ cr_queryset=cr_queryset,
+ find_queryset=find_queryset,
warehouse_queryset=warehouse_queryset,
- put_locks=put_locks, lock_user=lock_user
+ put_locks=put_locks,
+ lock_user=lock_user,
)
if export_items:
# print("operation")
archive_name = operation_serialization(
archive=archive,
- archive_name=archive_name, operation_queryset=operation_queryset,
- site_queryset=site_queryset, cr_queryset=cr_queryset,
- find_queryset=find_queryset, warehouse_queryset=warehouse_queryset,
- no_geo=no_geo, put_locks=put_locks, lock_user=lock_user)
+ archive_name=archive_name,
+ operation_queryset=operation_queryset,
+ site_queryset=site_queryset,
+ cr_queryset=cr_queryset,
+ find_queryset=find_queryset,
+ warehouse_queryset=warehouse_queryset,
+ no_geo=no_geo,
+ put_locks=put_locks,
+ lock_user=lock_user,
+ )
# print("cr")
cr_serialization(
archive=archive,
- archive_name=archive_name, operation_queryset=operation_queryset,
- site_queryset=site_queryset, cr_queryset=cr_queryset,
- find_queryset=find_queryset, warehouse_queryset=warehouse_queryset,
- no_geo=no_geo, put_locks=put_locks, lock_user=lock_user)
+ archive_name=archive_name,
+ operation_queryset=operation_queryset,
+ site_queryset=site_queryset,
+ cr_queryset=cr_queryset,
+ find_queryset=find_queryset,
+ warehouse_queryset=warehouse_queryset,
+ no_geo=no_geo,
+ put_locks=put_locks,
+ lock_user=lock_user,
+ )
# print("find")
find_serialization(
archive=archive,
- archive_name=archive_name, operation_queryset=operation_queryset,
- site_queryset=site_queryset, cr_queryset=cr_queryset,
- find_queryset=find_queryset, warehouse_queryset=warehouse_queryset,
- no_geo=no_geo, put_locks=put_locks, lock_user=lock_user)
+ archive_name=archive_name,
+ operation_queryset=operation_queryset,
+ site_queryset=site_queryset,
+ cr_queryset=cr_queryset,
+ find_queryset=find_queryset,
+ warehouse_queryset=warehouse_queryset,
+ no_geo=no_geo,
+ put_locks=put_locks,
+ lock_user=lock_user,
+ )
# print("warehouse")
warehouse_serialization(
archive=archive,
- archive_name=archive_name, operation_queryset=operation_queryset,
- site_queryset=site_queryset, cr_queryset=cr_queryset,
- find_queryset=find_queryset, warehouse_queryset=warehouse_queryset,
- no_geo=no_geo, put_locks=put_locks, lock_user=lock_user)
+ archive_name=archive_name,
+ operation_queryset=operation_queryset,
+ site_queryset=site_queryset,
+ cr_queryset=cr_queryset,
+ find_queryset=find_queryset,
+ warehouse_queryset=warehouse_queryset,
+ no_geo=no_geo,
+ put_locks=put_locks,
+ lock_user=lock_user,
+ )
return archive_name
-def restore_serialized(archive_name, user=None, delete_existing=False,
- release_locks=False):
+def restore_serialized(
+ archive_name, user=None, delete_existing=False, release_locks=False
+):
for app in apps.get_app_configs():
create_contenttypes(app, verbosity=1, interactive=False)
@@ -330,7 +424,8 @@ def restore_serialized(archive_name, user=None, delete_existing=False,
with tempfile.TemporaryDirectory() as tmp_dir_name:
zip_file.extract("media.zip", tmp_dir_name)
with zipfile.ZipFile(
- tmp_dir_name + os.sep + "media.zip", 'r') as media_zip:
+ tmp_dir_name + os.sep + "media.zip", "r"
+ ) as media_zip:
media_zip.extractall(settings.MEDIA_ROOT)
for current_dir, model_list in DIRS:
@@ -347,12 +442,15 @@ def restore_serialized(archive_name, user=None, delete_existing=False,
data = zip_file.read(json_filename).decode("utf-8")
# regenerate labels, add a new version, etc.
historized = hasattr(model, "history_modifier") and (
- hasattr(model, "history_creator"))
- releasing_locks = hasattr(model, "locked") and (
- release_locks)
- need_resave = hasattr(model, "CACHED_LABELS") or \
- hasattr(model, "cached_label") or \
- releasing_locks or (user and historized)
+ hasattr(model, "history_creator")
+ )
+ releasing_locks = hasattr(model, "locked") and (release_locks)
+ need_resave = (
+ hasattr(model, "CACHED_LABELS")
+ or hasattr(model, "cached_label")
+ or releasing_locks
+ or (user and historized)
+ )
idx = -1
for idx, obj in enumerate(deserialize("json", data)):
extra_attrs = {}
@@ -360,37 +458,36 @@ def restore_serialized(archive_name, user=None, delete_existing=False,
keys = obj.object.natural_key()
old_obj = None
try:
- old_obj = model.objects.get_by_natural_key(
- *keys)
+ old_obj = model.objects.get_by_natural_key(*keys)
except model.DoesNotExist:
pass
if old_obj:
- if historized and (old_obj.history_creator or
- old_obj.history_modifier):
+ if historized and (
+ old_obj.history_creator or old_obj.history_modifier
+ ):
extra_attrs = {
- "history_modifier_id":
- old_obj.history_modifier_id,
- "history_creator_id":
- old_obj.history_creator_id
+ "history_modifier_id": old_obj.history_modifier_id,
+ "history_creator_id": old_obj.history_creator_id,
}
if hasattr(model, "locked") and old_obj.locked:
- extra_attrs.update({
- "locked": old_obj.locked,
- "lock_user": old_obj.lock_user,
- })
+ extra_attrs.update(
+ {
+ "locked": old_obj.locked,
+ "lock_user": old_obj.lock_user,
+ }
+ )
obj.save()
if need_resave or extra_attrs:
obj = model.objects.get(id=obj.object.id)
if user:
obj.history_modifier = user
- if extra_attrs and \
- "history_creator_id" in extra_attrs:
+ if extra_attrs and "history_creator_id" in extra_attrs:
obj.history_creator_id = extra_attrs[
- "history_creator_id"]
+ "history_creator_id"
+ ]
else:
obj.history_creator = user
- if extra_attrs and \
- "locked" in extra_attrs:
+ if extra_attrs and "locked" in extra_attrs:
obj.locked = extra_attrs["locked"]
obj.lock_user = extra_attrs["lock_user"]
elif extra_attrs: