diff options
-rw-r--r-- | ishtar_common/serializers.py | 58 | ||||
-rw-r--r-- | ishtar_common/tests.py | 25 |
2 files changed, 83 insertions, 0 deletions
diff --git a/ishtar_common/serializers.py b/ishtar_common/serializers.py index d7d998e96..31caa8cb7 100644 --- a/ishtar_common/serializers.py +++ b/ishtar_common/serializers.py @@ -1,6 +1,64 @@ +import datetime +import os +import tempfile from rest_framework import serializers +from zipfile import ZipFile + +from django.apps import apps +from django.core.serializers import serialize + +from . import models class PublicSerializer(serializers.BaseSerializer): def to_representation(self, obj): return obj.public_representation() + + +def type_model_serialization(archive=False, return_empty_types=False, + archive_name=None): + """ + Serialize all types models to JSON + Used for import and export scripts + + :param return_empty_types: if True instead of serialization return empty + types (default False) + :param archive: if True return a zip file containing all the file serialized + (defaukt False) + :return: string containing the json serialization of types unless + return_empty_types or archive is set to True + """ + if archive and return_empty_types: + raise ValueError("archive and return_empty_types are incompatible") + result = {} + for model in apps.get_models(): + if not isinstance(model(), models.GeneralType): + continue + model_name = model.__name__ + model_name = str(model.__module__).split(".")[0] + "__" + model_name + result[model_name] = serialize( + "json", model.objects.all(), + indent=2, + use_natural_foreign_keys=True, use_natural_primary_keys=True + ) + if return_empty_types: + return [k for k in result if not result[k]] + if not archive: + return result + if not archive_name: + tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep + archive_name = tmpdir + "ishtar-{}.zip".format( + datetime.date.today().strftime("%Y-%m-%d") + ) + if not archive_name.endswith(".zip"): + archive_name += ".zip" + with tempfile.TemporaryDirectory() as tmpdirname: + with ZipFile(archive_name, 'w') as current_zip: + for model_name in result: + base_filename = model_name + ".json" + filename = tmpdirname + os.sep + base_filename + with open(filename, "w") as json_file: + json_file.write(result[model_name]) + current_zip.write(filename, arcname=base_filename) + return archive_name + diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index b056359ec..3289ad8f6 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -20,11 +20,13 @@ from bs4 import BeautifulSoup as Soup import csv import datetime +import importlib import io import json import os import shutil from io import StringIO +import zipfile from django.apps import apps @@ -46,6 +48,7 @@ from django.test.runner import DiscoverRunner from ishtar_common import models from ishtar_common import views from ishtar_common.apps import admin_site +from ishtar_common.serializers import type_model_serialization from ishtar_common.utils import post_save_geo, update_data, move_dict_data, \ rename_and_simplify_media_name, try_fix_file @@ -544,6 +547,28 @@ class CacheTest(TestCase): self.assertIn(b'href="/operation_modification/', response.content) +class SerializationTest(TestCase): + fixtures = [settings.ROOT_PATH + + '../fixtures/initial_data-auth-fr.json', + settings.ROOT_PATH + + '../ishtar_common/fixtures/initial_data-fr.json',] + + def test_type_serialization(self): + json_result = type_model_serialization() + for k in json_result.keys(): + module_name, model_name = k.split("__") + module = importlib.import_module(module_name + ".models") + model = getattr(module, model_name) + self.assertEqual(model.objects.count(), + len(json.loads(json_result[k]))) + + def test_type_serialization_zip(self): + # only check the validity of the zip, the content is tested above + zip_filename = type_model_serialization(archive=True) + with zipfile.ZipFile(zip_filename, "r") as zip_file: + self.assertIsNone(zip_file.testzip()) + + class AccessControlTest(TestCase): def test_administrator(self): admin, created = models.PersonType.objects.get_or_create( |