summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2019-09-07 18:48:54 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2019-09-07 18:48:54 +0200
commit1bc05fe8c33817b7eb16f8fdc49407f23daa9af9 (patch)
treefb6373f59ae98764019d7ec7ff964d6c60dacd83 /ishtar_common
parent2308e167713697748a087d415bf9cc2ca29e6e6f (diff)
downloadIshtar-1bc05fe8c33817b7eb16f8fdc49407f23daa9af9.tar.bz2
Ishtar-1bc05fe8c33817b7eb16f8fdc49407f23daa9af9.zip
Serialization: manage documenst (with filters) - refactoring
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/models.py1
-rw-r--r--ishtar_common/serializers.py260
-rw-r--r--ishtar_common/serializers_utils.py203
-rw-r--r--ishtar_common/tests.py208
4 files changed, 479 insertions, 193 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index 2898c7934..0a5fc30bf 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -5102,6 +5102,7 @@ class Document(BaseHistorizedItem, OwnPerms, ImageModel, ValueGetter, MainItem):
QUICK_ACTIONS = [
QA_EDIT
]
+ SERIALIZATION_FILES = ["image", "thumbnail", "associated_file"]
title = models.TextField(_("Title"), blank=True, default='')
associated_file = models.FileField(
diff --git a/ishtar_common/serializers.py b/ishtar_common/serializers.py
index 034828835..c6cd118c7 100644
--- a/ishtar_common/serializers.py
+++ b/ishtar_common/serializers.py
@@ -1,8 +1,4 @@
-from collections import OrderedDict
-from copy import deepcopy
-import datetime
import json
-import importlib
import os
import tempfile
import zipfile
@@ -11,13 +7,19 @@ from zipfile import ZipFile
from django.apps import apps
from django.conf import settings
-from django.contrib.sites.models import Site
-from django.core.serializers import deserialize, serialize
+from django.core.serializers import deserialize
from django.db.models import Q
-from version import get_version
from . import models
-from archaeological_warehouse import models as warehouse_models
+
+from ishtar_common.serializers_utils import generic_get_results, \
+ archive_serialization, generic_archive_files, SERIALIZATION_VERSION, \
+ get_model_from_filename
+
+from archaeological_operations.serializers import operation_serialization
+from archaeological_context_records.serializers import cr_serialization
+from archaeological_finds.serializers import find_serialization
+from archaeological_warehouse.serializers import warehouse_serialization
class PublicSerializer(serializers.BaseSerializer):
@@ -25,189 +27,6 @@ class PublicSerializer(serializers.BaseSerializer):
return obj.public_representation()
-SERIALIZATION_VERSION = "1.0"
-
-
-def get_model_from_filename(filename):
- filename = filename.split(".")[0] # remove extension
- module_name, model_name = filename.split("__")
- module = importlib.import_module(module_name + ".models")
- return getattr(module, model_name)
-
-
-def serialization_info():
- site = Site.objects.get_current()
- return {
- "serialize-version": SERIALIZATION_VERSION,
- "ishtar-version": get_version(),
- "domain": site.domain,
- "name": site.name,
- "date": datetime.datetime.now().isoformat()
- }
-
-
-def archive_serialization(result, archive_dir=None, archive=False,
- return_empty_types=False, archive_name=None):
- """
- Serialize all types models to JSON
- Used for import and export scripts
-
- :param result: serialization results
- :param archive_dir: directory inside the archive (default None)
- :param return_empty_types: if True instead of serialization return empty
- types (default False)
- :param archive: if True return a zip file containing all the file serialized
- (default False)
- :param archive_name: path to the archive if not provided a new archive is
- created
- :return: string containing the json serialization of types unless
- return_empty_types or archive is set to True
- """
- if archive and return_empty_types:
- raise ValueError("archive and return_empty_types are incompatible")
- if return_empty_types:
- return [k for k in result if not result[k]]
- if not archive:
- return result
- archive_created = False
- if not archive_name:
- archive_created = True
- tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
- archive_name = tmpdir + "ishtar-{}.zip".format(
- datetime.date.today().strftime("%Y-%m-%d")
- )
- if not archive_name.endswith(".zip"):
- archive_name += ".zip"
- mode = "w" if archive_created else "a"
- with tempfile.TemporaryDirectory() as tmpdirname:
- if archive_dir:
- os.mkdir(tmpdirname + os.sep + archive_dir)
-
- with ZipFile(archive_name, mode) as current_zip:
- if archive_created:
- base_filename = "info.json"
- filename = tmpdirname + os.sep + base_filename
- with open(filename, "w") as json_file:
- json_file.write(
- json.dumps(serialization_info(), indent=2)
- )
- current_zip.write(filename, arcname=base_filename)
-
- for dir_name, model_name in result:
- base_filename = model_name + ".json"
- filename = tmpdirname + os.sep + base_filename
- with open(filename, "w") as json_file:
- json_file.write(result[(dir_name, model_name)])
- arcname = base_filename
- if dir_name:
- arcname = dir_name + os.sep + base_filename
- current_zip.write(filename, arcname=arcname)
- return archive_name
-
-
-def generic_get_results(model_list, dirname, no_geo=True,
- result_queryset=None):
- result = OrderedDict()
- for model in model_list:
- base_model_name = model.__name__
- model_name = str(model.__module__).split(".")[0] + "__" + \
- base_model_name
-
- if result_queryset and base_model_name in result_queryset:
- base_q = result_queryset[base_model_name]
- else:
- base_q = model.objects
- q = base_q
- recursion = None
- if hasattr(model, "parent"):
- recursion = "parent"
- elif hasattr(model, "inverse_relation"):
- recursion = "inverse_relation"
- elif hasattr(model, "children"):
- recursion = "children__id"
- if recursion:
- q = q.filter(**{recursion + "__isnull": True})
-
- key = (dirname, model_name)
- result[key] = serialize(
- "json", q.distinct().all(),
- indent=2,
- use_natural_foreign_keys=True, use_natural_primary_keys=True,
- )
-
- if recursion:
- serialized = [item["id"] for item in q.values("id").all()]
- recursion_in = recursion
- if not recursion.endswith("_id"):
- recursion_in += "_id"
- recursion_in += "__in"
- q = base_q.filter(**{recursion_in: serialized}
- ).exclude(id__in=serialized)
- while q.count():
- v = serialize(
- "json", q.all(), indent=2, use_natural_foreign_keys=True,
- use_natural_primary_keys=True)
- new_result = json.loads(result[key])
- new_result += json.loads(v)
- result[key] = json.dumps(new_result, indent=2)
- serialized += [item["id"] for item in q.values("id").all()]
- q = base_q.filter(**{recursion_in: serialized}
- ).exclude(id__in=serialized)
- # managed circular
- q = base_q.exclude(id__in=serialized)
- if q.count():
- v = serialize(
- "json", q.all(), indent=2, use_natural_foreign_keys=True,
- use_natural_primary_keys=True)
- result_to_add = json.loads(v)
- result_cleaned = deepcopy(result_to_add)
- for res in result_cleaned: # first add with no recursion
- res["fields"][recursion] = None
- new_result = json.loads(result[key])
- new_result += result_cleaned
- new_result += result_to_add
- result[key] = json.dumps(new_result, indent=2)
-
- excluded_fields = ["history_modifier", "history_creator", "imports"]
- if hasattr(model, "SERIALIZATION_EXCLUDE"):
- excluded_fields = list(model.SERIALIZATION_EXCLUDE)
- if no_geo:
- excluded_fields += ["center", "limit"] + [
- field.name for field in models.GeoItem._meta.get_fields()
- ]
- if excluded_fields:
- new_result = json.loads(result[key])
- for idx in range(len(new_result)):
- for excluded_field in excluded_fields:
- if excluded_field in new_result[idx]["fields"]:
- new_result[idx]["fields"].pop(excluded_field)
- result[key] = json.dumps(new_result, indent=2)
- return result
-
-
-def generic_archive_files(model_list, archive_name=None):
- result = []
- for model in model_list:
- if hasattr(model, "SERIALIZATION_FILES"):
- for item in model.objects.all():
- for attr in model.SERIALIZATION_FILES:
- media = getattr(item, attr)
- result.append((media.path, media.name))
-
- archive_created = False
- if not archive_name:
- archive_created = True
- tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
- archive_name = tmpdir + "media.zip"
- if not archive_name.endswith(".zip"):
- archive_name += ".zip"
- mode = "w" if archive_created else "a"
- with ZipFile(archive_name, mode) as current_zip:
- for media_path, name in result:
- current_zip.write(media_path, arcname=name)
- return archive_name
-
-
TYPE_MODEL_EXCLUDE = ["Area", "OperationTypeOld"]
@@ -294,6 +113,65 @@ def directory_serialization(archive=False, return_empty_types=False,
return full_archive
+def document_serialization(archive=False, return_empty_types=False,
+ archive_name=None, operation_queryset=None,
+ site_queryset=None, cr_queryset=None,
+ find_queryset=None, warehouse_queryset=None):
+ result_queryset = {}
+ get_queryset_attr = None
+ if operation_queryset:
+ get_queryset_attr = {"operation_queryset": operation_queryset,
+ "get_queryset": True}
+ elif site_queryset:
+ get_queryset_attr = {"site_queryset": site_queryset,
+ "get_queryset": True}
+ elif cr_queryset:
+ get_queryset_attr = {"cr_queryset": cr_queryset,
+ "get_queryset": True}
+ elif find_queryset:
+ get_queryset_attr = {"find_queryset": find_queryset,
+ "get_queryset": True}
+ elif warehouse_queryset:
+ get_queryset_attr = {"warehouse_queryset": warehouse_queryset,
+ "get_queryset": True}
+
+ if get_queryset_attr:
+ queries = operation_serialization(**get_queryset_attr)
+ queries.update(cr_serialization(**get_queryset_attr))
+ queries.update(find_serialization(**get_queryset_attr))
+ queries.update(warehouse_serialization(**get_queryset_attr))
+ q = None
+ for model, attr in (
+ ("Operation", "operations"),
+ ("ArchaeologicalSite", "sites"),
+ ("ContextRecord", "context_records"),
+ ("Find", "finds"),
+ ("Warehouse", "warehouses"),
+ ("Container", "containers")):
+ values = queries[model].values_list("id", flat=True)
+ base_q = Q(**{attr + "__id__in": values})
+ if not q:
+ q = base_q
+ else:
+ q |= base_q
+ result_queryset["Document"] = models.Document.objects.filter(q)
+
+ result = generic_get_results([models.Document], "documents",
+ result_queryset=result_queryset)
+ media_archive = None
+ if archive:
+ media_archive = generic_archive_files([models.Document],
+ result_queryset=result_queryset)
+ full_archive = archive_serialization(
+ result, archive_dir="documents", archive=archive,
+ return_empty_types=return_empty_types, archive_name=archive_name)
+ if not media_archive:
+ return full_archive
+ with ZipFile(full_archive, 'a') as current_zip:
+ current_zip.write(media_archive, arcname="media.zip")
+ return full_archive
+
+
def restore_serialized(archive_name, delete_existing=False):
with zipfile.ZipFile(archive_name, "r") as zip_file:
# check version
diff --git a/ishtar_common/serializers_utils.py b/ishtar_common/serializers_utils.py
new file mode 100644
index 000000000..4bd655269
--- /dev/null
+++ b/ishtar_common/serializers_utils.py
@@ -0,0 +1,203 @@
+from collections import OrderedDict
+from copy import deepcopy
+import datetime
+import json
+import importlib
+import os
+import tempfile
+from zipfile import ZipFile
+
+from django.contrib.sites.models import Site
+from django.core.serializers import serialize
+
+from version import get_version
+from . import models
+
+
+SERIALIZATION_VERSION = "1.0"
+
+
+def get_model_from_filename(filename):
+ filename = filename.split(".")[0] # remove extension
+ module_name, model_name = filename.split("__")
+ module = importlib.import_module(module_name + ".models")
+ return getattr(module, model_name)
+
+
+def serialization_info():
+ site = Site.objects.get_current()
+ return {
+ "serialize-version": SERIALIZATION_VERSION,
+ "ishtar-version": get_version(),
+ "domain": site.domain,
+ "name": site.name,
+ "date": datetime.datetime.now().isoformat()
+ }
+
+
+def archive_serialization(result, archive_dir=None, archive=False,
+ return_empty_types=False, archive_name=None):
+ """
+ Serialize all types models to JSON
+ Used for import and export scripts
+
+ :param result: serialization results
+ :param archive_dir: directory inside the archive (default None)
+ :param return_empty_types: if True instead of serialization return empty
+ types (default False)
+ :param archive: if True return a zip file containing all the file serialized
+ (default False)
+ :param archive_name: path to the archive if not provided a new archive is
+ created
+ :return: string containing the json serialization of types unless
+ return_empty_types or archive is set to True
+ """
+ if archive and return_empty_types:
+ raise ValueError("archive and return_empty_types are incompatible")
+ if return_empty_types:
+ return [k for k in result if not result[k]]
+ if not archive:
+ return result
+ archive_created = False
+ if not archive_name:
+ archive_created = True
+ tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
+ archive_name = tmpdir + "ishtar-{}.zip".format(
+ datetime.date.today().strftime("%Y-%m-%d")
+ )
+ if not archive_name.endswith(".zip"):
+ archive_name += ".zip"
+ mode = "w" if archive_created else "a"
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ if archive_dir:
+ os.mkdir(tmpdirname + os.sep + archive_dir)
+
+ with ZipFile(archive_name, mode) as current_zip:
+ if archive_created:
+ base_filename = "info.json"
+ filename = tmpdirname + os.sep + base_filename
+ with open(filename, "w") as json_file:
+ json_file.write(
+ json.dumps(serialization_info(), indent=2)
+ )
+ current_zip.write(filename, arcname=base_filename)
+
+ for dir_name, model_name in result:
+ base_filename = model_name + ".json"
+ filename = tmpdirname + os.sep + base_filename
+ with open(filename, "w") as json_file:
+ json_file.write(result[(dir_name, model_name)])
+ arcname = base_filename
+ if dir_name:
+ arcname = dir_name + os.sep + base_filename
+ current_zip.write(filename, arcname=arcname)
+ return archive_name
+
+
+def generic_get_results(model_list, dirname, no_geo=True,
+ result_queryset=None):
+ result = OrderedDict()
+ for model in model_list:
+ base_model_name = model.__name__
+ model_name = str(model.__module__).split(".")[0] + "__" + \
+ base_model_name
+
+ if result_queryset and base_model_name in result_queryset:
+ base_q = result_queryset[base_model_name]
+ else:
+ base_q = model.objects
+ q = base_q
+ recursion = None
+ if hasattr(model, "parent"):
+ recursion = "parent"
+ elif hasattr(model, "inverse_relation"):
+ recursion = "inverse_relation"
+ elif hasattr(model, "children"):
+ recursion = "children__id"
+ if recursion:
+ q = q.filter(**{recursion + "__isnull": True})
+
+ key = (dirname, model_name)
+ result[key] = serialize(
+ "json", q.distinct().all(),
+ indent=2,
+ use_natural_foreign_keys=True, use_natural_primary_keys=True,
+ )
+
+ if recursion:
+ serialized = [item["id"] for item in q.values("id").all()]
+ recursion_in = recursion
+ if not recursion.endswith("_id"):
+ recursion_in += "_id"
+ recursion_in += "__in"
+ q = base_q.filter(**{recursion_in: serialized}
+ ).exclude(id__in=serialized)
+ while q.count():
+ v = serialize(
+ "json", q.all(), indent=2, use_natural_foreign_keys=True,
+ use_natural_primary_keys=True)
+ new_result = json.loads(result[key])
+ new_result += json.loads(v)
+ result[key] = json.dumps(new_result, indent=2)
+ serialized += [item["id"] for item in q.values("id").all()]
+ q = base_q.filter(**{recursion_in: serialized}
+ ).exclude(id__in=serialized)
+ # managed circular
+ q = base_q.exclude(id__in=serialized)
+ if q.count():
+ v = serialize(
+ "json", q.all(), indent=2, use_natural_foreign_keys=True,
+ use_natural_primary_keys=True)
+ result_to_add = json.loads(v)
+ result_cleaned = deepcopy(result_to_add)
+ for res in result_cleaned: # first add with no recursion
+ res["fields"][recursion] = None
+ new_result = json.loads(result[key])
+ new_result += result_cleaned
+ new_result += result_to_add
+ result[key] = json.dumps(new_result, indent=2)
+
+ excluded_fields = ["history_modifier", "history_creator", "imports"]
+ if hasattr(model, "SERIALIZATION_EXCLUDE"):
+ excluded_fields = list(model.SERIALIZATION_EXCLUDE)
+ if no_geo:
+ excluded_fields += ["center", "limit"] + [
+ field.name for field in models.GeoItem._meta.get_fields()
+ ]
+ if excluded_fields:
+ new_result = json.loads(result[key])
+ for idx in range(len(new_result)):
+ for excluded_field in excluded_fields:
+ if excluded_field in new_result[idx]["fields"]:
+ new_result[idx]["fields"].pop(excluded_field)
+ result[key] = json.dumps(new_result, indent=2)
+ return result
+
+
+def generic_archive_files(model_list, archive_name=None, result_queryset=None):
+ if not result_queryset:
+ result_queryset = {}
+ result = []
+ for model in model_list:
+ if model.__name__ in result_queryset.keys():
+ query = result_queryset[model.__name__]
+ else:
+ query = model.objects
+ if hasattr(model, "SERIALIZATION_FILES"):
+ for item in query.all():
+ for attr in model.SERIALIZATION_FILES:
+ media = getattr(item, attr)
+ result.append((media.path, media.name))
+
+ archive_created = False
+ if not archive_name:
+ archive_created = True
+ tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
+ archive_name = tmpdir + "media.zip"
+ if not archive_name.endswith(".zip"):
+ archive_name += ".zip"
+ mode = "w" if archive_created else "a"
+ with ZipFile(archive_name, mode) as current_zip:
+ for media_path, name in result:
+ current_zip.write(media_path, arcname=name)
+ return archive_name
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 5fbc8b875..96737c765 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -51,10 +51,12 @@ from ishtar_common import models
from ishtar_common import views
from ishtar_common.apps import admin_site
from ishtar_common.serializers import type_serialization, \
- SERIALIZATION_VERSION, serialization_info, \
+ SERIALIZATION_VERSION, \
restore_serialized, conf_serialization, CONF_MODEL_LIST, \
importer_serialization, IMPORT_MODEL_LIST, geo_serialization, \
- GEO_MODEL_LIST, directory_serialization, DIRECTORY_MODEL_LIST
+ GEO_MODEL_LIST, directory_serialization, DIRECTORY_MODEL_LIST, \
+ document_serialization
+from ishtar_common.serializers_utils import serialization_info
from ishtar_common.utils import post_save_geo, update_data, move_dict_data, \
rename_and_simplify_media_name, try_fix_file
@@ -599,6 +601,19 @@ class CacheTest(TestCase):
class GenericSerializationTest:
+ def create_document_default(self):
+ image_path = os.path.join(settings.ROOT_PATH, "..", "ishtar_common",
+ "tests", "test.png")
+ self.documents = []
+ for idx in range(12):
+ self.documents.append(models.Document.objects.create(
+ title="Test{}".format(idx),
+ associated_file=SimpleUploadedFile(
+ 'test.txt', b'no real content'),
+ image=SimpleUploadedFile(
+ name='test.png', content=open(image_path, 'rb').read(),
+ content_type='image/png')))
+
def generic_serialization_test(self, serialize, no_test=False, kwargs=None):
if not kwargs:
kwargs = {}
@@ -748,6 +763,188 @@ class SerializationTest(GenericSerializationTest, TestCase):
self.create_directory_default()
self.generic_serialization_test(directory_serialization)
+ def create_document_default(self):
+ super(SerializationTest, self).create_document_default()
+ from archaeological_operations.models import Operation, \
+ ArchaeologicalSite, OperationType
+ from archaeological_context_records.models import ContextRecord
+ from archaeological_finds.models import Find, BaseFind
+ from archaeological_warehouse.models import Warehouse, Container, \
+ ContainerLocalisation, WarehouseDivision, WarehouseDivisionLink, \
+ WarehouseType, ContainerType
+
+ operation_type = OperationType.objects.all()[0]
+ dct = {'year': 2010, 'operation_type_id': operation_type.pk,
+ "code_patriarche": "66666"}
+ operation1 = Operation.objects.create(**dct)
+ operation1.documents.add(self.documents[0])
+ dct["code_patriarche"] = "66667"
+ operation2 = Operation.objects.create(**dct)
+ operation2.documents.add(self.documents[1])
+
+ site1 = ArchaeologicalSite.objects.create(reference="3333", name="test")
+ operation1.archaeological_sites.add(site1)
+ site1.documents.add(self.documents[2])
+ site2 = ArchaeologicalSite.objects.create(reference="444", name="test2")
+ operation2.archaeological_sites.add(site2)
+ site2.documents.add(self.documents[3])
+
+ dct = {'label': "Context record1", "operation": operation1}
+ cr1 = ContextRecord.objects.create(**dct)
+ cr1.documents.add(self.documents[4])
+ dct = {'label': "Context record2", "operation": operation2}
+ cr2 = ContextRecord.objects.create(**dct)
+ cr2.documents.add(self.documents[5])
+
+ dct = {'label': "Base find", "context_record": cr1}
+ base_find1 = BaseFind.objects.create(**dct)
+ dct = {'label': "Base find2", "context_record": cr2}
+ base_find2 = BaseFind.objects.create(**dct)
+
+ dct = {'label': "Find1"}
+ find1 = Find.objects.create(**dct)
+ find1.documents.add(self.documents[6])
+ find1.base_finds.add(base_find1)
+ dct = {'label': "Find2"}
+ find2 = Find.objects.create(**dct)
+ find2.documents.add(self.documents[7])
+ find2.base_finds.add(base_find2)
+
+ w1 = Warehouse.objects.create(
+ name="Test1",
+ external_id="test",
+ warehouse_type=WarehouseType.objects.all()[0],
+ )
+ w1.documents.add(self.documents[8])
+ w2 = Warehouse.objects.create(
+ name="Test2",
+ external_id="test2",
+ warehouse_type=WarehouseType.objects.all()[0],
+ )
+ w2.documents.add(self.documents[9])
+ self.warehouses = [w1, w2]
+ c1 = Container.objects.create(
+ location=w1,
+ responsible=w1,
+ container_type=ContainerType.objects.all()[0],
+ reference="Réf1",
+ index=1,
+ external_id="ref1-1"
+ )
+ c1.documents.add(self.documents[10])
+ c2 = Container.objects.create(
+ location=w2,
+ responsible=w2,
+ container_type=ContainerType.objects.all()[0],
+ reference="Réf2",
+ index=2,
+ external_id="ref2-2"
+ )
+ c2.documents.add(self.documents[11])
+ find1.container = c1
+ find1.container_ref = c1
+ find1.save()
+ find2.container = c2
+ find2.container_ref = c2
+ find2.save()
+ wd1 = WarehouseDivision.objects.create(
+ label="Étagère", txt_idx="etagere"
+ )
+ wd2 = WarehouseDivision.objects.create(
+ label="Allée", txt_idx="allee"
+ )
+ wl1 = WarehouseDivisionLink.objects.create(
+ warehouse=w1,
+ division=wd1
+ )
+ wl2 = WarehouseDivisionLink.objects.create(
+ warehouse=w2,
+ division=wd2
+ )
+ ContainerLocalisation.objects.create(
+ container=c1,
+ division=wl1,
+ reference="A1"
+ )
+ ContainerLocalisation.objects.create(
+ container=c2,
+ division=wl2,
+ reference="A2"
+ )
+
+ def test_base_document_serialization(self):
+ self.create_document_default()
+ self.generic_serialization_test(document_serialization)
+
+ def test_document_serialization(self):
+ self.create_document_default()
+ res = self.generic_serialization_test(
+ document_serialization)
+ docs = json.loads(
+ res[('documents', 'ishtar_common__Document')]
+ )
+ self.assertEqual(len(docs), 12)
+
+ from archaeological_operations.models import Operation, \
+ ArchaeologicalSite
+ result_queryset = Operation.objects.filter(
+ code_patriarche="66666")
+ res = self.generic_serialization_test(
+ document_serialization, no_test=True,
+ kwargs={"operation_queryset": result_queryset}
+ )
+ docs = json.loads(
+ res[('documents', 'ishtar_common__Document')]
+ )
+ self.assertEqual(len(docs), 6)
+
+ result_queryset = ArchaeologicalSite.objects.filter(
+ id=ArchaeologicalSite.objects.all()[0].id)
+ res = self.generic_serialization_test(
+ document_serialization, no_test=True,
+ kwargs={"site_queryset": result_queryset}
+ )
+ docs = json.loads(
+ res[('documents', 'ishtar_common__Document')]
+ )
+ self.assertEqual(len(docs), 6)
+
+ from archaeological_context_records.models import ContextRecord
+ result_queryset = ContextRecord.objects.filter(
+ id=ContextRecord.objects.all()[0].id)
+ res = self.generic_serialization_test(
+ document_serialization, no_test=True,
+ kwargs={"cr_queryset": result_queryset}
+ )
+ docs = json.loads(
+ res[('documents', 'ishtar_common__Document')]
+ )
+ self.assertEqual(len(docs), 6)
+
+ from archaeological_finds.models import Find
+ result_queryset = Find.objects.filter(
+ id=Find.objects.all()[0].id)
+ res = self.generic_serialization_test(
+ document_serialization, no_test=True,
+ kwargs={"find_queryset": result_queryset}
+ )
+ docs = json.loads(
+ res[('documents', 'ishtar_common__Document')]
+ )
+ self.assertEqual(len(docs), 6)
+
+ from archaeological_warehouse.models import Warehouse
+ result_queryset = Warehouse.objects.filter(
+ id=Warehouse.objects.all()[0].id)
+ res = self.generic_serialization_test(
+ document_serialization, no_test=True,
+ kwargs={"warehouse_queryset": result_queryset}
+ )
+ docs = json.loads(
+ res[('documents', 'ishtar_common__Document')]
+ )
+ self.assertEqual(len(docs), 6)
+
def test_serialization_zip(self):
zip_filename = type_serialization(archive=True)
# only check the validity of the zip, the type content is tested above
@@ -845,6 +1042,13 @@ class SerializationTest(GenericSerializationTest, TestCase):
self.generic_restore_test(zip_filename, current_number,
DIRECTORY_MODEL_LIST)
+ def test_document_restore(self):
+ self.create_document_default()
+ current_number, zip_filename = self.generic_restore_test_genzip(
+ [models.Document], document_serialization)
+ self.generic_restore_test(zip_filename, current_number,
+ [models.Document])
+
class AccessControlTest(TestCase):
def test_administrator(self):