summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2019-09-02 11:56:27 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2019-09-02 13:06:36 +0200
commit9d82b632716c190746623f851beed1a67e027abf (patch)
treec84d62588fedd9bd1e4023b8a4c9f1f08161c048 /ishtar_common
parent80152f2f2692fd576b55ab7e6adb3292770bbc68 (diff)
downloadIshtar-9d82b632716c190746623f851beed1a67e027abf.tar.bz2
Ishtar-9d82b632716c190746623f851beed1a67e027abf.zip
Serialization: manage medias, manage model order (serialization and restore), serialize conf
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/models.py81
-rw-r--r--ishtar_common/serializers.py130
-rw-r--r--ishtar_common/tests.py72
3 files changed, 251 insertions, 32 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index 35c35e97b..8ee72e7b3 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -1256,15 +1256,27 @@ class RelationItem(models.Model):
render_below=render_below, full=full)
+class JsonDataSectionManager(models.Manager):
+ def get_by_natural_key(self, name, app_label, model):
+ return self.get(name=name,
+ content_type__app_label=app_label,
+ content_type__model=model)
+
+
class JsonDataSection(models.Model):
content_type = models.ForeignKey(ContentType)
name = models.CharField(_("Name"), max_length=200)
order = models.IntegerField(_("Order"), default=10)
+ objects = JsonDataSectionManager()
class Meta:
verbose_name = _("Json data - Menu")
verbose_name_plural = _("Json data - Menus")
ordering = ['order', 'name']
+ unique_together = ("name", "content_type")
+
+ def natural_key(self):
+ return (self.name, self.content_type.app_label, self.content_type.model)
def __str__(self):
return "{} - {}".format(self.content_type, self.name)
@@ -1281,6 +1293,12 @@ JSON_VALUE_TYPES = (
)
+class JsonDataFieldManager(models.Manager):
+ def get_by_natural_key(self, key, app_label, model):
+ return self.get(key=key, content_type__app_label=app_label,
+ content_type__model=model)
+
+
class JsonDataField(models.Model):
name = models.CharField(_("Name"), max_length=200)
content_type = models.ForeignKey(ContentType)
@@ -1300,11 +1318,16 @@ class JsonDataField(models.Model):
on_delete=models.SET_NULL)
custom_forms = models.ManyToManyField(
"CustomForm", blank=True, through="CustomFormJsonField")
+ objects = JsonDataFieldManager()
class Meta:
verbose_name = _("Json data - Field")
verbose_name_plural = _("Json data - Fields")
ordering = ['order', 'name']
+ unique_together = ("content_type", "key")
+
+ def natural_key(self):
+ return (self.key, self.content_type.app_label, self.content_type.model)
def __str__(self):
return "{} - {}".format(self.content_type, self.name)
@@ -2794,6 +2817,7 @@ class IshtarSiteProfile(models.Model, Cached):
help_text=_("Spatial Reference System used for display when no SRS is "
"defined")
)
+ objects = SlugModelManager()
class Meta:
verbose_name = _("Ishtar site profile")
@@ -2803,6 +2827,9 @@ class IshtarSiteProfile(models.Model, Cached):
def __str__(self):
return str(self.label)
+ def natural_key(self):
+ return (self.slug,)
+
def has_overload(self, key):
return self.config and self.config in ALTERNATE_CONFIGS and \
hasattr(ALTERNATE_CONFIGS[self.config], key)
@@ -2883,6 +2910,11 @@ post_save.connect(cached_site_changed, sender=IshtarSiteProfile)
post_delete.connect(cached_site_changed, sender=IshtarSiteProfile)
+class CustomFormManager(models.Manager):
+ def get_by_natural_key(self, name, form):
+ return self.get(name=name, form=form)
+
+
class CustomForm(models.Model):
name = models.CharField(_("Name"), max_length=250)
form = models.CharField(_("Form"), max_length=250)
@@ -2897,11 +2929,18 @@ class CustomForm(models.Model):
"user and user type is useless."))
users = models.ManyToManyField('IshtarUser', blank=True)
user_types = models.ManyToManyField('PersonType', blank=True)
+ objects = CustomFormManager()
+
+ SERIALIZATION_EXCLUDE = ("users", )
class Meta:
verbose_name = _("Custom form")
verbose_name_plural = _("Custom forms")
ordering = ['name', 'form']
+ unique_together = (('name', 'form'),)
+
+ def natural_key(self):
+ return (self.name, self.form)
def __str__(self):
return "{} - {}".format(self.name, self.form)
@@ -2987,13 +3026,40 @@ class CustomForm(models.Model):
return res
+class ExcludedFieldManager(models.Manager):
+ def get_by_natural_key(self, custom_form_name, custom_form_form,
+ field):
+ return self.get(custom_form__name=custom_form_name,
+ custom_form__form=custom_form_form,
+ field=field)
+
+
class ExcludedField(models.Model):
custom_form = models.ForeignKey(CustomForm, related_name='excluded_fields')
field = models.CharField(_("Field"), max_length=250)
+ objects = ExcludedFieldManager()
class Meta:
verbose_name = _("Excluded field")
verbose_name_plural = _("Excluded fields")
+ unique_together = ("custom_form", "field")
+
+ def natural_key(self):
+ return (self.custom_form.name , self.custom_form.form,
+ self.field)
+
+
+class CustomFormJsonFieldManager(models.Manager):
+ def get_by_natural_key(self, custom_form_name, custom_form_form,
+ json_field_key, json_field_app_label,
+ json_field_model):
+ return self.get(
+ custom_form__name=custom_form_name,
+ custom_form__form=custom_form_form,
+ json_field__key=json_field_key,
+ json_field__content_type__app_label=json_field_app_label,
+ json_field__content_type__model=json_field_model
+ )
class CustomFormJsonField(models.Model):
@@ -3004,10 +3070,19 @@ class CustomFormJsonField(models.Model):
default='')
order = models.IntegerField(verbose_name=_("Order"), default=1)
help_text = models.TextField(_("Help"), blank=True, null=True)
+ objects = CustomFormJsonFieldManager()
class Meta:
verbose_name = _("Custom form - Json data field")
verbose_name_plural = _("Custom form - Json data fields")
+ unique_together = ("custom_form", "json_field")
+
+ def natural_key(self):
+ return (
+ self.custom_form.name, self.custom_form.form,
+ self.json_field.key, self.json_field.content_type.app_label,
+ self.json_field.content_type.model
+ )
class GlobalVar(models.Model, Cached):
@@ -3015,16 +3090,21 @@ class GlobalVar(models.Model, Cached):
description = models.TextField(_("Description of the variable"),
null=True, blank=True)
value = models.TextField(_("Value"), null=True, blank=True)
+ objects = SlugModelManager()
class Meta:
verbose_name = _("Global variable")
verbose_name_plural = _("Global variables")
ordering = ['slug']
+ def natural_key(self):
+ return (self.slug,)
+
def __str__(self):
return str(self.slug)
+
def cached_globalvar_changed(sender, **kwargs):
if not kwargs['instance']:
return
@@ -3328,6 +3408,7 @@ class DocumentTemplate(models.Model):
help_text=_("Only relevant for label template")
)
objects = SlugModelManager()
+ SERIALIZATION_FILES = ("template", )
class Meta:
verbose_name = _("Document template")
diff --git a/ishtar_common/serializers.py b/ishtar_common/serializers.py
index 187686321..e9b904d6f 100644
--- a/ishtar_common/serializers.py
+++ b/ishtar_common/serializers.py
@@ -1,3 +1,4 @@
+from collections import OrderedDict
from copy import deepcopy
import datetime
import json
@@ -9,6 +10,7 @@ from rest_framework import serializers
from zipfile import ZipFile
from django.apps import apps
+from django.conf import settings
from django.contrib.sites.models import Site
from django.core.serializers import deserialize, serialize
@@ -72,11 +74,12 @@ def archive_serialization(result, archive_dir=None, archive=False,
)
if not archive_name.endswith(".zip"):
archive_name += ".zip"
+ mode = "w" if archive_created else "a"
with tempfile.TemporaryDirectory() as tmpdirname:
if archive_dir:
os.mkdir(tmpdirname + os.sep + archive_dir)
- with ZipFile(archive_name, 'w') as current_zip:
+ with ZipFile(archive_name, mode) as current_zip:
if archive_created:
base_filename = "info.json"
filename = tmpdirname + os.sep + base_filename
@@ -86,19 +89,21 @@ def archive_serialization(result, archive_dir=None, archive=False,
)
current_zip.write(filename, arcname=base_filename)
- for model_name in result:
+ for dir_name, model_name in result:
base_filename = model_name + ".json"
filename = tmpdirname + os.sep + base_filename
with open(filename, "w") as json_file:
- json_file.write(result[model_name])
- current_zip.write(filename,
- arcname="types" + os.sep + base_filename)
+ json_file.write(result[(dir_name, model_name)])
+ arcname = base_filename
+ if dir_name:
+ arcname = dir_name + os.sep + base_filename
+ current_zip.write(filename, arcname=arcname)
return archive_name
def type_serialization(archive=False, return_empty_types=False,
archive_name=None):
- result = {}
+ result = OrderedDict()
for model in apps.get_models():
if not isinstance(model(), models.GeneralType):
continue
@@ -113,9 +118,9 @@ def type_serialization(archive=False, return_empty_types=False,
recursion = "inverse_relation"
if recursion:
q = q.filter(**{recursion + "__isnull": True})
- result[model_name] = serialize(
- "json", q.all(),
- indent=2,
+ key = ("types", model_name)
+ result[key] = serialize(
+ "json", q.all(), indent=2,
use_natural_foreign_keys=True, use_natural_primary_keys=True
)
if recursion:
@@ -126,9 +131,9 @@ def type_serialization(archive=False, return_empty_types=False,
v = serialize(
"json", q.all(), indent=2, use_natural_foreign_keys=True,
use_natural_primary_keys=True)
- new_result = json.loads(result[model_name])
+ new_result = json.loads(result[key])
new_result += json.loads(v)
- result[model_name] = json.dumps(new_result, indent=2)
+ result[key] = json.dumps(new_result, indent=2)
serialized += [item["id"] for item in q.values("id").all()]
q = base_q.filter(**{recursion + "_id__in": serialized}
).exclude(id__in=serialized)
@@ -142,15 +147,78 @@ def type_serialization(archive=False, return_empty_types=False,
result_cleaned = deepcopy(result_to_add)
for res in result_cleaned: # first add with no recursion
res["fields"][recursion] = None
- new_result = json.loads(result[model_name])
+ new_result = json.loads(result[key])
new_result += result_cleaned
new_result += result_to_add
- result[model_name] = json.dumps(new_result, indent=2)
+ result[key] = json.dumps(new_result, indent=2)
return archive_serialization(result, archive_dir="types", archive=archive,
return_empty_types=return_empty_types,
archive_name=archive_name)
+def generic_get_results(model_list, dirname):
+ result = OrderedDict()
+ for model in model_list:
+ model_name = model.__name__
+ model_name = str(model.__module__).split(".")[0] + "__" + model_name
+ key = (dirname, model_name)
+ result[key] = serialize(
+ "json", model.objects.all(),
+ indent=2,
+ use_natural_foreign_keys=True, use_natural_primary_keys=True,
+ )
+ if hasattr(model, "SERIALIZATION_EXCLUDE"):
+ new_result = json.loads(result[key])
+ for idx in range(len(new_result)):
+ for excluded_field in model.SERIALIZATION_EXCLUDE:
+ new_result[idx]["fields"].pop(excluded_field)
+ result[key] = json.dumps(new_result)
+ return result
+
+
+def generic_archive_files(model_list, archive_name=None):
+ result = []
+ for model in model_list:
+ if hasattr(model, "SERIALIZATION_FILES"):
+ for item in model.objects.all():
+ for attr in model.SERIALIZATION_FILES:
+ media = getattr(item, attr)
+ result.append((media.path, media.name))
+
+ archive_created = False
+ if not archive_name:
+ archive_created = True
+ tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep
+ archive_name = tmpdir + "media.zip"
+ if not archive_name.endswith(".zip"):
+ archive_name += ".zip"
+ mode = "w" if archive_created else "a"
+ with ZipFile(archive_name, mode) as current_zip:
+ for media_path, name in result:
+ current_zip.write(media_path, arcname=name)
+ return archive_name
+
+
+CONF_MODEL_LIST = [
+ models.IshtarSiteProfile, models.GlobalVar, models.CustomForm,
+ models.ExcludedField, models.JsonDataSection, models.JsonDataField,
+ models.CustomFormJsonField, models.ImporterModel,
+ models.DocumentTemplate
+]
+
+
+def conf_serialization(archive=False, return_empty_types=False,
+ archive_name=None):
+ media_archive = generic_archive_files(CONF_MODEL_LIST)
+ result = generic_get_results(CONF_MODEL_LIST, "common_conf")
+ full_archive = archive_serialization(
+ result, archive_dir="common_conf", archive=archive,
+ return_empty_types=return_empty_types, archive_name=archive_name)
+ with ZipFile(full_archive, 'a') as current_zip:
+ current_zip.write(media_archive, arcname="media.zip")
+ return full_archive
+
+
def restore_serialized(archive_name, delete_existing=False):
with zipfile.ZipFile(archive_name, "r") as zip_file:
# check version
@@ -161,14 +229,28 @@ def restore_serialized(archive_name, delete_existing=False):
"installation".format(info["serialize-version"])
)
- # restore types
- for json_filename in zip_file.namelist():
- path = json_filename.split(os.sep)
- if len(path) != 2 or path[0] != "types":
- continue
- if delete_existing:
- model = get_model_from_filename(path[-1])
- model.objects.all().delete()
- data = zip_file.read(json_filename).decode("utf-8")
- for obj in deserialize("json", data):
- obj.save()
+ DIRS = (
+ ("types", [None]), ("common_conf", CONF_MODEL_LIST)
+ )
+ namelist = zip_file.namelist()
+ for current_dir, model_list in DIRS:
+ for current_model in model_list:
+ for json_filename in namelist:
+ path = json_filename.split(os.sep)
+ if len(path) != 2 or path[0] != current_dir:
+ continue
+ model = get_model_from_filename(path[-1])
+ if current_model and current_model != model:
+ continue
+ if delete_existing:
+ model.objects.all().delete()
+ data = zip_file.read(json_filename).decode("utf-8")
+ for obj in deserialize("json", data):
+ obj.save()
+ # restore media
+ if "media.zip" in namelist:
+ with tempfile.TemporaryDirectory() as tmp_dir_name:
+ zip_file.extract("media.zip", tmp_dir_name)
+ with zipfile.ZipFile(
+ tmp_dir_name + os.sep + "media.zip", 'r') as media_zip:
+ media_zip.extractall(settings.MEDIA_ROOT) \ No newline at end of file
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 7e1948bf0..0e3832e24 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -37,6 +37,7 @@ from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.files import File as DjangoFile
+from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
from django.core.urlresolvers import reverse
from django.db.models.fields import BooleanField
@@ -50,8 +51,8 @@ from ishtar_common import models
from ishtar_common import views
from ishtar_common.apps import admin_site
from ishtar_common.serializers import type_serialization, \
- SERIALIZATION_VERSION, get_model_from_filename, serialization_info, \
- restore_serialized
+ SERIALIZATION_VERSION, serialization_info, \
+ restore_serialized, conf_serialization, CONF_MODEL_LIST
from ishtar_common.utils import post_save_geo, update_data, move_dict_data, \
rename_and_simplify_media_name, try_fix_file
@@ -598,22 +599,58 @@ class CacheTest(TestCase):
class SerializationTest(TestCase):
fixtures = COMMON_FIXTURES + WAREHOUSE_FIXTURES
- def test_type_serialization(self):
- json_result = type_serialization()
- for k in json_result.keys():
+ def generic_serialization_test(self, serialize):
+ json_result = serialize()
+ for key in json_result.keys():
+ __, k = key
module_name, model_name = k.split("__")
module = importlib.import_module(module_name + ".models")
model = getattr(module, model_name)
current_count = model.objects.count()
- serialization_count = len(json.loads(json_result[k]))
+ serialization_count = len(json.loads(json_result[key]))
# has to be at least equal (can be superior for model with
- # recursivity)
+ # recursion)
self.assertTrue(
serialization_count >= current_count,
msg="Serialization for model {}.{} failed. {} serialized {} "
"expected".format(module.__name__, model_name,
serialization_count, current_count))
+ def test_type_serialization(self):
+ self.generic_serialization_test(type_serialization)
+
+ def create_default_conf(self):
+ values = {}
+ models.get_current_profile() # create a default profile
+ models.GlobalVar.objects.create(slug="test")
+ cform = models.CustomForm.objects.create(
+ name="Test", form='ishtar_common.forms.TestForm')
+ models.ExcludedField.objects.create(custom_form=cform,
+ field="ExcludedField")
+ CT = ContentType.objects.get_for_model(models.OrganizationType)
+ models.JsonDataSection.objects.create(
+ content_type=CT,
+ name="Test",
+ )
+ JF = models.JsonDataField.objects.create(
+ name="Test", content_type=CT, key="test"
+ )
+ models.CustomFormJsonField.objects.create(
+ custom_form=cform, json_field=JF, label="test"
+ )
+ mod = models.ImporterModel.objects.get(
+ klass="ishtar_common.models.Organization"
+ )
+ values["document_template"] = models.DocumentTemplate.objects.create(
+ name="Test", slug="test", associated_model=mod,
+ template=SimpleUploadedFile('test.txt', b'no real content')
+ )
+ return values
+
+ def test_conf_serialization(self):
+ self.create_default_conf()
+ self.generic_serialization_test(conf_serialization)
+
def test_serialization_zip(self):
zip_filename = type_serialization(archive=True)
# only check the validity of the zip, the type content is tested above
@@ -633,7 +670,6 @@ class SerializationTest(TestCase):
info = serialization_info()
info["serialize-version"] = "-42"
json_file.write(json.dumps(info, indent=2))
-
zip_file.write(filename, arcname=base_filename)
with self.assertRaises(ValueError):
restore_serialized(zip_filename)
@@ -677,6 +713,26 @@ class SerializationTest(TestCase):
self.assertTrue(OperationRT.objects.filter(
inverse_relation__isnull=False).count())
+ def test_conf_restore(self):
+ values = self.create_default_conf()
+ current_number = {}
+ for model in CONF_MODEL_LIST:
+ current_number[model.__name__] = model.objects.count()
+ zip_filename = conf_serialization(archive=True)
+ os.remove(values["document_template"].template.path)
+
+ restore_serialized(zip_filename, delete_existing=True)
+ for model in CONF_MODEL_LIST:
+ previous_nb = current_number[model.__name__]
+ current_nb = model.objects.count()
+ self.assertEqual(
+ previous_nb, current_nb,
+ msg="Restore for model {} failed. Initial: {}, restored: "
+ "{}.".format(model.__name__, previous_nb, current_nb))
+ self.assertTrue(
+ os.path.isfile(values["document_template"].template.path)
+ )
+
class AccessControlTest(TestCase):
def test_administrator(self):