import datetime import json import importlib import os import tempfile import zipfile from rest_framework import serializers from zipfile import ZipFile from django.apps import apps from django.contrib.sites.models import Site from django.core.serializers import serialize from version import get_version from . import models class PublicSerializer(serializers.BaseSerializer): def to_representation(self, obj): return obj.public_representation() SERIALIZATION_VERSION = "1.0" def get_model_from_filename(filename): filename = filename.split(".")[0] # remove extension module_name, model_name = filename.split("__") module = importlib.import_module(module_name + ".models") return getattr(module, model_name) def serialization_info(): site = Site.objects.get_current() return { "serialize-version": SERIALIZATION_VERSION, "ishtar-version": get_version(), "domain": site.domain, "name": site.name, "date": datetime.datetime.now().isoformat() } def type_serialization(archive=False, return_empty_types=False, archive_name=None): """ Serialize all types models to JSON Used for import and export scripts :param return_empty_types: if True instead of serialization return empty types (default False) :param archive: if True return a zip file containing all the file serialized (defaukt False) :return: string containing the json serialization of types unless return_empty_types or archive is set to True """ if archive and return_empty_types: raise ValueError("archive and return_empty_types are incompatible") result = {} for model in apps.get_models(): if not isinstance(model(), models.GeneralType): continue model_name = model.__name__ model_name = str(model.__module__).split(".")[0] + "__" + model_name result[model_name] = serialize( "json", model.objects.all(), indent=2, use_natural_foreign_keys=True, use_natural_primary_keys=True ) if return_empty_types: return [k for k in result if not result[k]] if not archive: return result if not archive_name: tmpdir = tempfile.mkdtemp(prefix="ishtarexport-") + os.sep archive_name = tmpdir + "ishtar-{}.zip".format( datetime.date.today().strftime("%Y-%m-%d") ) if not archive_name.endswith(".zip"): archive_name += ".zip" with tempfile.TemporaryDirectory() as tmpdirname: os.mkdir(tmpdirname + os.sep + "types") with ZipFile(archive_name, 'w') as current_zip: base_filename = "info.json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: json_file.write( json.dumps(serialization_info(), indent=2) ) current_zip.write(filename, arcname=base_filename) for model_name in result: base_filename = model_name + ".json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: json_file.write(result[model_name]) current_zip.write(filename, arcname="types" + os.sep + base_filename) return archive_name def restore_serialized(archive_name, delete_existing=False): with zipfile.ZipFile(archive_name, "r") as zip_file: # check version info = json.loads(zip_file.read("info.json").decode("utf-8")) if info["serialize-version"] != SERIALIZATION_VERSION: raise ValueError( "This dump version: {} is not managed by this Ishtar " "installation".format(info["serialize-version"]) ) # restore types for json_filename in zip_file.namelist(): path = json_filename.split(os.sep) if len(path) != 2 or path[0] != "types": continue model = get_model_from_filename(path[-1]) if delete_existing: model.objects.all().delete()