#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import datetime import sys import fiona from fiona import crs as fiona_crs import json from osgeo import ogr import os import logging from pyproj import CRS import shutil import re import tempfile import zipfile from django.apps import apps from django.conf import settings from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.gis.gdal.error import GDALException from django.contrib.gis.geos import GEOSGeometry from django.contrib.gis.geos.error import GEOSException from django.core.exceptions import ValidationError, SuspiciousOperation from django.core.files import File from django.core.files.base import ContentFile from django.core.validators import validate_comma_separated_integer_list, MinValueValidator from django.db.models.base import ModelBase from django.db.models.signals import pre_delete from django.template.defaultfilters import slugify from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _, pgettext_lazy UnoCalc = None ITALIC = None if settings.USE_LIBREOFFICE: try: from ishtar_common.libreoffice import UnoCalc from com.sun.star.awt.FontSlant 
import ITALIC except ImportError: pass from ishtar_common.model_managers import SlugModelManager from ishtar_common.utils import ( create_slug, get_all_related_m2m_objects_with_model, put_session_message, put_session_var, get_session_var, num2col, max_size_help, import_class, reverse_coordinates, ) from ishtar_common.data_importer import ( Importer, ImportFormater, IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, TypeFormater, YearFormater, StrToBoolean, FileFormater, InseeFormater, ImporterError, UpperCaseFormater, LowerCaseFormater, ) from ishtar_common.utils import task from ishtar_common.ignf_utils import IGNF logger = logging.getLogger(__name__) def get_model_fields(model): """ Return a dict of fields from model """ fields = {} if not model: return fields for field in model._meta.get_fields(): fields[field.name] = field return fields class ImportModelManager(models.Manager): def get_by_natural_key(self, klass): return self.get(klass=klass) class ImporterModel(models.Model): name = models.CharField(_("Name"), max_length=200) klass = models.CharField(_("Class name"), max_length=200, unique=True) objects = ImportModelManager() class Meta: verbose_name = _("Data model") verbose_name_plural = _("Data models") ordering = ("name",) ADMIN_SECTION = _("General settings") def __str__(self): return self.name def natural_key(self): return (self.klass,) IMPORT_TYPES = ( ("tab", _("Table")), ("gis", _("GIS")), ) IMPORT_TYPES_DICT = dict(IMPORT_TYPES) class ImporterType(models.Model): """ Description of a table to be mapped with ishtar database """ name = models.CharField(_("Name"), max_length=200) slug = models.SlugField(_("Slug"), unique=True, max_length=100) description = models.CharField( _("Description"), blank=True, null=True, max_length=500 ) type = models.CharField( _("Type"), max_length=3, choices=IMPORT_TYPES, default="tab" ) tab_number = models.PositiveIntegerField( _("Tab number"), default=1, validators=[MinValueValidator(1)], help_text=_("When 
using an Excel or Calc file choose the tab number. Keep it to 1 by default.") ) layer_name = models.CharField( _("Layer name"), max_length=200, blank=True, null=True, help_text=_("For GIS file with multiple layers"), ) is_main_geometry = models.BooleanField( _("Set as main geometry"), default=True, help_text=_("Only relevant for GIS files"), ) users = models.ManyToManyField("IshtarUser", verbose_name=_("Users"), blank=True) associated_models = models.ForeignKey( ImporterModel, verbose_name=_("Associated model"), on_delete=models.SET_NULL, related_name="importer_type_associated", blank=True, null=True, ) created_models = models.ManyToManyField( ImporterModel, verbose_name=_("Models that can accept new items"), blank=True, help_text=_("Leave blank for no restrictions"), related_name="importer_type_created", ) is_template = models.BooleanField(_("Can be exported"), default=False) is_import = models.BooleanField(_("Can be import"), default=True) archive_required = models.BooleanField(_("Archive required"), default=False) unicity_keys = models.CharField( _('Unicity keys (separator ";")'), blank=True, null=True, max_length=500, help_text=_("Mandatory for update importer. Set to key that identify items " "without ambiguity. 
Warning: __ is not supported, only use level " "1 key.") ) available = models.BooleanField(_("Available"), default=True) objects = SlugModelManager() SERIALIZATION_EXCLUDE = ["users"] class Meta: verbose_name = _("Importer - Type") verbose_name_plural = _("Importer - Types") ordering = ("name",) ADMIN_SECTION = _("Imports") def natural_key(self): return (self.slug,) def __str__(self): return self.name @property def type_label(self): if self.type in IMPORT_TYPES_DICT: return IMPORT_TYPES_DICT[str(self.type)] return "" def get_libreoffice_template(self): if not UnoCalc: return ROW_NUMBER = 500 uno = UnoCalc() calc = uno.create_calc() main_sheet = uno.get_or_create_sheet(calc, 0, self.name) lst_sheet = uno.get_or_create_sheet(calc, 1, str(_("List types"))) if not calc: return col_number = 1 # user number so we start with 1 lst_col_number = 0 for column in self.columns.order_by("col_number").all(): while column.col_number > col_number: col_number += 1 # header cell = main_sheet.getCellByPosition(col_number - 1, 0) cell.CharWeight = 150 cell.setString(column.label) cell = main_sheet.getCellByPosition(col_number - 1, 1) cell.CharPosture = ITALIC cell.setString(column.description) # only managing the first target... 
ft = None for target in column.targets.all(): ft = target.formater_type if ft: break if not ft: continue # first we only manage TypeFormater if ft.formater_type != "TypeFormater": continue if not ft.options: # bad configuration continue model = import_class(ft.options) if not model: continue lst = [] for typ in model.get_types(instances=True): lst.append(str(typ)) end_row = uno.create_list( lst_sheet, lst_col_number, 0, str(model._meta.verbose_name), lst ) uno.set_cell_validation_list( main_sheet, col_number, 2, ROW_NUMBER + 2, lst_sheet, lst_col_number, [1, end_row], ) lst_col_number += 1 tmpdir = tempfile.mkdtemp(prefix="ishtar-templates-") dest_filename = "{}{}{}.ods".format(tmpdir, os.sep, self.name) uno.save_calc(calc, dest_filename) return dest_filename def get_importer_class(self, import_instance=None): OBJECT_CLS = import_class(self.associated_models.klass) DEFAULTS = dict( [(default.keys, default.values) for default in self.defaults.all()] ) LINE_FORMAT = [] LINE_EXPORT_FORMAT = [] idx = 0 for column in self.columns.order_by("col_number").all(): idx += 1 while column.col_number > idx: LINE_FORMAT.append(None) LINE_EXPORT_FORMAT.append(None) idx += 1 targets = [] formater_types = [] nb = column.targets.count() if not nb: LINE_FORMAT.append(None) if column.export_field_name: LINE_EXPORT_FORMAT.append( ImportFormater(column.export_field_name, label=column.label) ) continue force_news = [] concat_str = [] concat = [] for target in column.targets.order_by("pk").all(): ft = target.formater_type.get_formater_type( target, import_instance=import_instance ) if not ft: continue formater_types.append(ft) targets.append(target.target) concat_str.append(target.concat_str) force_news.append(target.force_new) concat.append(target.concat) formater_kwargs = {} if column.regexp_pre_filter: formater_kwargs["regexp"] = re.compile(column.regexp_pre_filter.regexp) if column.value_format: formater_kwargs["value_format"] = column.value_format.format_string 
formater_kwargs["concat"] = concat formater_kwargs["concat_str"] = concat_str formater_kwargs["duplicate_fields"] = [ (field.field_name, field.force_new, field.concat, field.concat_str) for field in column.duplicate_fields.all() ] formater_kwargs["label"] = column.label formater_kwargs["required"] = column.required formater_kwargs["force_new"] = force_news formater_kwargs["comment"] = column.description if column.export_field_name: formater_kwargs["export_field_name"] = [column.export_field_name] formater = ImportFormater(targets, formater_types, **formater_kwargs) LINE_FORMAT.append(formater) LINE_EXPORT_FORMAT.append(formater) UNICITY_KEYS = [] if self.unicity_keys: UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(";")] MODEL_CREATION_LIMIT = [] for modls in self.created_models.all(): MODEL_CREATION_LIMIT.append(import_class(modls.klass)) args = { "OBJECT_CLS": OBJECT_CLS, "DESC": self.description, "DEFAULTS": DEFAULTS, "LINE_FORMAT": LINE_FORMAT, "UNICITY_KEYS": UNICITY_KEYS, "LINE_EXPORT_FORMAT": LINE_EXPORT_FORMAT, "MODEL_CREATION_LIMIT": MODEL_CREATION_LIMIT, "TYPE": self.type, "MAIN_GEO": self.is_main_geometry, } name = str( "".join( x for x in slugify(self.name).replace("-", " ").title() if not x.isspace() ) ) newclass = type(name, (Importer,), args) return newclass def get_columns(self, importer_class=None): """ :param importer_class: importer class - if not provided get from self :return: (columns: list, columns_names: list) - column attributes, column labels """ if not importer_class: importer_class = self.get_importer_class() cols, col_names = [], [] for formater in importer_class.LINE_EXPORT_FORMAT: if not formater: cols.append("") col_names.append("") continue cols.append(formater.export_field_name) col_names.append(formater.label) return cols, col_names def save(self, *args, **kwargs): if not self.slug: self.slug = create_slug(ImporterType, self.name) return super(ImporterType, self).save(*args, **kwargs) class 
ImporterGroup(models.Model): name = models.CharField(_("Name"), max_length=200) slug = models.SlugField(_("Slug"), unique=True, max_length=100) description = models.TextField( _("Description"), blank=True, default="" ) available = models.BooleanField(_("Available"), default=True) class Meta: verbose_name = _("Importer - Group") verbose_name_plural = _("Importer - Groups") ordering = ("name",) ADMIN_SECTION = _("Imports") objects = SlugModelManager() def natural_key(self): return self.slug, def __str__(self): return self.name @property def importer_types_label(self) -> str: return " ; ".join([imp.importer_type.name for imp in self.importer_types.all()]) class ImporterGroupImporter(models.Model): group = models.ForeignKey(ImporterGroup, on_delete=models.CASCADE, related_name="importer_types") importer_type = models.ForeignKey(ImporterType, on_delete=models.CASCADE, related_name="groups") order = models.PositiveIntegerField(_("Order"), default=10, validators=[MinValueValidator(1)]) class Meta: ordering = ("group", "order") unique_together = ("group", "order") verbose_name = _("Importer - Group <-> Importer") def get_associated_model(parent_model, keys): model = None if isinstance(parent_model, str): OBJECT_CLS = import_class(parent_model) else: OBJECT_CLS = parent_model fields = get_model_fields(OBJECT_CLS) for idx, item in enumerate(keys): if item in ("-", ""): model = None elif not idx: if item not in fields: raise ImporterError( str( _( "Importer configuration error: " '"{}" is not available for "{}".' " Check your default and column " "configuration" ) ).format(item, OBJECT_CLS.__name__) ) field = fields[item] if hasattr(field, "remote_field") and hasattr(field.remote_field, "model"): model = field.remote_field.model if type(field) == ModelBase: model = field else: if not model: raise ImporterError( str( _( "Importer configuration error: " '"{}" is not available for "{}".' 
" Check your default and column " "configuration" ) ).format("__".join(keys[1:]), OBJECT_CLS.__name__) ) return get_associated_model(model, keys[1:]) return model class ImporterDefaultManager(models.Manager): def get_by_natural_key(self, importer_type, target): return self.get(importer_type__slug=importer_type, target=target) class ImporterDefault(models.Model): """ Targets of default values in an import """ importer_type = models.ForeignKey( ImporterType, related_name="defaults", on_delete=models.CASCADE ) target = models.CharField("Target", max_length=500) class Meta: verbose_name = _("Importer - Default") verbose_name_plural = _("Importer - Defaults") unique_together = ("importer_type", "target") ADMIN_SECTION = _("Imports") objects = ImporterDefaultManager() def __str__(self): return "{} - {}".format(self.importer_type, self.target) def natural_key(self): return self.importer_type.slug, self.target @property def keys(self): return tuple(t for t in self.target.split("__") if t not in ("-", "")) @property def associated_model(self): if not self.keys: return import_class(self.importer_type.associated_models.klass) return get_associated_model( self.importer_type.associated_models.klass, self.keys ) @property def values(self): values = {} for default_value in self.default_values.all(): target = default_value.target if target == "-": target = "" values[target.split("__")[0]] = default_value.get_value() return values class ImporterDefaultValuesManager(models.Manager): def get_by_natural_key(self, def_target_type, def_target, target): return self.get( default_target__importer_type__slug=def_target_type, default_target__target=def_target, target=target, ) class ImporterDefaultValues(models.Model): """ Default values in an import """ default_target = models.ForeignKey( ImporterDefault, related_name="default_values", on_delete=models.CASCADE ) target = models.CharField("Target", max_length=500) value = models.CharField("Value", max_length=500) objects = 
ImporterDefaultValuesManager() class Meta: verbose_name = _("Importer - Default value") verbose_name_plural = _("Importer - Default values") unique_together = ("default_target", "target") ADMIN_SECTION = _("Imports") def natural_key(self): return ( self.default_target.importer_type.slug, self.default_target.target, self.target, ) def __str__(self): return "{} - {}".format(self.default_target, self.target, self.value) def get_value(self): parent_model = self.default_target.associated_model target = self.target.strip() return self._get_value(parent_model, target) def _get_value(self, parent_model, target): if not parent_model: return self.value fields = get_model_fields(parent_model) if "__" in target: targets = target.split("__") if targets[0] not in fields: return new_target = "__".join(targets[1:]) field = fields[targets[0]] if not hasattr(field, "remote_field") or not hasattr( field.remote_field, "model" ): return self.value new_parent_model = field.remote_field.model return self._get_value(new_parent_model, new_target) if target not in fields: return field = fields[target] if target in ("srid", "txt_idx"): try: return parent_model.objects.get(**{target: self.value}) except (ValueError, parent_model.DoesNotExist): pass if not hasattr(field, "remote_field") or not hasattr( field.remote_field, "model" ): return self.value model = field.remote_field.model # if value is an id try: return model.objects.get(pk=int(self.value)) except (ValueError, model.DoesNotExist): pass # try with txt_idx try: return model.objects.get(txt_idx=self.value) except (ValueError, model.DoesNotExist): pass return "" class ImporterColumnManager(models.Manager): def get_by_natural_key(self, importer_type, col_number): return self.get(importer_type__slug=importer_type, col_number=col_number) class ImporterColumn(models.Model): """ Import file column description """ label = models.CharField(_("Label"), blank=True, null=True, max_length=200) importer_type = models.ForeignKey( ImporterType, 
related_name="columns", on_delete=models.CASCADE ) col_number = models.IntegerField(_("Column number"), default=1) description = models.TextField(_("Description"), blank=True, null=True) regexp_pre_filter = models.ForeignKey( "Regexp", blank=True, null=True, on_delete=models.SET_NULL, related_name="columns", ) value_format = models.ForeignKey( "ValueFormater", blank=True, null=True, on_delete=models.SET_NULL, related_name="columns", ) required = models.BooleanField(_("Required"), default=False) export_field_name = models.CharField( _("Export field name"), blank=True, null=True, max_length=200, help_text=_( "Fill this field if the field name is ambiguous for " "export. For instance: concatenated fields." ), ) objects = ImporterColumnManager() class Meta: verbose_name = _("Importer - Column") verbose_name_plural = _("Importer - Columns") ordering = ("importer_type", "col_number") unique_together = ("importer_type", "col_number") ADMIN_SECTION = _("Imports") def __str__(self): return "{} - {}".format(self.importer_type, self.col_number) @property def col_string(self): return num2col(self.col_number) def natural_key(self): return self.importer_type.slug, self.col_number def targets_lbl(self): return ", ".join([target.target for target in self.targets.all()]) def duplicate_fields_lbl(self): return ", ".join([dp.field_name or "" for dp in self.duplicate_fields.all()]) def formater_type_lbl(self): return ", ".join([str(target.formater_type) for target in self.targets.all()]) class ImporterDuplicateFieldManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, field_name): return self.get( column__importer_type__slug=importer_type, column__col_number=col_number, field_name=field_name, ) class ImporterDuplicateField(models.Model): """ Direct copy of result in other fields """ column = models.ForeignKey( ImporterColumn, related_name="duplicate_fields", on_delete=models.CASCADE ) field_name = models.CharField( _("Field name"), blank=True, null=True, 
        max_length=200
    )
    force_new = models.BooleanField(_("Force creation of new items"), default=False)
    concat = models.BooleanField(_("Concatenate with existing"), default=False)
    concat_str = models.CharField(
        _("Concatenate character"), max_length=5, blank=True, null=True
    )

    objects = ImporterDuplicateFieldManager()

    class Meta:
        verbose_name = _("Importer - Duplicate field")
        verbose_name_plural = _("Importer - Duplicate fields")
        ordering = ("column", "field_name")
        unique_together = ("column", "field_name")
        ADMIN_SECTION = _("Imports")

    def natural_key(self):
        # natural key: importer type slug + column number + duplicated field name
        return self.column.importer_type.slug, self.column.col_number, self.field_name


class NamedManager(models.Manager):
    # generic manager for models whose natural key is their unique "name"
    def get_by_natural_key(self, name):
        return self.get(name=name)


class Regexp(models.Model):
    """Regular expression applied to a column value before import."""

    name = models.CharField(_("Name"), max_length=100, unique=True)
    description = models.TextField(_("Description"), blank=True, null=True)
    regexp = models.CharField(_("Regular expression"), max_length=500)

    objects = NamedManager()

    class Meta:
        verbose_name = _("Importer - Regular expression")
        verbose_name_plural = _("Importer - Regular expressions")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return self.name

    def natural_key(self):
        return (self.name,)


class ValueFormater(models.Model):
    """Python ``format()`` string applied to a column value before import."""

    name = models.CharField(_("Name"), max_length=100, unique=True)
    slug = models.SlugField(_("Slug"), unique=True, max_length=100)
    description = models.TextField(_("Description"), blank=True, null=True)
    format_string = models.CharField(
        _("Format string"),
        max_length=100,
        help_text=_(
            "A string used to format a value using the Python "
            '"format()" method. The site https://pyformat.info/ '
            'provide good examples of usage. Only one "{}" entry '
            'is managed. Instead you can use "{item}". The input is '
            'assumed to be a string.'
), ) objects = SlugModelManager() class Meta: verbose_name = _("Importer - Value format") verbose_name_plural = _("Importer - Value formats") ADMIN_SECTION = _("Imports") def __str__(self): return self.name def clean(self): try: self.format_string.format("sample value") except ValueError: raise ValidationError( { "format_string": _( "The string provided generate an error. " "Fix it." ) } ) def natural_key(self): return (self.slug,) class ImportTargetManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, target): return self.get( column__importer_type__slug=importer_type, column__col_number=col_number, target=target, ) class ImportTarget(models.Model): """ Ishtar database target for a column """ column = models.ForeignKey( ImporterColumn, related_name="targets", on_delete=models.CASCADE ) target = models.CharField("Target", max_length=500) formater_type = models.ForeignKey( "FormaterType", related_name="targets", on_delete=models.CASCADE ) force_new = models.BooleanField(_("Force creation of new items"), default=False) concat = models.BooleanField(_("Concatenate with existing"), default=False) concat_str = models.CharField( _("Concatenate character"), max_length=5, blank=True, null=True ) comment = models.TextField(_("Comment"), blank=True, null=True) objects = ImportTargetManager() class Meta: verbose_name = _("Importer - Target") verbose_name_plural = _("Importer - Targets") unique_together = ("column", "target") ADMIN_SECTION = _("Imports") def __str__(self): return self.target[:50] if self.target else self.comment @cached_property def verbose_name(self): if not self.column.description: return self.target[:50] desc = self.column.description desc = desc[0].lower() + desc[1:] return "{} - {}".format(self.target[:50], desc) def natural_key(self): return self.column.importer_type.slug, self.column.col_number, self.target @property def associated_model(self): if self.target.startswith("data__"): return try: return get_associated_model( 
                self.column.importer_type.associated_models.klass,
                self.target.split("__"),
            )
        except KeyError:
            return

    def get_choices(self):
        # Choices offered to the user when manually matching a source value.
        if (
            self.formater_type.formater_type == "UnknowType"
            and self.column.importer_type.slug
        ):
            # ask the generated importer class which choices the column accepts
            cls = self.column.importer_type.get_importer_class()
            formt = cls().line_format[self.column.col_number - 1]
            if hasattr(formt.formater, "choices"):
                return [("", "--" * 8)] + list(formt.formater.choices)
            return [("", "--" * 8)]
        if self.formater_type.formater_type == "StrToBoolean":
            return [("", "--" * 8), ("True", _("True")), ("False", _("False"))]
        if not self.associated_model or not hasattr(self.associated_model, "get_types"):
            return []
        return self.associated_model.get_types()


class TargetKeyGroup(models.Model):
    """ Group of target keys for imports. """

    name = models.TextField(_("Name"), unique=True)
    all_user_can_use = models.BooleanField(_("All users can use it"), default=False)
    all_user_can_modify = models.BooleanField(
        _("All users can modify it"), default=False
    )
    available = models.BooleanField(_("Available"), default=True)

    class Meta:
        verbose_name = _("Importer - Target key group")
        verbose_name_plural = _("Importer - Target key groups")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return self.name


class TargetKey(models.Model):
    """
    User's link between import source and ishtar database.

    Also temporary used for GeneralType to point missing link before adding
    them in ItemKey table.

    A targetkey connection can be create to be applied to one particular
    import (associated_import), one particular user (associated_user), one
    particular group (associated_group) or to all imports (associated_import,
    associated_user and associated_group are empty).
""" target = models.ForeignKey( ImportTarget, related_name="keys", on_delete=models.CASCADE ) key = models.TextField(_("Key")) value = models.TextField(_("Value"), blank=True, null=True) is_set = models.BooleanField(_("Is set"), default=False) associated_import = models.ForeignKey( "Import", blank=True, null=True, on_delete=models.SET_NULL ) associated_user = models.ForeignKey( "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL ) associated_group = models.ForeignKey( TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL ) class Meta: unique_together = ( "target", "key", "associated_user", "associated_import", ) verbose_name = _("Importer - Target key") verbose_name_plural = _("Importer - Targets keys") ordering = ("target", "key") ADMIN_SECTION = _("Imports") def __str__(self): return " - ".join([str(self.target), self.key[:50]]) def column_nb(self): # for the admin return self.target.column.col_number def importer_type(self): # for the admin return self.target.column.importer_type.name def format(self): if not self.is_set: return None if self.target.formater_type.formater_type == "StrToBoolean": if self.value in ("False", "0"): return False elif self.value: return True return return self.value def save(self, *args, **kwargs): obj = super(TargetKey, self).save(*args, **kwargs) if not self.value: return obj v = None associated_model = self.target.associated_model if associated_model and hasattr(self.target.associated_model, "add_key"): # pk is given try: v = self.target.associated_model.objects.get(pk=str(int(self.value))) except (ValueError, self.target.associated_model.DoesNotExist): # try with txt_idx try: v = self.target.associated_model.objects.get( txt_idx=str(self.value) ) except self.target.associated_model.DoesNotExist: pass if v: keys = {} if self.associated_group: keys["group"] = self.associated_group if self.associated_user: keys["user"] = self.associated_user else: keys["importer"] = self.associated_import v.add_key(self.key, 
**keys) return obj TARGET_MODELS = [ ("OrganizationType", _("Organization type")), ("ishtar_common.models.OrganizationType", _("Organization type")), ("ishtar_common.models.PersonType", _("Person type")), ("TitleType", _("Title")), ("SourceType", _("Source type")), ("ishtar_common.models.SourceType", _("Source type")), ("AuthorType", _("Author type")), ("Format", _("Format")), ("ProfileType", _("Profile type")), ("ishtar_common.models.Format", _("Format")), ("ishtar_common.models.LicenseType", _("License type")), ("ishtar_common.models.DocumentTag", _("Document tag")), ("ishtar_common.models.Language", _("Language")), ("ishtar_common.models.SupportType", _("Support type")), ("archaeological_operations.models.CulturalAttributionType", _("Cultural attribution type")), ("archaeological_operations.models.OperationType", _("Operation type")), ("archaeological_operations.models.Period", _("Period")), ("archaeological_operations.models.ReportState", _("Report state")), ("archaeological_operations.models.RemainType", _("Remain type")), ("archaeological_operations.models.RelationType", _("Operation relation type")), ("archaeological_operations.models.ActType", _("Act type")), ("archaeological_context_records.models.Unit", _("Unit")), ("archaeological_context_records.models.ActivityType", _("Activity type")), ( "archaeological_context_records.models.DocumentationType", _("Documentation type"), ), ("archaeological_context_records.models.DatingQuality", _("Dating quality")), ("archaeological_finds.models.MaterialType", _("Material")), ("archaeological_finds.models.ConservatoryState", _("Conservatory state")), ("archaeological_warehouse.models.ContainerType", _("Container type")), ("archaeological_warehouse.models.WarehouseDivision", _("Warehouse division")), ("archaeological_warehouse.models.WarehouseType", _("Warehouse type")), ("archaeological_finds.models.TreatmentType", _("Treatment type")), ( "archaeological_finds.models.TreatmentEmergencyType", _("Treatment emergency 
type"), ), ("archaeological_finds.models.ObjectType", _("Object type")), ("archaeological_finds.models.IntegrityType", _("Integrity type")), ("archaeological_finds.models.RemarkabilityType", _("Remarkability type")), ("archaeological_finds.models.AlterationType", _("Alteration type")), ("archaeological_finds.models.AlterationCauseType", _("Alteration cause type")), ("archaeological_finds.models.BatchType", _("Batch type")), ("archaeological_finds.models.CheckedType", _("Checked type")), ("archaeological_finds.models.MaterialTypeQualityType", _("Material type quality")), ("archaeological_finds.models.FunctionalArea", _("Functional area")), ( "archaeological_context_records.models.IdentificationType", _("Identification type"), ), ( "archaeological_context_records.models.RelationType", _("Context record relation type"), ), ("SpatialReferenceSystem", _("Spatial reference system")), ("SupportType", _("Support type")), ("TitleType", _("Title type")), ] TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] IMPORTER_TYPES = ( ("IntegerFormater", _("Integer")), ("FloatFormater", _("Float")), ("UnicodeFormater", _("String")), ("DateFormater", _("Date")), ("TypeFormater", _("Type")), ("YearFormater", _("Year")), ("InseeFormater", _("INSEE code")), ("UpperFormater", _("Upper case")), ("LowerFormater", _("Lower case")), ("StrToBoolean", _("String to boolean")), ("FileFormater", pgettext_lazy("filesystem", "File")), ("UnknowType", _("Unknow type")), ) IMPORTER_TYPES_DCT = { "IntegerFormater": IntegerFormater, "FloatFormater": FloatFormater, "UnicodeFormater": UnicodeFormater, "DateFormater": DateFormater, "TypeFormater": TypeFormater, "YearFormater": YearFormater, "StrToBoolean": StrToBoolean, "FileFormater": FileFormater, "InseeFormater": InseeFormater, "UpperFormater": UpperCaseFormater, "LowerFormater": LowerCaseFormater, "UnknowType": None, } DATE_FORMATS = ( ("%Y", _('4 digit year. e.g.: "2015"')), ("%Y/%m/%d", _('4 digit year/month/day. 
e.g.: "2015/02/04"')), ("%d/%m/%Y", _('Day/month/4 digit year. e.g.: "04/02/2015"')), ) IMPORTER_TYPES_CHOICES = {"TypeFormater": TARGET_MODELS, "DateFormater": DATE_FORMATS} class FormaterTypeManager(models.Manager): def get_by_natural_key(self, formater_type, options, many_split): return self.get( formater_type=formater_type, options=options, many_split=many_split ) class FormaterType(models.Model): formater_type = models.CharField( "Formater type", max_length=20, choices=IMPORTER_TYPES ) options = models.CharField(_("Options"), max_length=500, blank=True, null=True) many_split = models.CharField( _("Split character(s)"), max_length=10, blank=True, null=True ) objects = FormaterTypeManager() class Meta: verbose_name = _("Importer - Formater type") verbose_name_plural = _("Importer - Formater types") unique_together = ("formater_type", "options", "many_split") ordering = ("formater_type", "options") ADMIN_SECTION = _("Imports") def natural_key(self): return self.formater_type, self.options, self.many_split def __str__(self): return " - ".join( [ str(dict(IMPORTER_TYPES)[self.formater_type]) if self.formater_type in IMPORTER_TYPES_DCT else "" ] + [getattr(self, k) for k in ("options", "many_split") if getattr(self, k)] ) def get_choices(self): if self.format_type in IMPORTER_TYPES_CHOICES: return IMPORTER_TYPES_CHOICES[self.format_type] def get_formater_type(self, target, import_instance=None): if self.formater_type not in IMPORTER_TYPES_DCT.keys(): return kwargs = {"db_target": target, "import_instance": import_instance} if self.many_split: kwargs["many_split"] = self.many_split if self.formater_type == "TypeFormater": if self.options in dir(): model = dir()[self.options] else: try: model = import_class(self.options) except (AttributeError, SuspiciousOperation): logger.warning( "**WARN FormaterType.get_formater_type**: {} " "is not in valid.".format(self.options) ) return # must be explicit if non general type if self.options not in TARGET_MODELS_KEYS and \ not 
[True for m in model.__mro__ if m.__name__ == "GeneralType"]: logger.warning( "**WARN FormaterType.get_formater_type**: {} " "is not in valid.".format(self.options) ) return return TypeFormater(model, **kwargs) elif self.formater_type == "UnicodeFormater": if self.options: try: return UnicodeFormater(int(self.options.strip()), **kwargs) except ValueError: pass return UnicodeFormater(**kwargs) elif self.formater_type == "DateFormater": date_formats = self.options if self.many_split: date_formats = self.options.split(kwargs.pop("many_split")) return DateFormater(date_formats, **kwargs) elif self.formater_type == "StrToBoolean": return StrToBoolean(**kwargs) elif self.formater_type == "UnknowType": return else: return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) IMPORT_STATE = ( ("C", _("Created")), ("AP", _("Analyse in progress")), ("A", _("Analysed")), ("HQ", _("Check modified in queue")), ("IQ", _("Import in queue")), ("HP", _("Check modified in progress")), ("IP", _("Import in progress")), ("PP", _("Post-processing in progress")), ("PI", _("Partially imported")), ("FE", _("Finished with errors")), ("F", _("Finished")), ("AC", _("Archived")), ) IMPORT_STATE_DCT = dict(IMPORT_STATE) IMPORT_GROUP_STATE = ( ("C", _("Created")), ("AP", _("Analyse in progress")), ("A", _("Analysed")), ("IQ", _("Import in queue")), ("IP", _("Import in progress")), ("PP", _("Post-processing in progress")), ("FE", _("Finished with errors")), ("F", _("Finished")), ("AC", _("Archived")), ) IMPORT_GROUP_STATE_DCT = dict(IMPORT_STATE) ENCODINGS = [ (settings.ENCODING, settings.ENCODING), (settings.ALT_ENCODING, settings.ALT_ENCODING), ("utf-8", "utf-8"), ] CSV_SEPS = ((",", ","), (";", ";"), ("|", "|")) @task() def delayed_import(import_pk): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: return imp.importation() @task() def delayed_check(import_pk): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: return imp.check_modified() def 
def convert_geom(feature, srid):
    """Convert a GeoJSON-like geometry dict into an EWKT string.

    LineString/Polygon geometries are promoted to their Multi* equivalent.
    When the site profile declares a different SRS than the source ``srid``,
    the geometry is reprojected (with coordinates reversed to work around the
    pre-Django-3.2 axis-order behaviour).
    """
    geom_kind = feature["type"]
    if geom_kind in ("LineString", "Polygon"):
        feature["type"] = "Multi" + geom_kind
        feature["coordinates"] = [feature["coordinates"]]
    feature = GEOSGeometry(json.dumps(feature))
    has_z = feature.hasz
    feature = f"SRID={srid};{feature.wkt}"
    IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")
    profile = IshtarSiteProfile.get_current_profile()
    # default target SRS: WGS84 (4979 when the geometry carries a Z)
    target_srs = 4979 if has_z else 4326
    if profile.srs and profile.srs.srid:
        target_srs = profile.srs.srid
    if target_srs != srid:
        # Coordinates are reversed - should be fixed on Django 3.2
        reprojected = GEOSGeometry(feature).transform(target_srs, clone=True)
        feature = reverse_coordinates(reprojected.ewkt)
    return feature


# Maps fiona geometry names to the Import attribute holding that geometry.
IMPORT_GEOMETRY = {
    "Point": "point_2d",
    "3D Point": "point_3d",
    "MultiPoint": "multi_points",
    "LineString": "multi_line",
    "MultiLineString": "multi_line",
    "Polygon": "multi_polygon",
    "MultiPolygon": "multi_polygon",
}


class BaseImport(models.Model):
    """Abstract base holding the fields shared by Import and ImportGroup."""

    user = models.ForeignKey(
        "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL
    )
    name = models.CharField(_("Name"), max_length=500, null=True)
    imported_file = models.FileField(
        _("Imported file"),
        upload_to="upload/imports/%Y/%m/",
        max_length=220,
        help_text=max_size_help(),
        blank=True,
        null=True,
    )
    imported_images = models.FileField(
        _("Associated documents (zip file)"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=220,
        help_text=max_size_help(),
    )
    encoding = models.CharField(
        _("Encoding"),
        choices=ENCODINGS,
        default="utf-8",
        max_length=15,
        help_text=_("Only required for CSV file"),
    )
    csv_sep = models.CharField(
        _("CSV separator"),
        choices=CSV_SEPS,
        default=",",
        max_length=1,
        help_text=_(
            "Separator for CSV file. Standard is comma but Microsoft "
            "Excel do not follow this standard and use semi-colon."
        ),
    )
    skip_lines = models.IntegerField(
        _("Skip lines"),
        default=1,
        help_text=_("Number of header lines in your file (can be 0 and should be 0 for geopackage or Shapefile)."),
    )
    creation_date = models.DateTimeField(
        _("Creation date"), auto_now_add=True, blank=True, null=True
    )
    end_date = models.DateTimeField(
        _("End date"), auto_now_add=True, blank=True, null=True, editable=False
    )

    class Meta:
        abstract = True


class ImportGroup(BaseImport):
    """A group of imports run in sequence, defined by an ImporterGroup."""

    importer_type = models.ForeignKey(
        ImporterGroup, on_delete=models.CASCADE, verbose_name=_("Importer group type")
    )
    current_import = models.PositiveIntegerField(
        _("Current import"), blank=True, null=True
    )
    state = models.CharField(
        _("State"), max_length=2, choices=IMPORT_GROUP_STATE, default="C"
    )

    class Meta:
        verbose_name = _("Import - Group")
        verbose_name_plural = _("Import - Groups")

    ADMIN_SECTION = _("Imports")

    def __str__(self):
        return f"{self.name} ({self.importer_type.name})"

    @property
    def import_id(self):
        return f"group-{self.id}"

    def import_list(self):
        """
        Sorted import list by order in the importer group
        """
        ordered = []
        for child in self.imports.all():
            links = ImporterGroupImporter.objects.filter(
                group=self.importer_type, importer_type=child.importer_type
            )
            if not links.count():
                # inconsistent data: the import type is not linked to the group
                continue
            ordered.append((links.all()[0].order, child))
        ordered.sort(key=lambda item: item[0])
        return [child for __, child in ordered]

    @property
    def status(self):
        """Human readable label of the current state ("" when unknown)."""
        if self.state not in IMPORT_GROUP_STATE_DCT:
            return ""
        return IMPORT_GROUP_STATE_DCT[str(self.state)]

    def get_actions(self):
        """
        Get available action relevant with the current status
        """
        actions = []
        if self.state == "C":
            actions.append(("A", _("Analyse")))
        if self.state == "A":
            actions.append(("A", _("Re-analyse")))
            actions.append(("I", _("Launch import")))
        if self.state in ("F", "FE"):
            actions.append(("A", _("Re-analyse")))
            actions.append(("I", _("Re-import")))
            actions.append(("AC", _("Archive")))
        if self.state == "AC":
            has_errors = any(imp.error_file for imp in self.imports.all())
            actions.append(("FE" if has_errors else "F", _("Unarchive")))
        actions.append(("D", _("Delete")))
        return actions

    def initialize(self, user=None, session_key=None):
        """Analyse each import of the group in turn."""
        self.state = "AP"
        self.end_date = datetime.datetime.now()
        self.save()
        for imp in self.imports.all():
            imp.initialize(user, session_key)
        self.state = "A"
        self.end_date = datetime.datetime.now()
        self.save()

    def importation(
        self,
        session_key=None,
        line_to_process=None,
        simulate=False,
        return_importer_and_data=False,
        request=None,
    ):
        """Launch the first import of the group; the chain then cascades."""
        q = self.imports
        if not q.count():
            return
        self.state = "IP"
        self.end_date = datetime.datetime.now()
        self.save()
        first = self.import_list()[0]
        first.importation(
            session_key=session_key,
            line_to_process=line_to_process,
            simulate=simulate,
            return_importer_and_data=return_importer_and_data,
            request=request,
        )
        # from the first import if all is good, cascade import
        has_error = any(i.error_file.name for i in q.all() if i.error_file.name)
        self.state = "FE" if has_error else "F"
        self.end_date = datetime.datetime.now()
        self.save()

    def get_all_imported(self):
        """List (accessor, object) pairs imported by any import of the group."""
        imported = []
        for imp in self.imports.all():
            for related, __ in get_all_related_m2m_objects_with_model(imp):
                accessor = related.get_accessor_name()
                imported += [(accessor, obj) for obj in getattr(imp, accessor).all()]
        return sorted(imported, key=lambda i: i[1].__class__.__name__)

    def save(self, *args, **kwargs):
        """On creation, materialise one Import per importer of the group and
        chain them with ``next_import``."""
        add = self._state.adding
        super().save(*args, **kwargs)
        if not add:
            return
        name = f"{self.name} ({self.importer_type.name})"
        imports = []
        imported_file, imported_images = None, None
        if self.imported_file:
            imported_file = ContentFile(self.imported_file.read())
            imported_file.name = self.imported_file.name.split(os.sep)[-1]
        if self.imported_images:
            imported_images = ContentFile(self.imported_images.read())
            imported_images.name = self.imported_images.name.split(os.sep)[-1]
        for relation in self.importer_type.importer_types.all():
            import_type = relation.importer_type
            imp = Import.objects.create(
                name=name,
                importer_type=import_type,
                group=self,
            )
            imports.append(imp)
            modified = False
            # TODO: only get the relevant sheet
            if imported_file:
                imp.imported_file = imported_file
                modified = True
            if import_type.archive_required and imported_images:
                imp.imported_images = imported_images
                modified = True
            if modified:
                imp.save()
        # chain imports from last to first
        previous = None
        for imp in reversed(imports):
            if previous:
                imp.next_import = previous
                imp.save()
            previous = imp


class Import(BaseImport):
    """A single import run, driven by an ImporterType."""

    importer_type = models.ForeignKey(
        ImporterType, on_delete=models.CASCADE, verbose_name=_("Importer type")
    )
    # TODO - associated_group: relevant?
    associated_group = models.ForeignKey(
        TargetKeyGroup,
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        help_text=_(
            "If a group is selected, target key saved in this group will be used."
        ),
    )
    error_file = models.FileField(
        _("Error file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    result_file = models.FileField(
        _("Result file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    match_file = models.FileField(
        _("Match file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    archive_file = models.FileField(
        _("Archive file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    state = models.CharField(
        _("State"), max_length=2, choices=IMPORT_STATE, default="C"
    )
    conservative_import = models.BooleanField(
        _("Conservative import"),
        default=False,
        help_text=_("If set to true, do not overload existing values."),
    )
    seconds_remaining = models.IntegerField(
        _("Remaining seconds"), blank=True, null=True, editable=False
    )
    current_line = models.IntegerField(_("Current line"), blank=True, null=True)
    number_of_line = models.IntegerField(_("Number of line"), blank=True, null=True)
numbers"), blank=True, null=True, validators=[validate_comma_separated_integer_list], ) changed_checked = models.BooleanField(_("Changed have been checked"), default=False) changed_line_numbers = models.TextField( _("Changed line numbers"), blank=True, null=True, validators=[validate_comma_separated_integer_list], ) group = models.ForeignKey(ImportGroup, blank=True, null=True, on_delete=models.CASCADE, verbose_name=_("Group"), related_name="imports") next_import = models.ForeignKey( "Import", blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Next import"), related_name="imports") class Meta: verbose_name = _("Import - Import") verbose_name_plural = _("Import - Imports") ADMIN_SECTION = _("Imports") def __str__(self): return "{} | {}".format(self.name or "-", self.importer_type) @property def import_id(self): return str(self.id) def need_matching(self): return bool( TargetKey.objects.filter(associated_import=self, is_set=False).count() ) @property def errors(self): if not self.error_file: return [] errors = [] with open(self.error_file.path, "rt") as csvfile: reader = csv.DictReader(csvfile, fieldnames=["line", "column", "error"]) for idx, row in enumerate(reader): if not idx: # pass the header continue errors.append(row) return errors def get_number_of_lines(self): if self.number_of_line: return self.number_of_line if self.importer_type.type == "gis": return if not self.imported_file or not self.imported_file.path: return filename = self.imported_file.path encodings = [self.encoding] encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: with open(filename, "r", encoding=encoding) as f: reader = csv.reader(f, delimiter=self.csv_sep) nb = sum(1 for __ in reader) - self.skip_lines except UnicodeDecodeError: pass # try the next encoding except csv.Error: raise ImporterError(_("Error in the CSV file.")) self.number_of_line = nb self.save() return nb @property def progress_percent(self): if not 
self.current_line or not self.number_of_line: return 0 return int((float(self.current_line) / float(self.number_of_line)) * 100) def add_imported_line(self, idx_line): if not self.number_of_line: self.get_number_of_lines() if self.imported_line_numbers and str( idx_line ) in self.imported_line_numbers.split(","): return if self.imported_line_numbers: self.imported_line_numbers += "," else: self.imported_line_numbers = "" self.imported_line_numbers += str(idx_line) self.current_line = idx_line self.save() def add_changed_line(self, idx_line): if self.changed_line_numbers and str( idx_line ) in self.changed_line_numbers.split(","): return if self.changed_line_numbers: self.changed_line_numbers += "," else: self.changed_line_numbers = "" self.changed_line_numbers += str(idx_line) self.save() def remove_changed_line(self, idx_line): if not self.changed_line_numbers: return line_numbers = self.changed_line_numbers.split(",") if str(idx_line) not in line_numbers: return line_numbers.pop(line_numbers.index(str(idx_line))) self.changed_line_numbers = ",".join(line_numbers) self.save() def has_changes(self, idx_line): if not self.changed_checked: return True if not self.changed_line_numbers: return line_numbers = self.changed_line_numbers.split(",") return str(idx_line) in line_numbers def line_is_imported(self, idx_line): return self.imported_line_numbers and str( idx_line ) in self.imported_line_numbers.split(",") def get_actions(self): """ Get available action relevant with the current status """ IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() actions = [] if self.state == "C": actions.append(("A", _("Analyse"))) if self.state in ("A", "PI"): actions.append(("A", _("Re-analyse"))) actions.append(("I", _("Launch import"))) if profile.experimental_feature: if self.changed_checked: actions.append(("IS", _("Step by step import"))) actions.append(("CH", _("Re-check for changes"))) else: 
actions.append(("CH", _("Check for changes"))) if self.state in ("F", "FE"): actions.append(("A", _("Re-analyse"))) actions.append(("I", _("Re-import"))) if profile.experimental_feature: if self.changed_checked: actions.append(("IS", _("Step by step re-import"))) actions.append(("CH", _("Re-check for changes"))) else: actions.append(("CH", _("Check for changes"))) actions.append(("AC", _("Archive"))) if self.state == "AC": state = "FE" if self.error_file else "F" actions.append((state, _("Unarchive"))) if self.state in ("C", "A"): actions.append(("ED", _("Edit"))) actions.append(("D", _("Delete"))) return actions @property def imported_filename(self): return self.imported_file.name.split(os.sep)[-1] @property def status(self): if self.state not in IMPORT_STATE_DCT: return "" return IMPORT_STATE_DCT[self.state] def get_importer_instance(self): return self.importer_type.get_importer_class(import_instance=self)( skip_lines=self.skip_lines, import_instance=self, conservative_import=self.conservative_import, ) def _data_table_tab(self): imported_file = self.imported_file.path tmpdir = None if zipfile.is_zipfile(imported_file): z = zipfile.ZipFile(imported_file) filename = None for name in z.namelist(): # get first CSV file found if name.endswith(".csv"): filename = name break if not filename: return [] tmpdir = tempfile.mkdtemp(prefix="tmp-ishtar-") imported_file = z.extract(filename, tmpdir) encodings = [self.encoding] encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: with open(imported_file, encoding=encoding) as csv_file: vals = [ line for line in csv.reader(csv_file, delimiter=self.csv_sep) ] if tmpdir: shutil.rmtree(tmpdir) return vals except UnicodeDecodeError: pass # try the next encoding except csv.Error: raise ImporterError(_("Error in the CSV file.")) if tmpdir: shutil.rmtree(tmpdir) return [] def get_gis_attr(self): return self._data_table_gis(get_gis_attr=True) def _data_table_gis(self, 
get_gis_attr=False): self.gis_attr = None imported_file = self.imported_file.path tmp_dir = None file_type = "gpkg" if zipfile.is_zipfile(imported_file): z = zipfile.ZipFile(imported_file) imported_file = None filenames = [] for name in z.namelist(): # get first CSV file found name_l = name.lower() if name_l.endswith(".gpkg"): filenames = [name] break if name_l.endswith(".shp"): file_type = "shp" filenames.append(name) continue for end in [".cpg", ".prj", ".shx", ".dbf"]: if name_l.endswith(end): filenames.append(name) continue if not filenames: return [] tmp_dir = tempfile.mkdtemp(prefix="tmp-ishtar-") for filename in filenames: if filename.lower().endswith(".shp") or filename.lower().endswith( ".gpkg" ): imported_file = z.extract(filename, tmp_dir) else: z.extract(filename, tmp_dir) elif imported_file.endswith(".csv"): return self._data_table_tab() elif not imported_file.endswith(".gpkg"): raise ImporterError(_("Invalid GIS file.")) if not imported_file: raise ImporterError(_("Invalid GIS file.")) kwargs = {} if self.importer_type.layer_name: kwargs["layer"] = self.importer_type.layer_name try: with fiona.open(imported_file, **kwargs) as collection: schema = collection.schema geometry = schema["geometry"] if geometry not in IMPORT_GEOMETRY: raise ImporterError(_(f'Geometry "{geometry}" not managed.')) self.gis_attr = IMPORT_GEOMETRY[geometry] if get_gis_attr: if tmp_dir: shutil.rmtree(tmp_dir) return self.gis_attr properties = schema["properties"].keys() crs = fiona_crs.to_string(collection.crs) if not crs: driver_type = {"shp": "ESRI Shapefile", "gpkg": "GPKG"} driver = ogr.GetDriverByName(driver_type[file_type]) shape = driver.Open(imported_file) layer = shape.GetLayer() crs = layer.GetSpatialRef() auth = crs.GetAttrValue("AUTHORITY", 0) srid = crs.GetAttrValue("AUTHORITY", 1) if auth == "EPSG": srid = int(srid) elif auth == "IGNF" and srid in IGNF: srid = IGNF[srid] else: raise ImporterError(_("CRS not managed.")) # 
https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6 elif crs.startswith("+init=epsg:"): srid = crs[len("+init=epsg:"):] else: srid = CRS.from_proj4(crs).to_epsg() data = [] # Warning: RuntimeWarning: Sequential read of iterator was interrupted. # Resetting iterator. # not relevant -> bug in fiona 1.8.18 (fixed in 1.8.19) if not self.number_of_line: self.number_of_line = len(collection) self.save() for idx, feature in enumerate(collection): try: line = [ convert_geom(feature["geometry"], srid) ] for prop in properties: value = feature["properties"][prop] if value is None: value = "" line.append(str(value)) data.append(line) except (TypeError, GDALException, GEOSException) as e: raise ImporterError( _(f"Error reading feature {idx + 1} - {e}") ) except fiona.errors.DriverError: raise ImporterError(_("Invalid GIS file.")) if tmp_dir: shutil.rmtree(tmp_dir) return data @property def data_table(self): if self.importer_type.type == "tab": return self._data_table_tab() if self.importer_type.type == "gis": return self._data_table_gis() def initialize(self, user=None, session_key=None): self.state = "AP" self.end_date = datetime.datetime.now() self.save() try: self.get_importer_instance().initialize( self.data_table, user=user, output="db" ) except ImporterError as e: if session_key: put_session_message(session_key, e.msg, "danger") self.state = "C" self.save() return self.state = "A" self.end_date = datetime.datetime.now() self.save() def delayed_check_modified(self, session_key): if not settings.USE_BACKGROUND_TASK: return self.check_modified(session_key=session_key) put_session_message( session_key, str(_("Modification check {} added to the queue")).format(self.name), "info", ) self.state = "HQ" self.end_date = datetime.datetime.now() self.save() return delayed_check.delay(self.pk) def check_modified(self, session_key=None): self.state = "HP" self.end_date = datetime.datetime.now() self.changed_line_numbers = "" self.changed_checked = False 
self.save() for idx in range(self.skip_lines, self.get_number_of_lines() + 1): try: imprt, data = self.importation( simulate=True, line_to_process=idx, return_importer_and_data=True ) except IOError as e: # error is identified as a change self.add_changed_line(idx) continue # no data is not normal and an error is identified as a change if not data or not data[0]: self.add_changed_line(idx) continue # new objects is a change if imprt.new_objects: self.add_changed_line(idx) continue # check all updated fields changed = False for path, obj, values, updated_values in imprt.updated_objects: if changed: break for k in updated_values.keys(): if changed: break current_value = getattr(obj, k) updated_value = updated_values[k] if hasattr(current_value, "all"): current_value = list(current_value.all()) changed = False for v in updated_value: if v not in current_value: changed = True break else: if current_value != updated_value: changed = True break if changed: self.add_changed_line(idx) continue self.remove_changed_line(idx) self.changed_checked = True self.save() def delayed_importation(self, request, session_key): if not settings.USE_BACKGROUND_TASK: return self.importation(request=request, session_key=session_key) put_session_message( session_key, str(_("Import {} added to the queue")).format(self.name), "info", ) self.state = "IQ" self.end_date = datetime.datetime.now() self.save() return delayed_import.delay(self.pk) def importation( self, session_key=None, line_to_process=None, simulate=False, return_importer_and_data=False, request=None, ): self.state = "IP" self.end_date = datetime.datetime.now() if not line_to_process: # full import self.imported_line_numbers = "" self.current_line = 0 self.save() importer = self.get_importer_instance() try: data = importer.importation( self.data_table, user=self.user, line_to_process=line_to_process, simulate=simulate, ) except IOError: error_message = str(_("Error on imported file: {}")).format( self.imported_file ) 
importer.errors = [error_message] if session_key: put_session_message(session_key, error_message, "warning") ids = get_session_var(session_key, "current_import_id") if not ids: ids = [] ids.append(self.pk) put_session_var(session_key, "current_import_id", ids) if line_to_process: self.state = "PI" else: self.state = "FE" self.save() if not return_importer_and_data: return return importer, None # result file filename = slugify(self.importer_type.name) now = datetime.datetime.now().isoformat("-").replace(":", "") result_file = filename + "_result_%s.csv" % now self.result_file.save( result_file, ContentFile(importer.get_csv_result().encode("utf-8")) ) if importer.errors: if line_to_process: self.state = "PI" else: self.state = "FE" error_file = filename + "_errors_%s.csv" % now self.error_file.save( error_file, ContentFile(importer.get_csv_errors().encode("utf-8")) ) msg = str(_("Import {} finished with errors")).format(self.name) msg_cls = "warning" else: if line_to_process: self.state = "PI" else: self.state = "F" self.error_file = None msg = str(_("Import {} finished with no errors")).format(self.name) msg_cls = "primary" if session_key and request: put_session_message(session_key, msg, msg_cls) ids = ( request.session["current_import_id"] if "current_import_id" in request.session else [] ) ids.append(self.pk) put_session_var(session_key, "current_import_id", ids) if importer.match_table: match_file = filename + "_match_%s.csv" % now self.match_file.save( match_file, ContentFile(importer.get_csv_matches().encode("utf-8")) ) self.end_date = datetime.datetime.now() self.save() if self.next_import and not importer.errors: return self.next_import.importation( session_key=session_key, line_to_process=line_to_process, simulate=simulate, return_importer_and_data=return_importer_and_data, request=request ) if return_importer_and_data: return importer, data def _unarchive(self): if not self.archive_file: return with tempfile.TemporaryDirectory() as tmp_dir_name: # extract 
the current archive current_zip = zipfile.ZipFile(self.archive_file.path, "r") name_list = current_zip.namelist() if "content.json" not in name_list: return for name in name_list: current_zip.extract(name, tmp_dir_name) current_zip.close() content_name = os.path.join(tmp_dir_name, "content.json") try: with open(content_name, "r") as content: files = json.loads(content.read()) except (IOError, json.JSONDecodeError): return today = datetime.date.today() for attr in files: filename = files[attr] full_filename = os.path.join(tmp_dir_name, filename) with open(full_filename, "rb") as raw_file: getattr(self, attr).save( "upload/imports/{}/{:02d}/{}".format( today.year, today.month, filename ), File(raw_file), ) os.remove(self.archive_file.path) setattr(self, "archive_file", None) self.state = "FE" if self.error_file else "F" self.save() return True def _archive(self): file_attr = ["imported_file", "error_file", "result_file", "match_file"] files = [ (k, getattr(self, k).path, getattr(self, k).name.split(os.sep)[-1]) for k in file_attr if getattr(self, k) ] self._archive_pending = True with tempfile.TemporaryDirectory() as tmpdir: base_name = "{}.zip".format(slugify(self.name)) archive_name = os.path.join(tmpdir, base_name) with zipfile.ZipFile(archive_name, "w") as current_zip: zip_content = {} for k, path, name in files: try: current_zip.write(path, arcname=name) zip_content[k] = name except OSError: pass content_name = os.path.join(tmpdir, "content.json") with open(content_name, "w") as content: content.write(json.dumps(zip_content)) current_zip.write(content_name, arcname="content.json") today = datetime.date.today() with open( archive_name, "rb", ) as raw_file: self.archive_file.save( "upload/imports/{}/{:02d}/{}".format( today.year, today.month, base_name ), File(raw_file), ) IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() if profile.delete_image_zip_on_archive: 
file_attr.append("imported_images") for attr in file_attr: file_field = getattr(self, attr) if file_field: try: os.remove(file_field.path) except FileNotFoundError: pass setattr(self, attr, None) self.save() self._archive_pending = False def archive(self): self.state = "AC" self.end_date = datetime.datetime.now() self._archive() def unarchive(self, state): if not self._unarchive(): self.state = state self.save() # only save if no save previously def get_all_imported(self): imported = [] for related, zorg in get_all_related_m2m_objects_with_model(self): accessor = related.get_accessor_name() imported += [(accessor, obj) for obj in getattr(self, accessor).all()] return imported def save(self, *args, **kwargs): super(Import, self).save(*args, **kwargs) if ( self.state == "AC" and not getattr(self, "_archive_pending", False) and not self.archive_file ): self._archive() def pre_delete_import(sender, **kwargs): # deleted imported items when an import is delete instance = kwargs.get("instance") if not instance: return to_delete = [] for accessor, imported in instance.get_all_imported(): to_delete.append(imported) post_delete_to_update = {} for item in to_delete: if hasattr(item, "post_delete_to_update"): item._no_pre_delete = True for klass, values in item.post_delete_to_update(): if klass not in post_delete_to_update: post_delete_to_update[klass] = set(values) else: post_delete_to_update[klass].update(values) item.delete() for klass in post_delete_to_update: for item_id in post_delete_to_update[klass]: q = klass.objects.filter(pk=item_id) if q.count(): q.all()[0].save() pre_delete.connect(pre_delete_import, sender=Import) class ItemKey(models.Model): key = models.TextField(_("Key")) content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() content_object = GenericForeignKey("content_type", "object_id") importer = models.ForeignKey( Import, null=True, blank=True, help_text=_("Specific key to an import"), 
on_delete=models.SET_NULL, ) user = models.ForeignKey( "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL ) group = models.ForeignKey( TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL ) class Meta: verbose_name = _("Importer - Item key") verbose_name_plural = _("Imports - Item keys") ADMIN_SECTION = _("Imports") def __str__(self): return self.key