#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import datetime import pandas import random from pathlib import Path import fiona from fiona import crs as fiona_crs import json from osgeo import ogr import os import logging from pyproj import CRS import shutil import re import tempfile import zipfile from django.apps import apps from django.conf import settings from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.gis.db.models import Manager from django.contrib.gis.gdal.error import GDALException from django.contrib.gis.geos import GEOSGeometry from django.contrib.gis.geos.error import GEOSException from django.core.exceptions import ValidationError, SuspiciousOperation from django.core.files import File from django.core.files.base import ContentFile from django.core.validators import validate_comma_separated_integer_list, MinValueValidator from django.db.models import Q from django.db.models.base import ModelBase from django.db.models.signals import pre_delete from django.template.defaultfilters import slugify from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _, pgettext_lazy UnoCalc = None 
ITALIC = None
if settings.USE_LIBREOFFICE:
    try:
        from ishtar_common.libreoffice import UnoCalc
        from com.sun.star.awt.FontSlant import ITALIC
    except ImportError:
        pass

from ishtar_common.model_managers import SlugModelManager
from ishtar_common.utils import (
    create_slug,
    np_format_int_float,
    generate_dict_from_list,
    get_all_related_m2m_objects_with_model,
    get_session_var,
    import_class,
    StrJSONEncoder,
    max_size_help,
    num2col,
    put_session_message,
    put_session_var,
    reverse_coordinates,
    update_data,
    OwnPerms,
    SheetItem,
)
from ishtar_common.data_importer import (
    Importer,
    ImportFormater,
    IntegerFormater,
    FloatFormater,
    UnicodeFormater,
    DateFormater,
    TypeFormater,
    YearFormater,
    StrToBoolean,
    FileFormater,
    InseeFormater,
    ImporterError,
    UpperCaseFormater,
    LowerCaseFormater,
)
from ishtar_common.utils import task
from ishtar_common.ignf_utils import IGNF

logger = logging.getLogger(__name__)


def get_model_fields(model):
    """
    Return a dict of fields from model.

    :param model: Django model class (or None)
    :return: dict mapping field name -> field instance; empty dict if model
        is falsy
    """
    fields = {}
    if not model:
        return fields
    for field in model._meta.get_fields():
        fields[field.name] = field
    return fields


class ImportModelManager(models.Manager):
    def get_by_natural_key(self, klass):
        return self.get(klass=klass)


class ImporterModel(models.Model):
    # Registry of importable Django model classes, referenced by dotted path.
    name = models.CharField(_("Name"), max_length=200)
    klass = models.CharField(_("Class name"), max_length=200, unique=True)
    objects = ImportModelManager()

    class Meta:
        verbose_name = _("Data model")
        verbose_name_plural = _("Data models")
        ordering = ("name",)
        ADMIN_SECTION = _("General settings")

    def __str__(self):
        return self.name

    def natural_key(self):
        return (self.klass,)


IMPORT_TYPES = (
    ("tab", _("Table")),
    ("gis", _("GIS")),
)
IMPORT_TYPES_DICT = dict(IMPORT_TYPES)


class ImporterType(models.Model):
    """
    Description of a table to be mapped with ishtar database
    """

    name = models.CharField(_("Name"), max_length=200)
    slug = models.SlugField(_("Slug"), unique=True, max_length=100)
    description = models.CharField(
        _("Description"), blank=True, null=True, max_length=500
    )
    type = models.CharField(
        _("Type"), max_length=3, choices=IMPORT_TYPES, default="tab"
    )
    available = models.BooleanField(_("Available"), default=True)
    users = models.ManyToManyField("IshtarUser", verbose_name=_("Users"), blank=True)
    associated_models = models.ForeignKey(
        ImporterModel,
        verbose_name=_("Associated model"),
        on_delete=models.SET_NULL,
        related_name="importer_type_associated",
        blank=True,
        null=True,
    )
    created_models = models.ManyToManyField(
        ImporterModel,
        verbose_name=_("Models that can accept new items"),
        blank=True,
        help_text=_("Leave blank for no restrictions"),
        related_name="importer_type_created",
    )
    unicity_keys = models.CharField(
        _('Unicity keys (separator ";")'),
        blank=True,
        null=True,
        max_length=500,
        help_text=_(
            "Mandatory for update importer. Set to key that identify items "
            "without ambiguity. Warning: __ is not supported, only use level "
            "1 key."
        ),
    )
    is_main_geometry = models.BooleanField(
        _("Set as main geometry"),
        default=True,
        help_text=_("Only relevant for GIS files"),
    )
    is_import = models.BooleanField(_("Can be imported"), default=True)
    is_template = models.BooleanField(_("Can be exported"), default=False)
    default_header_len = models.PositiveSmallIntegerField(
        _("Default header length"), default=1
    )
    archive_required = models.BooleanField(_("Archive required"), default=False)
    tab_number = models.PositiveIntegerField(
        _("Tab number"),
        default=1,
        validators=[MinValueValidator(1)],
        help_text=_(
            "When using an Excel or Calc file choose the tab number. "
            "Keep it to 1 by default."
        ),
    )
    layer_name = models.CharField(
        _("Layer name"),
        max_length=200,
        blank=True,
        null=True,
        help_text=_("For GIS file with multiple layers"),
    )
    pre_import_message = models.TextField(
        _("Pre-import form message"), blank=True, default="", max_length=500
    )
    ignore_errors = models.TextField(
        verbose_name=_("Error messages to ignore"),
        blank=True,
        null=True,
        help_text=_(
            "If an error is encountered with the following character strings, "
            "the error is not reported in "
            "the error file. Each message is separated with a line break."
        ),
    )
    debug = models.BooleanField(verbose_name=_("Debug"), default=False)
    objects = SlugModelManager()

    SERIALIZATION_EXCLUDE = ["users"]

    class Meta:
        verbose_name = _("Importer - Type")
        verbose_name_plural = _("Importer - Types")
        ordering = ("name",)
        ADMIN_SECTION = _("Imports")

    def natural_key(self):
        return (self.slug,)

    def __str__(self):
        return self.name

    @classmethod
    def is_own(cls, ishtar_user):
        return bool(cls.objects.filter(users__pk=ishtar_user.pk).count())

    @property
    def type_label(self):
        if self.type in IMPORT_TYPES_DICT:
            return IMPORT_TYPES_DICT[str(self.type)]
        return ""

    @property
    def importer_groups_label(self) -> str:
        return " ; ".join([imp.group.name for imp in self.groups.all()])

    def get_libreoffice_template(self):
        """
        Generate an ODS template file for this importer using LibreOffice
        (UNO bridge). Returns the path of the generated file, or None when
        LibreOffice support is unavailable or calc creation failed.
        """
        if not UnoCalc:
            return
        ROW_NUMBER = 500
        uno = UnoCalc()
        calc = uno.create_calc()
        # BUGFIX: guard must come before calc is used — the original checked
        # "if not calc" only after get_or_create_sheet(calc, ...), which would
        # raise AttributeError on a None calc.
        if not calc:
            return
        main_sheet = uno.get_or_create_sheet(calc, 0, self.name)
        lst_sheet = uno.get_or_create_sheet(calc, 1, str(_("List types")))
        col_number = 1  # user number so we start with 1
        lst_col_number = 0
        for column in self.columns.filter(col_number__gt=0).order_by(
            "col_number"
        ).all():
            while column.col_number > col_number:
                col_number += 1
            # header
            cell = main_sheet.getCellByPosition(col_number - 1, 0)
            cell.CharWeight = 150
            cell.setString(column.label)
            cell = main_sheet.getCellByPosition(col_number - 1, 1)
            cell.CharPosture = ITALIC
            cell.setString(column.description)
            # only managing the first target...
            ft = None
            for target in column.targets.all():
                ft = target.formater_type
                if ft:
                    break
            if not ft:
                continue
            # first we only manage TypeFormater
            if ft.formater_type != "TypeFormater":
                continue
            if not ft.options:  # bad configuration
                continue
            model = import_class(ft.options)
            if not model:
                continue
            lst = []
            for typ in model.get_types(instances=True):
                lst.append(str(typ))
            end_row = uno.create_list(
                lst_sheet, lst_col_number, 0, str(model._meta.verbose_name), lst
            )
            uno.set_cell_validation_list(
                main_sheet,
                col_number,
                2,
                ROW_NUMBER + 2,
                lst_sheet,
                lst_col_number,
                [1, end_row],
            )
            lst_col_number += 1
        tmpdir = tempfile.mkdtemp(prefix="ishtar-templates-")
        dest_filename = "{}{}{}.ods".format(tmpdir, os.sep, self.name)
        uno.save_calc(calc, dest_filename)
        return dest_filename

    def get_default_values(self) -> list:
        """
        Get list of 2-tuple to manage default values.
        The first item is a tuple of mandatory fields for the values to be
        relevant. Empty if the default values always applies.
        The second item is the dict of default values.
        """
        return [
            (default.mandatory_keys, default.values)
            for default in self.defaults.all()
        ]

    def get_importer_class(self, import_instance=None):
        """
        Build dynamically an Importer subclass configured from this
        ImporterType (columns, targets, defaults, unicity keys).

        :param import_instance: optional Import instance providing
            pre-import values and per-import default values
        :return: a new class derived from Importer
        """
        object_cls = import_class(self.associated_models.klass)
        pre_import_values = {}
        if import_instance:
            pre_import_values = import_instance.pre_import_values
            defaults = import_instance.default_values
        else:
            defaults = self.get_default_values()
        ignore_errors = tuple()
        if self.ignore_errors:
            ignore_errors = tuple(
                [error.strip() for error in self.ignore_errors.split("\n")]
            )
        LINE_FORMAT = []
        LINE_EXPORT_FORMAT = []
        idx = 0
        for column in self.columns.filter(col_number__gt=0).order_by(
            "col_number"
        ).all():
            idx += 1
            # pad skipped column numbers with None placeholders
            while column.col_number > idx:
                LINE_FORMAT.append(None)
                LINE_EXPORT_FORMAT.append(None)
                idx += 1
            targets = []
            formater_types = []
            nb = column.targets.count()
            if not nb:
                LINE_FORMAT.append(None)
                if column.export_field_name:
                    LINE_EXPORT_FORMAT.append(
                        ImportFormater(column.export_field_name, label=column.label)
                    )
                continue
            force_news = []
            concat_str = []
            concat = []
            for target in column.targets.order_by("pk").all():
                ft = target.formater_type.get_formater_type(
                    target, import_instance=import_instance
                )
                if not ft:
                    continue
                formater_types.append(ft)
                targets.append(target.target)
                concat_str.append(target.concat_str)
                force_news.append(target.force_new)
                concat.append(target.concat)
            formater_kwargs = {}
            if column.regexp_pre_filter:
                formater_kwargs["regexp"] = re.compile(
                    column.regexp_pre_filter.regexp
                )
            if column.value_format:
                formater_kwargs["value_format"] = column.value_format.format_string
            formater_kwargs["concat"] = concat
            formater_kwargs["concat_str"] = concat_str
            formater_kwargs["duplicate_fields"] = [
                (field.field_name, field.force_new, field.concat, field.concat_str)
                for field in column.duplicate_fields.all()
            ]
            formater_kwargs["label"] = column.label
            formater_kwargs["required"] = column.required
            formater_kwargs["force_new"] = force_news
            formater_kwargs["comment"] = column.description
            if column.export_field_name:
                formater_kwargs["export_field_name"] = [column.export_field_name]
            formater = ImportFormater(targets, formater_types, **formater_kwargs)
            LINE_FORMAT.append(formater)
            LINE_EXPORT_FORMAT.append(formater)
        UNICITY_KEYS = []
        if self.unicity_keys:
            UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(";")]
        MODEL_CREATION_LIMIT = []
        for modls in self.created_models.all():
            MODEL_CREATION_LIMIT.append(import_class(modls.klass))
        args = {
            "OBJECT_CLS": object_cls,
            "DESC": self.description,
            "DEFAULTS": defaults,
            "PRE_IMPORT_VALUES": pre_import_values,
            "IGNORE_ERRORS": ignore_errors,
            "LINE_FORMAT": LINE_FORMAT,
            "UNICITY_KEYS": UNICITY_KEYS,
            "LINE_EXPORT_FORMAT": LINE_EXPORT_FORMAT,
            "MODEL_CREATION_LIMIT": MODEL_CREATION_LIMIT,
            "TYPE": self.type,
            "MAIN_GEO": self.is_main_geometry,
            "DEBUG": self.debug,
        }
        # class name: CamelCase version of the slugified importer name
        name = str(
            "".join(
                x
                for x in slugify(self.name).replace("-", " ").title()
                if not x.isspace()
            )
        )
        newclass = type(name, (Importer,), args)
        return newclass

    def get_columns(self, importer_class=None):
        """
        :param importer_class: importer class - if not provided get from self
        :return: (columns: list, columns_names: list) - column attributes,
            column labels
        """
        if not importer_class:
            importer_class = self.get_importer_class()
        cols, col_names = [], []
        for formater in importer_class.LINE_EXPORT_FORMAT:
            if not formater:
                cols.append("")
                col_names.append("")
                continue
            cols.append(formater.export_field_name)
            col_names.append(formater.label)
        return cols, col_names

    def save(self, *args, **kwargs):
        if not self.slug:
            self.slug = create_slug(ImporterType, self.name)
        return super(ImporterType, self).save(*args, **kwargs)
class ImporterGroup(models.Model):
    # Named grouping of importer types, with per-user availability.
    name = models.CharField(_("Name"), max_length=200)
    slug = models.SlugField(_("Slug"), unique=True, max_length=100)
    description = models.TextField(_("Description"), blank=True, default="")
    available = models.BooleanField(_("Available"), default=True)
    users = models.ManyToManyField("IshtarUser", verbose_name=_("Users"), blank=True)

    class Meta:
        verbose_name = _("Importer - Group")
        verbose_name_plural = _("Importer - Groups")
        ordering = ("name",)
        ADMIN_SECTION = _("Imports")

    objects = SlugModelManager()

    def natural_key(self):
        return self.slug,

    def __str__(self):
        return self.name

    @classmethod
    def is_own(cls, ishtar_user):
        return bool(cls.objects.filter(users__pk=ishtar_user.pk).count())

    @property
    def importer_types_label(self) -> str:
        return " ; ".join(
            [imp.importer_type.name for imp in self.importer_types.all()]
        )


class ImporterGroupImporterManager(Manager):
    def get_by_natural_key(self, group, importer):
        return self.get(group__slug=group, importer_type__slug=importer)


class ImporterGroupImporter(models.Model):
    # Ordered through-model between ImporterGroup and ImporterType.
    group = models.ForeignKey(
        ImporterGroup, on_delete=models.CASCADE, related_name="importer_types"
    )
    importer_type = models.ForeignKey(
        ImporterType, on_delete=models.CASCADE, related_name="groups"
    )
    order = models.PositiveIntegerField(
        _("Order"), default=10, validators=[MinValueValidator(1)]
    )
    objects = ImporterGroupImporterManager()

    class Meta:
        ordering = ("group", "order")
        unique_together = ("group", "order")
        verbose_name = _("Importer - Group <-> Importer")

    def natural_key(self):
        return self.group.slug, self.importer_type.slug


def get_associated_model(parent_model, keys):
    """
    Resolve the model reached by following a "__"-separated key path from
    parent_model.

    :param parent_model: model class or dotted-path string of the start model
    :param keys: iterable of path segments; "-" or "" resets the current model
    :return: the resolved model class (or None)
    :raise ImporterError: when a segment is not a field of the current model
    """
    model = None
    if isinstance(parent_model, str):
        OBJECT_CLS = import_class(parent_model)
    else:
        OBJECT_CLS = parent_model
    fields = get_model_fields(OBJECT_CLS)
    for idx, item in enumerate(keys):
        if item in ("-", ""):
            model = None
        elif not idx:
            if item not in fields:
                raise ImporterError(
                    str(
                        _(
                            "Importer configuration error: "
                            '"{}" is not available for "{}".'
                            " Check your default and column "
                            "configuration"
                        )
                    ).format(item, OBJECT_CLS.__name__)
                )
            field = fields[item]
            if hasattr(field, "remote_field") and hasattr(
                field.remote_field, "model"
            ):
                model = field.remote_field.model
            # the "field" may itself be a model class (ModelBase metaclass);
            # idiom fix: isinstance instead of exact type comparison
            if isinstance(field, ModelBase):
                model = field
        else:
            if not model:
                raise ImporterError(
                    str(
                        _(
                            "Importer configuration error: "
                            '"{}" is not available for "{}".'
                            " Check your default and column "
                            "configuration"
                        )
                    ).format("__".join(keys[1:]), OBJECT_CLS.__name__)
                )
            # recurse on the remaining path from the resolved model
            return get_associated_model(model, keys[1:])
    return model
class ImporterDefaultManager(models.Manager):
    def get_by_natural_key(self, importer_type, target):
        return self.get(importer_type__slug=importer_type, target=target)


class ImporterDefault(models.Model):
    """
    Targets of default values in an import
    """

    importer_type = models.ForeignKey(
        ImporterType, related_name="defaults", on_delete=models.CASCADE
    )
    target = models.CharField(
        _("Target"),
        max_length=500,
        help_text=_('The target of the default values. Can be set to empty with "-". '
                    'Use the "__" notation to pass between models.')
    )
    required_fields = models.CharField(
        _("Required fields"),
        max_length=500,
        blank=True,
        default="",
        help_text=_(
            "Theses defaults values only apply if the designated fields are not empty. "
            'Use the "__" notation to pass between models.'
            "Leave empty to always apply."
        )
    )

    class Meta:
        verbose_name = _("Importer - Default")
        verbose_name_plural = _("Importer - Defaults")
        unique_together = ("importer_type", "target")
        ADMIN_SECTION = _("Imports")

    objects = ImporterDefaultManager()

    def __str__(self):
        return "{} - {}".format(self.importer_type, self.target)

    def natural_key(self):
        return self.importer_type.slug, self.target

    @property
    def keys(self):
        # path segments of the target, dropping "-"/empty reset markers
        return tuple(t for t in self.target.split("__") if t not in ("-", ""))

    @property
    def associated_model(self):
        # model reached by following the target path from the importer's
        # associated model; the importer's model itself when no path is set
        if not self.keys:
            return import_class(self.importer_type.associated_models.klass)
        return get_associated_model(
            self.importer_type.associated_models.klass, self.keys
        )

    @property
    def mandatory_keys(self):
        return tuple(k for k in self.required_fields.split("__") if k)

    @property
    def values(self):
        # Build a nested dict of default values keyed by the full target path.
        values = {}
        for default_value in self.default_values.all():
            targets = list(self.keys)
            target = default_value.target
            if target != "-":
                targets += target.split("__")
            value = default_value.value
            # explicit key or id is not set (try to get the value)
            if targets[-1] not in ("srid", "txt_idx", "slug", "id", "pk"):
                value = default_value.get_value()
            dct = generate_dict_from_list(targets, value)
            values = update_data(values, dct)
        return values
class ImporterDefaultValuesManager(models.Manager):
    def get_by_natural_key(self, def_target_type, def_target, target):
        return self.get(
            default_target__importer_type__slug=def_target_type,
            default_target__target=def_target,
            target=target,
        )


class ImporterDefaultValues(models.Model):
    """
    Default values in an import
    """

    default_target = models.ForeignKey(
        ImporterDefault, related_name="default_values", on_delete=models.CASCADE
    )
    target = models.CharField("Target", max_length=500)
    value = models.CharField("Value", max_length=500)
    objects = ImporterDefaultValuesManager()

    class Meta:
        verbose_name = _("Importer - Default value")
        verbose_name_plural = _("Importer - Default values")
        unique_together = ("default_target", "target")
        ADMIN_SECTION = _("Imports")

    def natural_key(self):
        return (
            self.default_target.importer_type.slug,
            self.default_target.target,
            self.target,
        )

    def __str__(self):
        # BUGFIX: the original format string had only two placeholders but
        # three arguments — self.value was silently dropped from the label.
        return "{} - {} - {}".format(self.default_target, self.target, self.value)

    def get_value(self):
        """Resolve the stored raw value against the target model."""
        parent_model = self.default_target.associated_model
        target = self.target.strip()
        return self._get_value(parent_model, target)

    def _get_value(self, parent_model, target):
        """
        Recursively follow a "__" path and coerce self.value to a model
        instance when the final field is a relation; fall back to the raw
        string (or "" when nothing matches).
        """
        if not parent_model:
            return self.value
        fields = get_model_fields(parent_model)
        if "__" in target:
            targets = target.split("__")
            if targets[0] not in fields:
                return
            new_target = "__".join(targets[1:])
            field = fields[targets[0]]
            if not hasattr(field, "remote_field") or not hasattr(
                field.remote_field, "model"
            ):
                return self.value
            new_parent_model = field.remote_field.model
            return self._get_value(new_parent_model, new_target)
        # attributes flagged as importer triggers keep the raw value
        if hasattr(parent_model, target) and hasattr(
            getattr(parent_model, target), "importer_trigger"
        ):
            return self.value
        elif target not in fields:
            return
        field = fields[target]
        if target in ("srid", "txt_idx", "slug"):
            try:
                return parent_model.objects.get(**{target: self.value})
            except (ValueError, parent_model.DoesNotExist):
                pass
        if not hasattr(field, "remote_field") or not hasattr(
            field.remote_field, "model"
        ):
            return self.value
        model = field.remote_field.model
        # if value is an id
        try:
            return model.objects.get(pk=int(self.value))
        except (ValueError, model.DoesNotExist):
            pass
        # try with txt_idx
        try:
            return model.objects.get(txt_idx=self.value)
        except (ValueError, model.DoesNotExist):
            pass
        return ""
class ImporterColumnManager(models.Manager):
    def get_by_natural_key(self, importer_type, col_number):
        return self.get(importer_type__slug=importer_type, col_number=col_number)


class ImporterColumn(models.Model):
    """
    Import file column description
    """

    label = models.CharField(_("Label"), blank=True, null=True, max_length=200)
    importer_type = models.ForeignKey(
        ImporterType, related_name="columns", on_delete=models.CASCADE
    )
    col_number = models.SmallIntegerField(
        _("Column number"),
        default=1,
        help_text=_("Column number in the table. Put 0 or negative number for pre-importer field.")
    )
    description = models.TextField(_("Description"), blank=True, null=True)
    regexp_pre_filter = models.ForeignKey(
        "Regexp",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        related_name="columns",
    )
    value_format = models.ForeignKey(
        "ValueFormater",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        related_name="columns",
    )
    required = models.BooleanField(_("Required"), default=False)
    export_field_name = models.CharField(
        _("Export field name"),
        blank=True,
        null=True,
        max_length=200,
        help_text=_(
            "Fill this field if the field name is ambiguous for "
            "export. For instance: concatenated fields."
        ),
    )
    objects = ImporterColumnManager()

    class Meta:
        verbose_name = _("Importer - Column")
        verbose_name_plural = _("Importer - Columns")
        ordering = ("importer_type", "col_number")
        unique_together = ("importer_type", "col_number")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return "{} - {}".format(self.importer_type, self.col_number)

    @property
    def col_string(self):
        # spreadsheet-style column letter (A, B, ..., AA)
        return num2col(self.col_number)

    def natural_key(self):
        return self.importer_type.slug, self.col_number

    def targets_lbl(self):
        # human-readable list of targets (used in admin)
        return ", ".join([target.target for target in self.targets.all()])

    def duplicate_fields_lbl(self):
        return ", ".join([dp.field_name or "" for dp in self.duplicate_fields.all()])

    def formater_type_lbl(self):
        return ", ".join([str(target.formater_type) for target in self.targets.all()])
class ImporterDuplicateFieldManager(models.Manager):
    def get_by_natural_key(self, importer_type, col_number, field_name):
        return self.get(
            column__importer_type__slug=importer_type,
            column__col_number=col_number,
            field_name=field_name,
        )


class ImporterDuplicateField(models.Model):
    """
    Direct copy of result in other fields
    """

    column = models.ForeignKey(
        ImporterColumn, related_name="duplicate_fields", on_delete=models.CASCADE
    )
    field_name = models.CharField(
        _("Field name"), blank=True, null=True, max_length=200
    )
    force_new = models.BooleanField(_("Force creation of new items"), default=False)
    concat = models.BooleanField(_("Concatenate with existing"), default=False)
    concat_str = models.CharField(
        _("Concatenate character"), max_length=5, blank=True, null=True
    )
    objects = ImporterDuplicateFieldManager()

    class Meta:
        verbose_name = _("Importer - Duplicate field")
        verbose_name_plural = _("Importer - Duplicate fields")
        ordering = ("column", "field_name")
        unique_together = ("column", "field_name")
        ADMIN_SECTION = _("Imports")

    def natural_key(self):
        return self.column.importer_type.slug, self.column.col_number, self.field_name


class NamedManager(models.Manager):
    # Generic natural-key manager for models whose key is a unique "name".
    def get_by_natural_key(self, name):
        return self.get(name=name)
class Regexp(models.Model):
    # Named regular expression used as a column pre-filter.
    name = models.CharField(_("Name"), max_length=100, unique=True)
    description = models.TextField(_("Description"), blank=True, null=True)
    regexp = models.CharField(_("Regular expression"), max_length=500)
    objects = NamedManager()

    class Meta:
        verbose_name = _("Importer - Regular expression")
        verbose_name_plural = _("Importer - Regular expressions")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return self.name

    def natural_key(self):
        return (self.name,)


class ValueFormater(models.Model):
    # Python format() template applied to a column value before import.
    name = models.CharField(_("Name"), max_length=100, unique=True)
    slug = models.SlugField(_("Slug"), unique=True, max_length=100)
    description = models.TextField(_("Description"), blank=True, null=True)
    format_string = models.CharField(
        _("Format string"),
        max_length=100,
        help_text=_(
            "A string used to format a value using the Python "
            '"format()" method. The site https://pyformat.info/ '
            'provide good examples of usage. Only one "{}" entry '
            'is managed. Instead you can use "{item}". The input is '
            'assumed to be a string.'
        ),
    )
    objects = SlugModelManager()

    class Meta:
        verbose_name = _("Importer - Value format")
        verbose_name_plural = _("Importer - Value formats")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return self.name

    def clean(self):
        """
        Validate that format_string is usable on a sample value.

        BUGFIX: the documented "{item}" placeholder raised an uncaught
        KeyError (and "{1}" an IndexError) because only ValueError was
        caught and no "item" keyword was supplied. Supply the sample both
        positionally and as "item", and catch all format-related errors.
        """
        sample = "sample value"
        try:
            self.format_string.format(sample, item=sample)
        except (ValueError, IndexError, KeyError):
            raise ValidationError(
                {
                    "format_string": _(
                        "The string provided generate an error. "
                        "Fix it."
                    )
                }
            )

    def natural_key(self):
        return (self.slug,)
class ImportTargetManager(models.Manager):
    def get_by_natural_key(self, importer_type, col_number, target):
        return self.get(
            column__importer_type__slug=importer_type,
            column__col_number=col_number,
            target=target,
        )


class ImportTarget(models.Model):
    """
    Ishtar database target for a column
    """

    column = models.ForeignKey(
        ImporterColumn, related_name="targets", on_delete=models.CASCADE
    )
    target = models.CharField("Target", max_length=500)
    formater_type = models.ForeignKey(
        "FormaterType", related_name="targets", on_delete=models.CASCADE
    )
    force_new = models.BooleanField(_("Force creation of new items"), default=False)
    concat = models.BooleanField(_("Concatenate with existing"), default=False)
    concat_str = models.CharField(
        _("Concatenate character"), max_length=5, blank=True, null=True
    )
    comment = models.TextField(_("Comment"), blank=True, null=True)
    objects = ImportTargetManager()

    class Meta:
        verbose_name = _("Importer - Target")
        verbose_name_plural = _("Importer - Targets")
        unique_together = ("column", "target")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return self.target[:50] if self.target else self.comment

    @cached_property
    def verbose_name(self):
        # target name, optionally completed by the (lower-cased) column
        # description
        if not self.column.description:
            return self.target[:50]
        desc = self.column.description
        desc = desc[0].lower() + desc[1:]
        return "{} - {}".format(self.target[:50], desc)

    def natural_key(self):
        return self.column.importer_type.slug, self.column.col_number, self.target

    @property
    def associated_model(self):
        # model reached by the target path; None for free "data__" keys or
        # unresolvable paths
        if self.target.startswith("data__"):
            return
        try:
            return get_associated_model(
                self.column.importer_type.associated_models.klass,
                self.target.split("__"),
            )
        except KeyError:
            return

    def get_choices(self):
        """Choices offered to the user when linking source values."""
        if (
            self.formater_type.formater_type == "UnknowType"
            and self.column.importer_type.slug
        ):
            cls = self.column.importer_type.get_importer_class()
            formt = cls().line_format[self.column.col_number - 1]
            if hasattr(formt.formater, "choices"):
                return [("", "--" * 8)] + list(formt.formater.choices)
            return [("", "--" * 8)]
        if self.formater_type.formater_type == "StrToBoolean":
            return [("", "--" * 8), ("True", _("True")), ("False", _("False"))]
        if not self.associated_model or not hasattr(self.associated_model, "get_types"):
            return []
        return self.associated_model.get_types()
class TargetKeyGroup(models.Model):
    """
    Group of target keys for imports.
    """

    name = models.TextField(_("Name"), unique=True)
    all_user_can_use = models.BooleanField(_("All users can use it"), default=False)
    all_user_can_modify = models.BooleanField(
        _("All users can modify it"), default=False
    )
    available = models.BooleanField(_("Available"), default=True)

    class Meta:
        verbose_name = _("Importer - Target key group")
        verbose_name_plural = _("Importer - Target key groups")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return self.name


class TargetKey(models.Model):
    """
    User's link between import source and ishtar database.
    Also temporary used for GeneralType to point missing link before adding
    them in ItemKey table.
    A targetkey connection can be create to be applied to one particular
    import (associated_import), one particular user (associated_user), one
    particular group (associated_group) or to all imports (associated_import,
    associated_user and associated_group are empty).
    """

    target = models.ForeignKey(
        ImportTarget, related_name="keys", on_delete=models.CASCADE
    )
    key = models.TextField(_("Key"))
    value = models.TextField(_("Value"), blank=True, null=True)
    is_set = models.BooleanField(_("Is set"), default=False)
    associated_import = models.ForeignKey(
        "Import", blank=True, null=True, on_delete=models.SET_NULL
    )
    associated_user = models.ForeignKey(
        "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL
    )
    associated_group = models.ForeignKey(
        TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL
    )

    class Meta:
        unique_together = (
            "target",
            "key",
            "associated_user",
            "associated_import",
        )
        verbose_name = _("Importer - Target key")
        verbose_name_plural = _("Importer - Targets keys")
        ordering = ("target", "key")
        ADMIN_SECTION = _("Imports")

    def __str__(self):
        return " - ".join([str(self.target), self.key[:50]])

    def column_nb(self):
        # for the admin
        return self.target.column.col_number

    def importer_type(self):
        # for the admin
        return self.target.column.importer_type.name

    def format(self):
        """Return the typed value (bool for StrToBoolean targets)."""
        if not self.is_set:
            return None
        if self.target.formater_type.formater_type == "StrToBoolean":
            if self.value in ("False", "0"):
                return False
            elif self.value:
                return True
            return
        return self.value

    def save(self, *args, **kwargs):
        # After saving, propagate the key to the associated type instance
        # (resolved by pk then txt_idx) scoped to group/user/import.
        obj = super(TargetKey, self).save(*args, **kwargs)
        if not self.value:
            return obj
        v = None
        associated_model = self.target.associated_model
        if associated_model and hasattr(self.target.associated_model, "add_key"):
            # pk is given
            try:
                v = self.target.associated_model.objects.get(
                    pk=str(int(self.value))
                )
            except (ValueError, self.target.associated_model.DoesNotExist):
                # try with txt_idx
                try:
                    v = self.target.associated_model.objects.get(
                        txt_idx=str(self.value)
                    )
                except self.target.associated_model.DoesNotExist:
                    pass
        if v:
            keys = {}
            if self.associated_group:
                keys["group"] = self.associated_group
            if self.associated_user:
                keys["user"] = self.associated_user
            else:
                keys["importer"] = self.associated_import
            v.add_key(self.key, **keys)
        return obj
# Models that may be targeted by a TypeFormater (dotted path or bare name
# resolved in ishtar_common), with their user-facing labels.
TARGET_MODELS = [
    ("OrganizationType", _("Organization type")),
    ("ishtar_common.models.OrganizationType", _("Organization type")),
    ("ishtar_common.models.PersonType", _("Person type")),
    ("TitleType", _("Title")),
    ("SourceType", _("Source type")),
    ("ishtar_common.models.SourceType", _("Source type")),
    ("AuthorType", _("Author type")),
    ("Format", _("Format")),
    ("ProfileType", _("Profile type")),
    ("ishtar_common.models.Format", _("Format")),
    ("ishtar_common.models.LicenseType", _("License type")),
    ("ishtar_common.models.DocumentTag", _("Document tag")),
    ("ishtar_common.models.Language", _("Language")),
    ("ishtar_common.models.SupportType", _("Support type")),
    ("archaeological_operations.models.CulturalAttributionType", _("Cultural attribution type")),
    ("archaeological_operations.models.OperationType", _("Operation type")),
    ("archaeological_operations.models.Period", _("Period")),
    ("archaeological_operations.models.ReportState", _("Report state")),
    ("archaeological_operations.models.RemainType", _("Remain type")),
    ("archaeological_operations.models.RelationType", _("Operation relation type")),
    ("archaeological_operations.models.ActType", _("Act type")),
    ("archaeological_context_records.models.Unit", _("Unit")),
    ("archaeological_context_records.models.ActivityType", _("Activity type")),
    (
        "archaeological_context_records.models.DocumentationType",
        _("Documentation type"),
    ),
    ("archaeological_context_records.models.DatingQuality", _("Dating quality")),
    ("archaeological_finds.models.MaterialType", _("Material")),
    ("archaeological_finds.models.ConservatoryState", _("Conservatory state")),
    ("archaeological_warehouse.models.ContainerType", _("Container type")),
    ("archaeological_warehouse.models.WarehouseDivision", _("Warehouse division")),
    ("archaeological_warehouse.models.WarehouseType", _("Warehouse type")),
    ("archaeological_finds.models.TreatmentType", _("Treatment type")),
    (
        "archaeological_finds.models.TreatmentEmergencyType",
        _("Treatment emergency type"),
    ),
    ("archaeological_finds.models.ObjectType", _("Object type")),
    ("archaeological_finds.models.IntegrityType", _("Integrity type")),
    ("archaeological_finds.models.RemarkabilityType", _("Remarkability type")),
    ("archaeological_finds.models.AlterationType", _("Alteration type")),
    ("archaeological_finds.models.AlterationCauseType", _("Alteration cause type")),
    ("archaeological_finds.models.BatchType", _("Batch type")),
    ("archaeological_finds.models.CheckedType", _("Checked type")),
    ("archaeological_finds.models.MaterialTypeQualityType", _("Material type quality")),
    ("archaeological_finds.models.FunctionalArea", _("Functional area")),
    (
        "archaeological_context_records.models.IdentificationType",
        _("Identification type"),
    ),
    (
        "archaeological_context_records.models.RelationType",
        _("Context record relation type"),
    ),
    ("SpatialReferenceSystem", _("Spatial reference system")),
    ("SupportType", _("Support type")),
    ("TitleType", _("Title type")),
]

TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS]

# Available formater kinds (model choice keys -> labels)
IMPORTER_TYPES = (
    ("IntegerFormater", _("Integer")),
    ("FloatFormater", _("Float")),
    ("UnicodeFormater", _("String")),
    ("DateFormater", _("Date")),
    ("TypeFormater", _("Type")),
    ("YearFormater", _("Year")),
    ("InseeFormater", _("INSEE code")),
    ("UpperFormater", _("Upper case")),
    ("LowerFormater", _("Lower case")),
    ("StrToBoolean", _("String to boolean")),
    ("FileFormater", pgettext_lazy("filesystem", "File")),
    ("UnknowType", _("Unknow type")),
)

# Mapping from formater kind to implementation class (None: no formater)
IMPORTER_TYPES_DCT = {
    "IntegerFormater": IntegerFormater,
    "FloatFormater": FloatFormater,
    "UnicodeFormater": UnicodeFormater,
    "DateFormater": DateFormater,
    "TypeFormater": TypeFormater,
    "YearFormater": YearFormater,
    "StrToBoolean": StrToBoolean,
    "FileFormater": FileFormater,
    "InseeFormater": InseeFormater,
    "UpperFormater": UpperCaseFormater,
    "LowerFormater": LowerCaseFormater,
    "UnknowType": None,
}

# strptime-style formats offered for DateFormater options
DATE_FORMATS = (
    ("%Y", _('4 digit year. e.g.: "2015"')),
    ("%Y/%m/%d", _('4 digit year/month/day. e.g.: "2015/02/04"')),
    ("%d/%m/%Y", _('Day/month/4 digit year. e.g.: "04/02/2015"')),
)
e.g.: "2015/02/04"')), ("%d/%m/%Y", _('Day/month/4 digit year. e.g.: "04/02/2015"')), ) IMPORTER_TYPES_CHOICES = {"TypeFormater": TARGET_MODELS, "DateFormater": DATE_FORMATS} FORMATER_WIDGETS_DCT = { "IntegerFormater": ("IntegerField", None), "FloatFormater": ("FloatField", None), "UnicodeFormater": ("CharField", None), "DateFormater": ("DateField", None), "TypeFormater": ("ChoiceField", None), "YearFormater": ("IntegerField", None), "StrToBoolean": ("BooleanField", None), "FileFormater": ("FileField", None), "InseeFormater": ("IntegerField", None), "UpperFormater": ("CharField", None), "LowerFormater": ("CharField", None), "UnknowType": ("CharField", None), } class FormaterTypeManager(models.Manager): def get_by_natural_key(self, formater_type, options, many_split): return self.get( formater_type=formater_type, options=options, many_split=many_split ) class FormaterType(models.Model): formater_type = models.CharField( _("Formater type"), max_length=20, choices=IMPORTER_TYPES ) options = models.CharField(_("Options"), max_length=500, blank=True, null=True) many_split = models.CharField( _("Split character(s)"), max_length=10, blank=True, null=True ) objects = FormaterTypeManager() class Meta: verbose_name = _("Importer - Formater type") verbose_name_plural = _("Importer - Formater types") unique_together = ("formater_type", "options", "many_split") ordering = ("formater_type", "options") ADMIN_SECTION = _("Imports") def natural_key(self): return self.formater_type, self.options, self.many_split @property def associated_model(self): if hasattr(self, "__associated_model"): return self.__associated_model if self.formater_type != "TypeFormater" or not self.options: self.__associated_model = None return self.__associated_model = None options = self.options.split(".") if len(options) == 1: app = "ishtar_common" else: app = options[0] model_name = options[-1] try: self.__associated_model = apps.get_model(app, model_name) except LookupError: pass return 
self.__associated_model def __str__(self): return " - ".join( [ str(dict(IMPORTER_TYPES)[self.formater_type]) if self.formater_type in IMPORTER_TYPES_DCT else "" ] + [getattr(self, k) for k in ("options", "many_split") if getattr(self, k)] ) def get_choices(self): if self.format_type in IMPORTER_TYPES_CHOICES: return IMPORTER_TYPES_CHOICES[self.format_type] def get_formater_type(self, target, import_instance=None): if self.formater_type not in IMPORTER_TYPES_DCT.keys(): return kwargs = {"db_target": target, "import_instance": import_instance} if self.many_split: kwargs["many_split"] = self.many_split if self.formater_type == "TypeFormater": if self.options in dir(): model = dir()[self.options] else: try: model = import_class(self.options) except (AttributeError, SuspiciousOperation): logger.warning( "**WARN FormaterType.get_formater_type**: {} " "is not in valid.".format(self.options) ) return # must be explicit if non general type if self.options not in TARGET_MODELS_KEYS and \ not [True for m in model.__mro__ if m.__name__ == "GeneralType"]: logger.warning( "**WARN FormaterType.get_formater_type**: {} " "is not in valid.".format(self.options) ) return return TypeFormater(model, **kwargs) elif self.formater_type == "UnicodeFormater": if self.options: try: return UnicodeFormater(int(self.options.strip()), **kwargs) except ValueError: pass return UnicodeFormater(**kwargs) elif self.formater_type == "DateFormater": date_formats = self.options if self.many_split: date_formats = self.options.split(kwargs.pop("many_split")) return DateFormater(date_formats, **kwargs) elif self.formater_type == "StrToBoolean": return StrToBoolean(**kwargs) elif self.formater_type == "UnknowType": return else: return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) IMPORT_STATE = ( ("C", _("Created")), ("AP", _("Analyse in progress")), ("A", _("Analysed")), ("HQ", _("Check modified in queue")), ("IQ", _("Import in queue")), ("HP", _("Check modified in progress")), ("IP", _("Import in 
progress")), ("PP", _("Post-processing in progress")), ("PI", _("Partially imported")), ("FE", _("Finished with errors")), ("F", _("Finished")), ("AC", _("Archived")), ) IMPORT_STATE_DCT = dict(IMPORT_STATE) IMPORT_GROUP_STATE = ( ("C", _("Created")), ("AP", _("Analyse in progress")), ("A", _("Analysed")), ("IQ", _("Import in queue")), ("IP", _("Import in progress")), ("PP", _("Post-processing in progress")), ("FE", _("Finished with errors")), ("F", _("Finished")), ("AC", _("Archived")), ) IMPORT_GROUP_STATE_DCT = dict(IMPORT_STATE) ENCODINGS = [ (settings.ENCODING, settings.ENCODING), (settings.ALT_ENCODING, settings.ALT_ENCODING), ("utf-8", "utf-8"), ] CSV_SEPS = ((",", ","), (";", ";"), ("|", "|")) @task() def delayed_import(import_pk): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: return imp.importation() @task() def delayed_import_group(import_pk): try: imp = ImportGroup.objects.get(pk=import_pk) except ImportGroup.DoesNotExist: return imp.importation() @task() def delayed_check(import_pk): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: return imp.check_modified() def convert_geom(feature, srid): if not feature: return feature srid = int(srid) geo_type = feature["type"] if geo_type in ("LineString", "Polygon"): feature["type"] = "Multi" + geo_type feature["coordinates"] = [feature["coordinates"]] feature = GEOSGeometry(json.dumps(feature)) has_z = feature.hasz feature = f"SRID={srid};{feature.wkt}" IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() srs = 4326 if not has_z else 4979 if profile.srs and profile.srs.srid: srs = profile.srs.srid if srs != srid: # Coordinates are reversed - should be fixed on Django 3.2 feature = reverse_coordinates( GEOSGeometry(feature).transform(srs, clone=True).ewkt) return feature IMPORT_GEOMETRY = { "Point": "point_2d", "3D Point": "point_3d", "MultiPoint": "multi_points", "LineString": "multi_line", 
"MultiLineString": "multi_line", "Polygon": "multi_polygon", "MultiPolygon": "multi_polygon", } class BaseImport(models.Model, OwnPerms, SheetItem): user = models.ForeignKey( "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL ) name = models.CharField(_("Name"), max_length=500, null=True) imported_file = models.FileField( _("Imported file"), upload_to="upload/imports/%Y/%m/", max_length=220, help_text=max_size_help, blank=True, null=True, ) imported_images = models.FileField( _("Associated documents (zip file)"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=220, help_text=max_size_help, ) archive_file = models.FileField( _("Archive file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255, help_text=max_size_help, ) encoding = models.CharField( _("Encoding"), choices=ENCODINGS, default="utf-8", max_length=15, help_text=_("Only required for CSV file"), ) csv_sep = models.CharField( _("CSV separator"), choices=CSV_SEPS, default=",", max_length=1, help_text=_( "Separator for CSV file. Standard is comma but Microsoft " "Excel do not follow this standard and use semi-colon." 
), ) skip_lines = models.IntegerField( _("Skip lines"), default=1, help_text=_("Number of header lines in your file (can be 0 and should be 0 for geopackage or Shapefile)."), ) imported_media_link = models.URLField( _("Associated media (web link to a zip file or a path)"), blank=True, null=True ) creation_date = models.DateTimeField( _("Creation date"), auto_now_add=True, blank=True, null=True ) end_date = models.DateTimeField( _("End date"), auto_now_add=True, blank=True, null=True, editable=False ) state = None NO_ODTPDF_EXPORT = True class Meta: abstract = True @classmethod def get_permissions_for_actions(cls, user, session): if not hasattr(user, "ishtaruser") or not user.ishtaruser: return False, False, False, False can_edit_all, can_delete_all, can_edit_own, can_delete_own = False, False, False, False if user.is_superuser: can_edit_all = True can_delete_all = True if user.ishtaruser.has_right("change_import", session=session): can_edit_all = True elif user.ishtaruser.has_right("change_own_import", session=session): can_edit_own = True if user.ishtaruser.has_right("delete_import", session=session): can_delete_all = True elif user.ishtaruser.has_right("delete_own_import", session=session): can_delete_own = True return can_edit_all, can_delete_all, can_edit_own, can_delete_own @classmethod def query_can_access(cls, user, perm="view_import"): """ Filter the query to check access permissions :param user: User instance :return: import query """ q = cls.objects if not isinstance(perm, (list, tuple)): perm = [perm] if user.is_superuser or (hasattr(user, "ishtaruser") and user.ishtaruser and any(user.ishtaruser.has_right(p) for p in perm)): return q q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk)) return q @classmethod def get_query_owns(cls, ishtaruser): return cls._construct_query_own( "", [ {"importer_type__users__pk": ishtaruser.pk}, ], ) @property def group_prefix(self): return "" @property def has_pre_import_form(self) -> bool: raise NotImplemented() 
@property def pre_import_form_is_valid(self) -> bool: raise NotImplemented() def _archive(self): raise NotImplemented() def _unarchive(self): raise NotImplemented() def archive(self): self.state = "AC" self.end_date = datetime.datetime.now() self._archive() def unarchive(self, state): if not self._unarchive(): self.state = state self.save() # only save if no save previously def get_imported_images(self): return self.imported_images def delayed_importation(self, request, session_key): if not settings.USE_BACKGROUND_TASK: return self.importation(request=request, session_key=session_key) put_session_message( session_key, str(_("Import {} added to the queue")).format(self.name), "info", ) self.state = "IQ" self.end_date = datetime.datetime.now() self.save() if self.SLUG == "import": return delayed_import.delay(self.pk) else: return delayed_import_group.delay(self.pk) def importation( self, session_key=None, line_to_process=None, simulate=False, return_importer_and_data=False, request=None, ): raise NotImplemented() def save(self, *args, **kwargs): super().save(*args, **kwargs) if ( self.state == "AC" and not getattr(self, "_archive_pending", False) and not self.archive_file and not getattr(self, "group", False) ): self._archive() class ImportGroup(BaseImport): importer_type = models.ForeignKey(ImporterGroup, on_delete=models.CASCADE, verbose_name=_("Importer group type")) current_import = models.PositiveIntegerField(_("Current import"), blank=True, null=True) state = models.CharField( _("State"), max_length=2, choices=IMPORT_GROUP_STATE, default="C" ) class Meta: verbose_name = _("Import - Group") verbose_name_plural = _("Import - Groups") permissions = ( ("view_own_importgroup", "Can view own Import Group"), ("add_own_importgroup", "Can add own Import Group"), ("change_own_importgroup", "Can change own Import Group"), ("delete_own_importgroup", "Can delete own Import Group"), ) ADMIN_SECTION = _("Imports") SLUG = "importgroup" SHOW_URL = "show-importgroup" def 
__str__(self): return f"{self.name} ({self.importer_type.name})" @property def group_prefix(self): return "group-" @property def has_pre_import_form(self) -> bool: return False @property def pre_import_items(self) -> list: """ Get list of tuple : (label, value) of pre-import values for this group import. Cached in memory. :return: pre import as a list of 2-tuple """ if hasattr(self, "_pre_import_items"): return self._pre_import_items values = [] for imprt in self.imports.all(): values += imprt.pre_import_items self._pre_import_items = values return values @property def has_error(self) -> bool: return any(1 for imprt in self.imports.all() if imprt.has_error) @property def pre_import_form_is_valid(self) -> bool: return not any(-1 for imp in self.imports.all() if not imp.pre_import_form_is_valid) @property def import_id(self): return f"group-{self.id}" def import_list(self): """ Sorted import list by order in the importer group """ import_list = [] for imp in self.imports.all(): igi = ImporterGroupImporter.objects.filter( group=self.importer_type, importer_type=imp.importer_type ) if not igi.count(): # inconsistent data continue import_list.append((igi.all()[0].order, imp)) return [imp for order, imp in sorted(import_list, key=lambda i: i[0])] @property def status(self): if self.state not in IMPORT_GROUP_STATE_DCT: return "" return IMPORT_GROUP_STATE_DCT[str(self.state)] def get_actions(self, can_edit=False, can_delete=False): """ Get available action relevant with the current status """ actions = [] if not can_edit and not can_delete: return actions if can_edit and self.state == "C": actions.append(("A", _("Analyse"))) if can_edit and self.state == "A": actions.append(("A", _("Re-analyse"))) if not any(-1 for imp in self.import_list() if not imp.pre_import_form_is_valid): actions.append(("I", _("Launch import"))) if can_edit and self.state in ("F", "FE"): actions.append(("A", _("Re-analyse"))) actions.append(("I", _("Re-import"))) actions.append(("AC", _("Archive"))) 
if can_edit and self.state == "AC": state = "FE" if any(1 for imp in self.imports.all() if imp.error_file) else "F" actions.append((state, _("Unarchive"))) if can_delete: actions.append(("D", _("Delete"))) return actions def initialize(self, user=None, session_key=None): self.state = "AP" self.end_date = datetime.datetime.now() self.save() for imp in self.imports.all(): imp.initialize(user, session_key) self.state = "A" self.end_date = datetime.datetime.now() self.save() def importation( self, session_key=None, line_to_process=None, simulate=False, return_importer_and_data=False, request=None, ): q = self.imports if not q.count(): return self.state = "IP" self.end_date = datetime.datetime.now() self.save() first = self.import_list()[0] first.importation( session_key=session_key, line_to_process=line_to_process, simulate=simulate, return_importer_and_data=return_importer_and_data, request=request ) # from the first import if all is good, cascade import has_error = any(i.error_file.name for i in q.all() if i.error_file.name) if has_error: self.state = "FE" else: self.state = "F" self.end_date = datetime.datetime.now() self.save() def _unarchive(self): if not self.archive_file: return today = datetime.date.today() sub_imports_ids = [(Import, imp.pk) for imp in self.import_list()] with (tempfile.TemporaryDirectory("-ishtar") as tmp_dir_name): # extract the current archive current_zip = zipfile.ZipFile(self.archive_file.path, "r") name_list = current_zip.namelist() if "content.json" not in name_list: return for name in name_list: current_zip.extract(name, tmp_dir_name) current_zip.close() content_name = os.path.join(tmp_dir_name, "content.json") try: with open(content_name, "r") as content: files = json.loads(content.read()) except (IOError, json.JSONDecodeError): return for attr in files: filename = files[attr] full_filename = os.path.join(tmp_dir_name, filename) current_imports_ids = [(ImportGroup, self.pk)] # identify imports to attach the file if attr == 
"imported_file": current_imports_ids += sub_imports_ids # imported_file is attached to each imports elif attr.startswith("sub-"): # identify the sub import to attach the file # archive filename is "sub-{subimport_order}-{attribute}" split_attr = attr.split("-") if len(split_attr) < 3: continue try: idx = int(split_attr[1]) except (ValueError, IndexError): continue if idx > (len(sub_imports_ids) - 1): continue current_imports_ids = [sub_imports_ids[idx]] attr = "-".join(split_attr[2:]) for idx, current_import_id in enumerate(current_imports_ids): model, current_import_id = current_import_id current_import = model.objects.get(pk=current_import_id) path = Path(full_filename) c_filename = path.name if idx: c_filename = f"{idx:02d}-" + c_filename path = Path(full_filename) """ # TODO: should be enough... # but need to be set explicitly, why? with path.open(mode="rb") as raw_file: getattr(current_import, attr).save(c_filename, File(raw_file)) """ w_filename = os.path.join( settings.MEDIA_ROOT, "upload/imports/{}/{:02d}/{}".format( today.year, today.month, c_filename ) ) with path.open(mode="rb") as raw_file: with open(w_filename, "wb") as w_file: w_file.write(raw_file.read()) with open(w_filename, mode="rb") as raw_file: f = File(raw_file, name=c_filename) f.path = w_filename setattr(current_import, attr, f) current_import.save() getattr(current_import, attr).name = w_filename[len(settings.MEDIA_ROOT):] current_import.save() os.remove(self.archive_file.path) self.refresh_from_db() setattr(self, "archive_file", None) self.state = "FE" if self.has_error else "F" self.save() for model, sub_import_id in sub_imports_ids: sub_import = model.objects.get(pk=sub_import_id) sub_import.state = "FE" if sub_import.has_error else "F" sub_import.save() return True def _archive(self): file_attr = ["imported_file"] sub_import_file_attr = ["error_file", "result_file", "match_file"] files = [ (k, getattr(self, k).path, getattr(self, k).name.split(os.sep)[-1]) for k in file_attr if 
getattr(self, k) ] import_list = self.import_list() for idx, sub_import in enumerate(import_list): files += [ (f"sub-{idx}-{k}", getattr(sub_import, k).path, getattr(sub_import, k).name.split(os.sep)[-1]) for k in sub_import_file_attr if getattr(sub_import, k) ] with tempfile.TemporaryDirectory("-ishtar") as tmpdir: base_name = "{}.zip".format(slugify(self.name)) archive_name = os.path.join(tmpdir, base_name) with zipfile.ZipFile(archive_name, "w") as current_zip: zip_content = {} for k, path, name in files: try: current_zip.write(path, arcname=name) zip_content[k] = name except OSError: pass content_name = os.path.join(tmpdir, "content.json") with open(content_name, "w") as content: content.write(json.dumps(zip_content)) current_zip.write(content_name, arcname="content.json") with open( archive_name, "rb", ) as raw_file: self.archive_file.save(base_name, File(raw_file)) IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() if profile.delete_image_zip_on_archive: sub_import_file_attr.append("imported_images") for attr in file_attr: file_field = getattr(self, attr) if file_field: try: os.remove(file_field.path) except FileNotFoundError: pass setattr(self, attr, None) sub_import_file_attr.append("imported_file") sub_import_file_attr.append("imported_values") if profile.delete_image_zip_on_archive: sub_import_file_attr.append("imported_images") for sub_import in import_list: for attr in sub_import_file_attr: file_field = getattr(sub_import, attr) if file_field: try: os.remove(file_field.path) except FileNotFoundError: pass setattr(sub_import, attr, None) sub_import.state = "AC" sub_import.save() self.save() self._archive_pending = False def _get_all_related(self, key): imported = [] for imp in self.imports.all(): for related, zorg in get_all_related_m2m_objects_with_model(imp, related_name=key): accessor = related.get_accessor_name() imported += [(accessor, obj) for obj in getattr(imp, 
accessor).all()] return sorted(imported, key=lambda i: i[1].__class__.__name__) def get_all_imported(self): return self._get_all_related("imported_") def get_all_updated(self): return self._get_all_related("import_updated_") def get_imported_values(self): return self.imported_file def save(self, *args, **kwargs): add = self._state.adding super().save(*args, **kwargs) if not add: return name = f"{self.name} ({self.importer_type.name})" imports = [] imported_file, imported_images = None, None if self.imported_file: imported_file = ContentFile(self.imported_file.read()) imported_file.name = self.imported_file.name.split(os.sep)[-1] if self.imported_images: imported_images = ContentFile(self.imported_images.read()) imported_images.name = self.imported_images.name.split(os.sep)[-1] for import_type_relation in self.importer_type.importer_types.all(): importer_type = import_type_relation.importer_type skip_lines = self.skip_lines if skip_lines == -1: # use default skip_lines = importer_type.default_header_len imp = Import.objects.create( name=name, importer_type=importer_type, group=self, encoding=self.encoding, csv_sep=self.csv_sep, skip_lines=skip_lines, ) imports.append(imp) modified = False # TODO: only get the relevant sheet if imported_file: imp.imported_file = imported_file modified = True if importer_type.archive_required and imported_images: imp.imported_images = imported_images modified = True if modified: imp.save() previous = None for imp in reversed(imports): if previous: imp.next_import = previous imp.save() previous = imp class Import(BaseImport): importer_type = models.ForeignKey(ImporterType, on_delete=models.CASCADE, verbose_name=_("Importer type")) # TODO - associated_group: relevant? associated_group = models.ForeignKey( TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL, help_text=_( "If a group is selected, target key saved in this group will be used." 
        ),
    )
    imported_values = models.FileField(
        _("Imported values"),
        upload_to="upload/imports/%Y/%m/",
        max_length=220,
        help_text=max_size_help,
        blank=True,
        null=True,
    )
    error_file = models.FileField(
        _("Error file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help,
    )
    result_file = models.FileField(
        _("Result file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help,
    )
    match_file = models.FileField(
        _("Match file"),
        upload_to="upload/imports/%Y/%m/",
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help,
    )
    state = models.CharField(
        _("State"), max_length=2, choices=IMPORT_STATE, default="C"
    )
    conservative_import = models.BooleanField(
        _("Conservative import"),
        default=False,
        help_text=_("If set to true, do not overload existing values."),
    )
    seconds_remaining = models.IntegerField(
        _("Remaining seconds"), blank=True, null=True, editable=False
    )
    current_line = models.IntegerField(_("Current line"), blank=True, null=True)
    number_of_line = models.IntegerField(_("Number of line"), blank=True, null=True)
    # comma-separated list of line indexes already imported
    imported_line_numbers = models.TextField(
        _("Imported line numbers"),
        blank=True,
        null=True,
        validators=[validate_comma_separated_integer_list],
    )
    changed_checked = models.BooleanField(_("Changed have been checked"), default=False)
    # comma-separated list of line indexes detected as changed
    changed_line_numbers = models.TextField(
        _("Changed line numbers"),
        blank=True,
        null=True,
        validators=[validate_comma_separated_integer_list],
    )
    group = models.ForeignKey(ImportGroup, blank=True, null=True,
                              on_delete=models.CASCADE, verbose_name=_("Group"),
                              related_name="imports")
    next_import = models.ForeignKey(
        "Import", blank=True, null=True, on_delete=models.SET_NULL,
        verbose_name=_("Next import"), related_name="imports")
    debug = models.TextField(verbose_name=_("Debug"), blank=True, default="")

    class Meta:
        verbose_name = _("Import - Import")
        verbose_name_plural = _("Import - Imports")
        permissions = (
            ("view_own_import", "Can view own Import"),
            ("add_own_import", "Can add own Import"),
            ("change_own_import", "Can change own Import"),
            ("delete_own_import", "Can delete own Import"),
        )
        ADMIN_SECTION = _("Imports")

    SLUG = "import"
    SHOW_URL = "show-import"

    def __str__(self):
        return "{} | {}".format(self.name or "-", self.importer_type)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # remember the initial file path to detect later replacement
        self._initial_imported_file = self.imported_file.path if self.imported_file else ""

    def is_available(self, ishtar_user) -> bool:
        # available to superusers and to the import's owner only
        if ishtar_user.is_superuser or self.user == ishtar_user:
            return True
        return False

    @property
    def import_id(self):
        return str(self.id)

    def need_matching(self):
        # True when some target keys of this import are still unmatched
        return bool(
            TargetKey.objects.filter(associated_import=self, is_set=False).count()
        )

    def get_imported_images(self):
        # grouped imports share the group's image archive when required
        if not self.group:
            return self.imported_images
        if self.importer_type.archive_required:
            return self.group.imported_images

    @property
    def errors(self):
        # parse the error CSV file into a list of {line, column, error} dicts
        if not self.error_file:
            return []
        errors = []
        with open(self.error_file.path, "rt") as csvfile:
            reader = csv.DictReader(csvfile, fieldnames=["line", "column", "error"])
            for idx, row in enumerate(reader):
                if not idx:  # pass the header
                    continue
                errors.append(row)
        return errors

    @property
    def default_values(self) -> dict:
        """
        Get DB default values for this importer. Cached in memory.

        :return: defaults values as a dict
        """
        if hasattr(self, "_default_values"):
            return self._default_values
        self._default_values = self.importer_type.get_default_values()
        return self._default_values

    @property
    def has_pre_import_form(self) -> bool:
        """
        Check if a pre-import form is available
        """
        # columns with col_number <= 0 are pre-import (form-provided) values
        return bool(self.importer_type.columns.filter(col_number__lte=0).count())

    @property
    def has_error(self) -> bool:
        # lazily parse the error file into error_lines on first access
        if not self.error_file:
            return False
        if self.error_file and not self.error_lines.count():
            self.parse_error_file()
        return bool(self.error_lines.filter(ignored=False).count())

    @property
    def pre_import_form_is_valid(self) -> bool:
        # every required pre-import column must have a non-empty stored value
        for column in self.importer_type.columns.filter(col_number__lte=0, required=True):
            q = ImportColumnValue.objects.filter(column=column, import_item=self)
            if not q.count() or q.all()[0].value == "":
                return False
        return True

    def _get_pre_import_value(self, value, target):
        # Convert a stored pre-import string value into its final form:
        # split multi-values, resolve model instances by pk. Returns None
        # when the value is empty or cannot be resolved.
        if value == "":
            return
        many = target.formater_type.many_split
        if many:
            # stored repr of a list of strings, e.g. "['1', '2']"
            if value.startswith("['") and value.endswith("']"):
                value = value[2:-2].split("', '")
        associated_model = target.formater_type.associated_model
        if associated_model:
            if many:
                try:
                    value = [associated_model.objects.get(pk=int(pk)) for pk in value]
                except (associated_model.DoesNotExist, ValueError):
                    return
            else:
                try:
                    value = associated_model.objects.get(pk=value)
                except (associated_model.DoesNotExist, ValueError):
                    return
        return value

    @property
    def pre_import_items(self) -> list:
        """
        Get list of tuple : (label, value) of pre-import values for this
        import. Cached in memory.

        :return: pre import as a list of 2-tuple
        """
        if hasattr(self, "_pre_import_items"):
            return self._pre_import_items
        values = []
        for column in self.importer_type.columns.filter(col_number__lte=0):
            q = ImportColumnValue.objects.filter(column=column, import_item=self)
            if not q.count():
                continue
            column_value = q.all()[0]
            q = column_value.column.targets.all()
            if not q.count():
                continue
            target = q.all()[0]
            value = column_value.value
            value = self._get_pre_import_value(value, target)
            if value is None:
                continue
            values.append((column_value.column.label, value))
        self._pre_import_items = values
        return values

    @property
    def pre_import_values(self) -> dict:
        """
        Get DB pre import values for this importer. Cached in memory.

        :return: pre import as a dict
        """
        if hasattr(self, "_pre_import_values"):
            return self._pre_import_values
        values = {}
        for column in self.importer_type.columns.filter(col_number__lte=0):
            q = ImportColumnValue.objects.filter(column=column, import_item=self)
            if not q.count():
                continue
            column_value = q.all()[0]
            q = column_value.column.targets.all()
            if not q.count():
                continue
            target = q.all()[0]
            value = column_value.value
            value = self._get_pre_import_value(value, target)
            if value is None:
                continue
            # nest the value under the "__"-separated target path
            keys = target.target.split("__")
            dct = generate_dict_from_list(keys, value)
            values = update_data(values, dct)
            q = column_value.column.duplicate_fields.all()
            if not q.count():
                continue
            # copy the same value to each declared duplicate field path
            for dup in q.all():
                keys = dup.field_name.split("__")
                dct = generate_dict_from_list(keys, value)
                values = update_data(values, dct)
        self._pre_import_values = values
        return values

    def get_imported_values(self):
        # prefer the CSV conversion of the spreadsheet when available
        if self.imported_values:
            return self.imported_values
        return self.imported_file

    def set_imported_values(self):
        # Convert an uploaded spreadsheet (ods/xls/xlsx/xlsm) to a CSV file
        # stored in imported_values. Returns True on success, None otherwise.
        if not self.imported_file:
            return
        name = self.imported_file.name.lower()
        ext = name.split(".")[-1]
        if ext not in ("ods", "xls", "xlsx", "xlsm"):
            return
        imported_file_path = os.path.abspath(self.imported_file.path)
        media_root = os.path.abspath(settings.MEDIA_ROOT)
        # refuse paths escaping MEDIA_ROOT
        if not imported_file_path.startswith(media_root):
            return
        try:
            data = pandas.read_excel(
                imported_file_path,
                sheet_name=(self.importer_type.tab_number or 1) - 1,
            )
        except Exception:
            return
        data = data.dropna(how="all")  # drop empty rows
        if data.empty:
            return
        col_numbers = [c.col_number for c in self.importer_type.columns.all()]
        if not col_numbers:
            return
        last_column = max(col_numbers)
        # random suffix to avoid clashing with a previous conversion
        filename = ".".join(imported_file_path.split('.')[:-1]) + f"-{random.randint(1, 10000):05d}.csv"
        try:
            data.to_csv(filename, index=False,
                        columns=data.columns[range(last_column)],
                        float_format=np_format_int_float)
        except IndexError:
            # fewer columns in the sheet than the importer expects
            error_file = f"{slugify(self.name)}_errors_{datetime.datetime.now().strftime('%Y-%m-%d-%H%M')}.csv"
            error = str(_("Error while opening the file. Particularly check the number of columns of your file."))
            csv_file = f"line,error\n-,{error}"
            self.error_file.save(
                error_file, ContentFile(csv_file.encode("utf-8"))
            )
            return
        # TODO: date_format options do not do the job... Dirty hack
        DATE = re.compile(
            r"^(1[0-9]|20|[1-9])\d{2}-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01]) 00:00:00"
        )
        GET_DATE = re.compile(r"(\d?\d{3})-(\d{2})-(\d{2}) 00:00:00")
        # rewrite "YYYY-MM-DD 00:00:00" cells as plain dates
        with tempfile.NamedTemporaryFile("w", delete=False) as tf:
            w = csv.writer(tf)
            with open(filename, "r") as f:
                r = csv.reader(f)
                for line in r:
                    w.writerow([
                        "-".join(GET_DATE.match(cell).groups())
                        if DATE.match(cell) and GET_DATE.match(cell) else cell
                        for cell in line
                    ])
        tf.close()
        shutil.move(tf.name, filename)
        # store a MEDIA_ROOT-relative name
        name = filename[len(media_root):]
        if name.startswith(os.sep):
            name = name[1:]
        self.imported_values.name = name
        self._initial_imported_file = self.imported_file.path
        return True

    def get_number_of_lines(self):
        # Count (and cache) the number of data lines of the source file.
        if self.number_of_line:
            return self.number_of_line
        imported_values = self.get_imported_values()
        if (not imported_values or not imported_values.path
                or not imported_values.path.endswith(".csv")) \
                and self.importer_type.type == "gis":
            return self._data_table_gis(get_number_only=True)
        if not imported_values or not imported_values.path:
            return
        filename = imported_values.path
        encodings = [self.encoding]
        encodings += [coding for coding, c in ENCODINGS if coding != self.encoding]
        # NOTE(review): no break after a successful read, so later encodings
        # are still tried; and if every encoding fails with
        # UnicodeDecodeError, ``nb`` is unbound below — confirm intended.
        for encoding in encodings:
            try:
                with open(filename, "r", encoding=encoding) as f:
                    reader = csv.reader(f, delimiter=self.csv_sep)
                    nb = sum(1 for __ in reader) - self.skip_lines
            except UnicodeDecodeError:
                pass  # try the next encoding
            except csv.Error:
                raise ImporterError(_("Error in the CSV file."))
        self.number_of_line = nb
        self.save()
        return nb

    @property
    def progress_percent(self):
        if not self.current_line or not self.number_of_line:
            return 0
        return int((float(self.current_line) / float(self.number_of_line)) * 100)

    def add_imported_line(self, idx_line):
        # Record idx_line in imported_line_numbers (idempotent) and save.
        if not self.number_of_line:
            self.get_number_of_lines()
        if self.imported_line_numbers and str(
            idx_line
        ) in self.imported_line_numbers.split(","):
            return
        if self.imported_line_numbers:
            self.imported_line_numbers += ","
        else:
            self.imported_line_numbers = ""
        self.imported_line_numbers += str(idx_line)
        self.current_line = idx_line
        self.save()

    def add_changed_line(self, idx_line):
        # Record idx_line in changed_line_numbers (idempotent) and save.
        if self.changed_line_numbers and str(
            idx_line
        ) in self.changed_line_numbers.split(","):
            return
        if self.changed_line_numbers:
            self.changed_line_numbers += ","
        else:
            self.changed_line_numbers = ""
        self.changed_line_numbers += str(idx_line)
        self.save()

    def remove_changed_line(self, idx_line):
        if not self.changed_line_numbers:
            return
        line_numbers = self.changed_line_numbers.split(",")
        if str(idx_line) not in line_numbers:
            return
        line_numbers.pop(line_numbers.index(str(idx_line)))
        self.changed_line_numbers = ",".join(line_numbers)
        self.save()

    def has_changes(self, idx_line):
        # Before any change-check every line is considered changed.
        if not self.changed_checked:
            return True
        if not self.changed_line_numbers:
            return
        line_numbers = self.changed_line_numbers.split(",")
        return str(idx_line) in line_numbers

    def line_is_imported(self, idx_line):
        return self.imported_line_numbers and str(
            idx_line
        ) in self.imported_line_numbers.split(",")

    def get_actions(self, can_edit=False, can_delete=False):
        """
        Get available action relevant with the current status
        """
        IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")
        profile = IshtarSiteProfile.get_current_profile()
        actions = []
        if not can_edit and not can_delete:
            return actions
        if can_edit and self.state == "C":
            actions.append(("A", _("Analyse")))
        if can_edit and self.state in ("A", "PI"):
            actions.append(("A", _("Re-analyse")))
            if self.pre_import_form_is_valid:
                actions.append(("I", _("Launch import")))
                if profile.experimental_feature:
                    if self.changed_checked:
                        actions.append(("IS", _("Step by step import")))
                        actions.append(("CH", _("Re-check for changes")))
                    else:
                        actions.append(("CH", _("Check for changes")))
        if can_edit and self.state in ("F", "FE"):
            actions.append(("A", _("Re-analyse")))
            actions.append(("I", _("Re-import")))
            if profile.experimental_feature:
                if self.changed_checked:
                    actions.append(("IS", _("Step by step re-import")))
                    actions.append(("CH", _("Re-check for changes")))
                else:
                    actions.append(("CH", _("Check for changes")))
            actions.append(("AC", _("Archive")))
        if can_edit and self.state == "AC":
            state = "FE" if self.error_file else "F"
            actions.append((state, _("Unarchive")))
        if can_delete and self.state in ("C", "A"):
            actions.append(("ED", _("Edit")))
        if can_delete:
            actions.append(("D", _("Delete")))
        return actions

    @property
    def imported_filename(self):
        return self.imported_file.name.split(os.sep)[-1]

    @property
    def status(self):
        if self.state not in IMPORT_STATE_DCT:
            return ""
        return IMPORT_STATE_DCT[self.state]

    def get_importer_instance(self):
        # Instantiate the importer class configured by the importer type.
        return self.importer_type.get_importer_class(import_instance=self)(
            skip_lines=self.skip_lines,
            import_instance=self,
            conservative_import=self.conservative_import,
        )

    def _data_table_tab(self):
        # Read the tabular source (CSV, possibly inside a zip) as a list of
        # rows, trying the configured encoding then the fallbacks.
        imported_values = self.get_imported_values().path
        tmpdir = None
        if zipfile.is_zipfile(imported_values):
            z = zipfile.ZipFile(imported_values)
            filename = None
            for name in z.namelist():
                # get first CSV file found
                if name.endswith(".csv"):
                    filename = name
                    break
            if not filename:
                return []
            tmpdir = tempfile.mkdtemp(prefix="tmp-ishtar-")
            imported_values = z.extract(filename, tmpdir)
        encodings = [self.encoding]
        encodings += [coding for coding, c in ENCODINGS if coding != self.encoding]
        for encoding in encodings:
            try:
                with open(imported_values, encoding=encoding) as csv_file:
                    vals = [
                        line for line in csv.reader(csv_file, delimiter=self.csv_sep)
                    ]
                    if tmpdir:
                        shutil.rmtree(tmpdir)
                    return vals
            except UnicodeDecodeError:
                pass  # try the next encoding
            except csv.Error:
                raise ImporterError(_("Error in the CSV file."))
        if tmpdir:
            shutil.rmtree(tmpdir)
        return []

    def get_gis_attr(self):
        return self._data_table_gis(get_gis_attr=True)

    def _data_table_gis(self, get_gis_attr=False, get_number_only=False):
        # Read the GIS source (geopackage or shapefile, possibly zipped):
        # determines self.gis_attr from the geometry type and, depending on
        # the flags, returns the attribute name, the line count or the data.
        self.gis_attr = None
        imported_values = self.get_imported_values().path
        tmp_dir = None
        file_type = "gpkg"
        if zipfile.is_zipfile(imported_values):
            z = zipfile.ZipFile(imported_values)
            imported_values = None
            filenames = []
            for name in z.namelist():
                # get first geopackage found, otherwise collect shapefile parts
                name_l = name.lower()
                if name_l.endswith(".gpkg"):
                    filenames = [name]
                    break
                if name_l.endswith(".shp"):
                    file_type = "shp"
                    filenames.append(name)
                    continue
                for end in [".cpg", ".prj", ".shx", ".dbf"]:
                    if name_l.endswith(end):
                        filenames.append(name)
                        continue
            if not filenames:
                return []
            tmp_dir = tempfile.mkdtemp(prefix="tmp-ishtar-")
            for filename in filenames:
                if filename.lower().endswith(".shp") or filename.lower().endswith(
                    ".gpkg"
                ):
                    imported_values = z.extract(filename, tmp_dir)
                else:
                    z.extract(filename, tmp_dir)
        elif imported_values.endswith(".csv"):
            return self._data_table_tab()
        elif not imported_values.endswith(".gpkg"):
            raise ImporterError(_("Invalid GIS file."))
        if not imported_values:
            raise ImporterError(_("Invalid GIS file."))
        kwargs = {}
        if self.importer_type.layer_name:
            kwargs["layer"] = self.importer_type.layer_name
        try:
            with fiona.open(imported_values, **kwargs) as collection:
                schema = collection.schema
                geometry = schema["geometry"]
                if geometry not in IMPORT_GEOMETRY:
                    raise ImporterError(_(f'Geometry "{geometry}" not managed.'))
                self.gis_attr = IMPORT_GEOMETRY[geometry]
                if get_gis_attr:
                    if tmp_dir:
                        shutil.rmtree(tmp_dir)
                    return self.gis_attr
                properties = schema["properties"].keys()
                crs = fiona_crs.to_string(collection.crs)
                if not crs:
                    # fall back to GDAL/OGR to read the spatial reference
                    driver_type = {"shp": "ESRI Shapefile", "gpkg": "GPKG"}
                    driver = ogr.GetDriverByName(driver_type[file_type])
                    shape = driver.Open(imported_values)
                    layer = shape.GetLayer()
                    crs = layer.GetSpatialRef()
                    auth = crs.GetAttrValue("AUTHORITY", 0)
                    srid = crs.GetAttrValue("AUTHORITY", 1)
                    if auth == "EPSG":
                        srid = int(srid)
                    elif auth == "IGNF" and srid in IGNF:
                        srid = IGNF[srid]
                    else:
                        raise ImporterError(_("CRS not managed."))
                # https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6
                elif crs.startswith("+init=epsg:"):
                    srid = crs[len("+init=epsg:"):]
                else:
                    srid = CRS.from_proj4(crs).to_epsg()
                data = []
                # Warning: RuntimeWarning: Sequential read of iterator was interrupted.
                # Resetting iterator.
# not relevant -> bug in fiona 1.8.18 (fixed in 1.8.19) if not self.number_of_line or get_number_only: self.number_of_line = len(collection) self.save() if get_number_only: return self.number_of_line for idx, feature in enumerate(collection): try: line = [ convert_geom(feature["geometry"], srid) ] for prop in properties: value = feature["properties"][prop] if value is None: value = "" line.append(str(value)) data.append(line) except (TypeError, GDALException, GEOSException) as e: raise ImporterError( _(f"Error reading feature {idx + 1} - {e}") ) except fiona.errors.DriverError: raise ImporterError(_("Invalid GIS file.")) if tmp_dir: shutil.rmtree(tmp_dir) return data @property def data_table(self): if self.importer_type.type == "tab": return self._data_table_tab() if self.importer_type.type == "gis": return self._data_table_gis() def initialize(self, user=None, session_key=None): self.state = "AP" self.end_date = datetime.datetime.now() if user and not self.user: self.user = user self.save() try: self.get_importer_instance().initialize( self.data_table, user=user, output="db" ) except ImporterError as e: if session_key: put_session_message(session_key, e.msg, "danger") self.state = "C" self.save() return self.state = "A" self.end_date = datetime.datetime.now() self.save() def delayed_check_modified(self, session_key): if not settings.USE_BACKGROUND_TASK: return self.check_modified(session_key=session_key) put_session_message( session_key, str(_("Modification check {} added to the queue")).format(self.name), "info", ) self.state = "HQ" self.end_date = datetime.datetime.now() self.save() return delayed_check.delay(self.pk) def check_modified(self, session_key=None): self.state = "HP" self.end_date = datetime.datetime.now() self.changed_line_numbers = "" self.changed_checked = False self.save() for idx in range(self.skip_lines, self.get_number_of_lines() + 1): try: imprt, data = self.importation( simulate=True, line_to_process=idx, return_importer_and_data=True ) 
except IOError as e: # error is identified as a change self.add_changed_line(idx) continue # no data is not normal and an error is identified as a change if not data or not data[0]: self.add_changed_line(idx) continue # new objects is a change if imprt.new_objects: self.add_changed_line(idx) continue # check all updated fields changed = False for path, obj, values, updated_values in imprt.updated_objects: if changed: break for k in updated_values.keys(): if changed: break current_value = getattr(obj, k) updated_value = updated_values[k] if hasattr(current_value, "all"): current_value = list(current_value.all()) changed = False for v in updated_value: if v not in current_value: changed = True break else: if current_value != updated_value: changed = True break if changed: self.add_changed_line(idx) continue self.remove_changed_line(idx) self.changed_checked = True self.save() def importation( self, session_key=None, line_to_process=None, simulate=False, return_importer_and_data=False, request=None, verbose=False ): self.state = "IP" self.end_date = datetime.datetime.now() if not line_to_process: # full import self.imported_line_numbers = "" self.current_line = 0 self.save() importer = self.get_importer_instance() try: data = importer.importation( self.data_table, user=self.user, line_to_process=line_to_process, simulate=simulate, verbose=verbose ) except IOError: error_message = str(_("Error on imported file: {}")).format( self.get_imported_values() ) importer.errors = [error_message] if session_key: put_session_message(session_key, error_message, "warning") ids = get_session_var(session_key, "current_import_id") if not ids: ids = [] ids.append(self.pk) put_session_var(session_key, "current_import_id", ids) if line_to_process: self.state = "PI" else: self.state = "FE" if importer.debug: self.debug = json.dumps(importer.debug, cls=StrJSONEncoder, indent=2) self.save() if not return_importer_and_data: return return importer, None # result file filename = 
slugify(self.importer_type.name) now = datetime.datetime.now().isoformat("-").replace(":", "") result_file = filename + "_result_%s.csv" % now self.result_file.save( result_file, ContentFile(importer.get_csv_result().encode("utf-8")) ) if importer.debug: self.debug = json.dumps(importer.debug, cls=StrJSONEncoder, indent=2) if importer.errors: if line_to_process: self.state = "PI" else: self.state = "FE" error_file = filename + "_errors_%s.csv" % now self.error_file.save( error_file, ContentFile(importer.get_csv_errors().encode("utf-8")) ) msg = str(_("Import {} finished with errors")).format(self.name) msg_cls = "warning" else: if line_to_process: self.state = "PI" else: self.state = "F" self.error_file = None msg = str(_("Import {} finished with no errors")).format(self.name) msg_cls = "primary" if session_key and request: put_session_message(session_key, msg, msg_cls) ids = ( request.session["current_import_id"] if "current_import_id" in request.session else [] ) ids.append(self.pk) put_session_var(session_key, "current_import_id", ids) if importer.match_table: match_file = filename + "_match_%s.csv" % now self.match_file.save( match_file, ContentFile(importer.get_csv_matches().encode("utf-8")) ) self.end_date = datetime.datetime.now() self.save() if self.next_import and not importer.errors: return self.next_import.importation( session_key=session_key, line_to_process=line_to_process, simulate=simulate, return_importer_and_data=return_importer_and_data, request=request ) if return_importer_and_data: return importer, data def _unarchive(self): if not self.archive_file: return with tempfile.TemporaryDirectory() as tmp_dir_name: # extract the current archive current_zip = zipfile.ZipFile(self.archive_file.path, "r") name_list = current_zip.namelist() if "content.json" not in name_list: return for name in name_list: current_zip.extract(name, tmp_dir_name) current_zip.close() content_name = os.path.join(tmp_dir_name, "content.json") try: with open(content_name, "r") 
as content: files = json.loads(content.read()) except (IOError, json.JSONDecodeError): return for attr in files: filename = files[attr] full_filename = os.path.join(tmp_dir_name, filename) with open(full_filename, "rb") as raw_file: getattr(self, attr).save(filename, File(raw_file)) os.remove(self.archive_file.path) setattr(self, "archive_file", None) self.state = "FE" if self.error_file else "F" self.save() return True def _archive(self): file_attr = ["imported_file", "imported_values", "error_file", "result_file", "match_file"] files = [ (k, getattr(self, k).path, getattr(self, k).name.split(os.sep)[-1]) for k in file_attr if getattr(self, k) ] self._archive_pending = True with tempfile.TemporaryDirectory() as tmpdir: base_name = "{}.zip".format(slugify(self.name)) archive_name = os.path.join(tmpdir, base_name) with zipfile.ZipFile(archive_name, "w") as current_zip: zip_content = {} for k, path, name in files: try: current_zip.write(path, arcname=name) zip_content[k] = name except OSError: pass content_name = os.path.join(tmpdir, "content.json") with open(content_name, "w") as content: content.write(json.dumps(zip_content)) current_zip.write(content_name, arcname="content.json") with open( archive_name, "rb", ) as raw_file: self.archive_file.save(base_name, File(raw_file)) IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() if profile.delete_image_zip_on_archive: file_attr.append("imported_images") for attr in file_attr: file_field = getattr(self, attr) if file_field: try: os.remove(file_field.path) except FileNotFoundError: pass setattr(self, attr, None) self.save() self._archive_pending = False def _get_all_related(self, key): imported = [] for related, zorg in get_all_related_m2m_objects_with_model(self, related_name=key): accessor = related.get_accessor_name() imported += [(accessor, obj) for obj in getattr(self, accessor).all()] return imported def get_all_imported(self): return 
self._get_all_related("imported_") def get_all_updated(self): return self._get_all_related("import_updated_") def parse_error_file(self): if not self.error_file or not self.error_file.path: ImportLineError.objects.filter(import_item=self).delete() return if not os.path.isfile(self.error_file.path): return with open(self.error_file.path, "r") as error_file: reader = csv.reader(error_file) for idx, line in enumerate(reader): if not idx: # header continue ImportLineError.objects.get_or_create(import_item=self, line=idx) def save(self, *args, **kwargs): maj_imported_file = False if getattr(self, "_maj_imported_file", False): pass elif self.imported_file: if self._initial_imported_file != self.imported_file.path or not self.imported_values: maj_imported_file = True elif self.imported_values: self.imported_values = None super().save(*args, **kwargs) if not getattr(self, "_no_parse_error_file", False): self.parse_error_file() if maj_imported_file and self.set_imported_values(): self._maj_imported_file = True self.save() def pre_delete_import(sender, **kwargs): # deleted imported items when an import is delete instance = kwargs.get("instance") if not instance: return to_delete = [] for accessor, imported in instance.get_all_imported(): to_delete.append(imported) post_delete_to_update = {} for item in to_delete: if hasattr(item, "post_delete_to_update"): item._no_pre_delete = True for klass, values in item.post_delete_to_update(): if klass not in post_delete_to_update: post_delete_to_update[klass] = set(values) else: post_delete_to_update[klass].update(values) item.delete() for klass in post_delete_to_update: for item_id in post_delete_to_update[klass]: q = klass.objects.filter(pk=item_id) if q.count(): q.all()[0].save() pre_delete.connect(pre_delete_import, sender=Import) class ImportLineError(models.Model): import_item = models.ForeignKey(Import, on_delete=models.CASCADE, related_name="error_lines") line = models.PositiveIntegerField(_("Line")) ignored = 
models.BooleanField(_("Ignored"), default=False) class Meta: verbose_name = _("Import - Ignored error") verbose_name_plural = _("Import - Ignored error") unique_together = ("line", "import_item") ordering = ("import_item", "line") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._initial_ignored = self.ignored def save(self, *args, **kwargs): super().save(*args, **kwargs) # reevaluate error status of the import if self._initial_ignored == self.ignored: return has_non_ignored_errors = self.import_item.error_lines.filter(ignored=False).count() modified = False if has_non_ignored_errors and self.import_item.state == "F": self.import_item.state = "FE" modified = True elif not has_non_ignored_errors and self.import_item.state == "FE": self.import_item.state = "F" modified = True if modified: self.import_item._no_parse_error_file = True self.import_item.save() class ImportColumnValue(models.Model): """ Value in a column for pre-import columns """ column = models.ForeignKey(ImporterColumn, on_delete=models.CASCADE) import_item = models.ForeignKey(Import, on_delete=models.CASCADE) value = models.TextField(default="", blank=True) class Meta: verbose_name = _("Import - Pre-import value") verbose_name_plural = _("Import - Pre-import values") unique_together = ("column", "import_item") ADMIN_SECTION = _("Imports") def __str__(self): return f"{self.import_item} - {self.column} - {self.value}" class ItemKey(models.Model): key = models.TextField(_("Key")) content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() content_object = GenericForeignKey("content_type", "object_id") importer = models.ForeignKey( Import, null=True, blank=True, help_text=_("Specific key to an import"), on_delete=models.SET_NULL, ) user = models.ForeignKey( "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL ) group = models.ForeignKey( TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL ) class Meta: 
verbose_name = _("Import - Item key") verbose_name_plural = _("Import - Item keys") ADMIN_SECTION = _("Imports") def __str__(self): return self.key