#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import datetime import os import logging import shutil import re import tempfile import zipfile from django.conf import settings from django.contrib.gis.db import models from django.core.exceptions import ValidationError from django.core.files.base import ContentFile from django.core.validators import validate_comma_separated_integer_list from django.db.models.base import ModelBase from django.db.models.signals import pre_delete from django.template.defaultfilters import slugify from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _, pgettext_lazy from ishtar_common.model_managers import SlugModelManager from ishtar_common.utils import create_slug, \ get_all_related_m2m_objects_with_model, put_session_message, \ put_session_var, get_session_var, num2col, max_size_help, import_class from ishtar_common.data_importer import Importer, ImportFormater, \ IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \ TypeFormater, YearFormater, StrToBoolean, FileFormater, InseeFormater, \ ImporterError from ishtar_common.utils import task logger = logging.getLogger(__name__) def get_model_fields(model): """ Return a dict of fields from model """ fields = {} if not model: return fields for field in model._meta.get_fields(): fields[field.name] = field return fields class ImportModelManager(models.Manager): def get_by_natural_key(self, klass): return self.get(klass=klass) class ImporterModel(models.Model): name = models.CharField(_("Name"), max_length=200) klass = models.CharField(_("Class name"), max_length=200, unique=True) objects = ImportModelManager() class Meta: verbose_name = _("Model") verbose_name_plural = _("Models") ordering = ('name',) def __str__(self): return self.name def natural_key(self): return (self.klass, ) class ImporterTypeManager(models.Manager): def get_by_natural_key(self, slug): return self.get(slug=slug) class ImporterType(models.Model): """ Description of a table to be mapped with ishtar database """ name = models.CharField(_("Name"), max_length=200) slug = models.SlugField(_("Slug"), unique=True, max_length=100) description = models.CharField(_("Description"), blank=True, null=True, max_length=500) users = models.ManyToManyField('IshtarUser', verbose_name=_("Users"), blank=True) associated_models = models.ForeignKey( ImporterModel, verbose_name=_("Associated model"), on_delete=models.SET_NULL, related_name='importer_type_associated', blank=True, null=True) created_models = models.ManyToManyField( ImporterModel, verbose_name=_("Models that can accept new items"), blank=True, help_text=_("Leave blank for no restrictions"), related_name='importer_type_created') is_template = models.BooleanField(_("Can be exported"), default=False) unicity_keys = models.CharField(_("Unicity keys (separator \";\")"), blank=True, null=True, max_length=500) available = models.BooleanField(_("Available"), default=True) objects = ImporterTypeManager() SERIALIZATION_EXCLUDE = ["users"] class Meta: verbose_name = _("Importer - Type") verbose_name_plural = _("Importer - Types") ordering = ('name',) def natural_key(self): return (self.slug, ) def __str__(self): return self.name def get_libreoffice_template(self): if not settings.USE_LIBREOFFICE: return from ishtar_common.libreoffice import UnoCalc ROW_NUMBER = 500 uno = UnoCalc() calc = uno.create_calc() main_sheet = uno.get_sheet(calc, 0, self.name) lst_sheet = uno.get_sheet(calc, 1, str(_("List types"))) if not calc: return col_number = 1 # user number so we start with 1 lst_col_number = 0 for column in self.columns.order_by('col_number').all(): while column.col_number > col_number: col_number += 1 # header cell = main_sheet.getCellByPosition(col_number - 1, 0) cell.CharWeight = 150 cell.setString(column.label) # only managing the first target... ft = None for target in column.targets.all(): ft = target.formater_type if ft: break if not ft: continue # first we only manage TypeFormater if ft.formater_type != 'TypeFormater': continue if not ft.options: # bad configuration continue model = import_class(ft.options) if not model: continue lst = [] for typ in model.get_types(instances=True): lst.append(str(typ)) end_row = uno.create_list(lst_sheet, lst_col_number, 0, str(model._meta.verbose_name), lst) uno.set_cell_validation_list( main_sheet, col_number, 1, ROW_NUMBER + 1, lst_sheet, lst_col_number, [1, end_row]) lst_col_number += 1 tmpdir = tempfile.mkdtemp(prefix="ishtar-templates-") dest_filename = "{}{}{}.ods".format(tmpdir, os.sep, self.name) uno.save_calc(calc, dest_filename) return dest_filename def get_importer_class(self, import_instance=None): OBJECT_CLS = import_class(self.associated_models.klass) DEFAULTS = dict([(default.keys, default.values) for default in self.defaults.all()]) LINE_FORMAT = [] LINE_EXPORT_FORMAT = [] idx = 0 for column in self.columns.order_by('col_number').all(): idx += 1 while column.col_number > idx: LINE_FORMAT.append(None) LINE_EXPORT_FORMAT.append(None) idx += 1 targets = [] formater_types = [] nb = column.targets.count() if not nb: LINE_FORMAT.append(None) if column.export_field_name: LINE_EXPORT_FORMAT.append( ImportFormater(column.export_field_name, label=column.label) ) continue force_news = [] concat_str = [] concat = [] for target in column.targets.order_by("pk").all(): ft = target.formater_type.get_formater_type( target, import_instance=import_instance) if not ft: continue formater_types.append(ft) targets.append(target.target) concat_str.append(target.concat_str) force_news.append(target.force_new) concat.append(target.concat) formater_kwargs = {} if column.regexp_pre_filter: formater_kwargs['regexp'] = re.compile( column.regexp_pre_filter.regexp) if column.value_format: formater_kwargs['value_format'] = \ column.value_format.format_string formater_kwargs['concat'] = concat formater_kwargs['concat_str'] = concat_str formater_kwargs['duplicate_fields'] = [ (field.field_name, field.force_new, field.concat, field.concat_str) for field in column.duplicate_fields.all()] formater_kwargs['label'] = column.label formater_kwargs['required'] = column.required formater_kwargs['force_new'] = force_news formater_kwargs['comment'] = column.description if column.export_field_name: formater_kwargs['export_field_name'] = [ column.export_field_name] formater = ImportFormater(targets, formater_types, **formater_kwargs) LINE_FORMAT.append(formater) LINE_EXPORT_FORMAT.append(formater) UNICITY_KEYS = [] if self.unicity_keys: UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] MODEL_CREATION_LIMIT = [] for modls in self.created_models.all(): MODEL_CREATION_LIMIT.append(import_class(modls.klass)) args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, 'UNICITY_KEYS': UNICITY_KEYS, 'LINE_EXPORT_FORMAT': LINE_EXPORT_FORMAT, 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} name = str(''.join( x for x in slugify(self.name).replace('-', ' ').title() if not x.isspace())) newclass = type(name, (Importer,), args) return newclass def save(self, *args, **kwargs): if not self.slug: self.slug = create_slug(ImporterType, self.name) return super(ImporterType, self).save(*args, **kwargs) def get_associated_model(parent_model, keys): model = None if isinstance(parent_model, str): OBJECT_CLS = import_class(parent_model) else: OBJECT_CLS = parent_model fields = get_model_fields(OBJECT_CLS) for idx, item in enumerate(keys): if item in ("-", ""): model = None elif not idx: if item not in fields: raise ImporterError( str(_("Importer configuration error: " "\"{}\" is not available for \"{}\"." " Check your default and column " "configuration")).format( item, OBJECT_CLS.__name__)) field = fields[item] if hasattr(field, 'rel') and hasattr(field.rel, 'to'): model = field.rel.to if type(field) == ModelBase: model = field else: if not model: raise ImporterError( str(_("Importer configuration error: " "\"{}\" is not available for \"{}\"." " Check your default and column " "configuration")).format( "__".join(keys[1:]), OBJECT_CLS.__name__)) return get_associated_model(model, keys[1:]) return model class ImporterDefaultManager(models.Manager): def get_by_natural_key(self, importer_type, target): return self.get(importer_type__slug=importer_type, target=target) class ImporterDefault(models.Model): """ Targets of default values in an import """ importer_type = models.ForeignKey(ImporterType, related_name='defaults') target = models.CharField("Target", max_length=500) class Meta: verbose_name = _("Importer - Default") verbose_name_plural = _("Importer - Defaults") unique_together = ('importer_type', 'target') objects = ImporterDefaultManager() def __str__(self): return "{} - {}".format(self.importer_type, self.target) def natural_key(self): return self.importer_type.slug, self.target @property def keys(self): return tuple(t for t in self.target.split('__') if t not in ("-", "")) @property def associated_model(self): return get_associated_model(self.importer_type.associated_models.klass, self.keys) @property def values(self): values = {} for default_value in self.default_values.all(): target = default_value.target if target == "-": target = "" values[target] = default_value.get_value() return values class ImporterDefaultValuesManager(models.Manager): def get_by_natural_key(self, def_target_type, def_target, target): return self.get(default_target__importer_type__slug=def_target_type, default_target__target=def_target, target=target) class ImporterDefaultValues(models.Model): """ Default values in an import """ default_target = models.ForeignKey(ImporterDefault, related_name='default_values') target = models.CharField("Target", max_length=500) value = models.CharField("Value", max_length=500) objects = ImporterDefaultValuesManager() class Meta: verbose_name = _("Importer - Default value") verbose_name_plural = _("Importer - Default values") unique_together = ('default_target', 'target') def natural_key(self): return (self.default_target.importer_type.slug, self.default_target.target, self.target) def __str__(self): return "{} - {}".format(self.default_target, self.target, self.value) def get_value(self): parent_model = self.default_target.associated_model if not parent_model: return self.value fields = get_model_fields(parent_model) target = self.target.strip() if target not in fields: return field = fields[target] if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'): return self.value model = field.rel.to # if value is an id try: return model.objects.get(pk=int(self.value)) except (ValueError, model.DoesNotExist): pass # try with txt_idx try: return model.objects.get(txt_idx=self.value) except (ValueError, model.DoesNotExist): pass return "" class ImporterColumnManager(models.Manager): def get_by_natural_key(self, importer_type, col_number): return self.get(importer_type__slug=importer_type, col_number=col_number) class ImporterColumn(models.Model): """ Import file column description """ label = models.CharField(_("Label"), blank=True, null=True, max_length=200) importer_type = models.ForeignKey(ImporterType, related_name='columns') col_number = models.IntegerField(_("Column number"), default=1) description = models.TextField(_("Description"), blank=True, null=True) regexp_pre_filter = models.ForeignKey( "Regexp", blank=True, null=True, on_delete=models.SET_NULL, related_name="columns", ) value_format = models.ForeignKey( "ValueFormater", blank=True, null=True, on_delete=models.SET_NULL, related_name="columns" ) required = models.BooleanField(_("Required"), default=False) export_field_name = models.CharField( _("Export field name"), blank=True, null=True, max_length=200, help_text=_("Fill this field if the field name is ambiguous for " "export. For instance: concatenated fields.") ) objects = ImporterColumnManager() class Meta: verbose_name = _("Importer - Column") verbose_name_plural = _("Importer - Columns") ordering = ('importer_type', 'col_number') unique_together = ('importer_type', 'col_number') def __str__(self): return "{} - {}".format(self.importer_type, self.col_number) @property def col_string(self): return num2col(self.col_number) def natural_key(self): return self.importer_type.slug, self.col_number def targets_lbl(self): return ', '.join([target.target for target in self.targets.all()]) def duplicate_fields_lbl(self): return ', '.join([dp.field_name or "" for dp in self.duplicate_fields.all()]) class ImporterDuplicateFieldManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, field_name): return self.get(column__importer_type__slug=importer_type, column__col_number=col_number, field_name=field_name) class ImporterDuplicateField(models.Model): """ Direct copy of result in other fields """ column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') field_name = models.CharField(_("Field name"), blank=True, null=True, max_length=200) force_new = models.BooleanField(_("Force creation of new items"), default=False) concat = models.BooleanField(_("Concatenate with existing"), default=False) concat_str = models.CharField(_("Concatenate character"), max_length=5, blank=True, null=True) objects = ImporterDuplicateFieldManager() class Meta: verbose_name = _("Importer - Duplicate field") verbose_name_plural = _("Importer - Duplicate fields") ordering = ('column', 'field_name') unique_together = ('column', 'field_name') def natural_key(self): return self.column.importer_type.slug, self.column.col_number, \ self.field_name class NamedManager(models.Manager): def get_by_natural_key(self, name): return self.get(name=name) class Regexp(models.Model): name = models.CharField(_("Name"), max_length=100, unique=True) description = models.TextField(_("Description"), blank=True, null=True) regexp = models.CharField(_("Regular expression"), max_length=500) objects = NamedManager() class Meta: verbose_name = _("Importer - Regular expression") verbose_name_plural = _("Importer - Regular expressions") def __str__(self): return self.name def natural_key(self): return (self.name, ) class ValueFormater(models.Model): name = models.CharField(_("Name"), max_length=100, unique=True) slug = models.SlugField(_("Slug"), unique=True, max_length=100) description = models.TextField(_("Description"), blank=True, null=True) format_string = models.CharField( _("Format string"), max_length=100, help_text=_("A string used to format a value using the Python " "\"format()\" method. The site https://pyformat.info/ " "provide good examples of usage. Only one \"{}\" entry " "is managed. The input is assumed to be a string.") ) objects = SlugModelManager() class Meta: verbose_name = _("Importer - Value format") verbose_name_plural = _("Importer - Value formats") def __str__(self): return self.name def clean(self): try: self.format_string.format("sample value") except ValueError: raise ValidationError( {'format_string': _("The string provided generate an error. " "Fix it.")} ) def natural_key(self): return (self.slug, ) class ImportTargetManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, target): return self.get(column__importer_type__slug=importer_type, column__col_number=col_number, target=target) class ImportTarget(models.Model): """ Ishtar database target for a column """ column = models.ForeignKey(ImporterColumn, related_name='targets') target = models.CharField("Target", max_length=500) formater_type = models.ForeignKey("FormaterType", related_name='targets') force_new = models.BooleanField(_("Force creation of new items"), default=False) concat = models.BooleanField(_("Concatenate with existing"), default=False) concat_str = models.CharField(_("Concatenate character"), max_length=5, blank=True, null=True) comment = models.TextField(_("Comment"), blank=True, null=True) objects = ImportTargetManager() class Meta: verbose_name = _("Importer - Target") verbose_name_plural = _("Importer - Targets") unique_together = ('column', 'target') def __str__(self): return self.target[:50] if self.target else self.comment @cached_property def verbose_name(self): if not self.column.description: return self.target[:50] desc = self.column.description desc = desc[0].lower() + desc[1:] return "{} - {}".format(self.target[:50], desc) def natural_key(self): return self.column.importer_type.slug, self.column.col_number, \ self.target @property def associated_model(self): if self.target.startswith("data__"): return try: return get_associated_model( self.column.importer_type.associated_models.klass, self.target.split('__')) except KeyError: return def get_choices(self): if self.formater_type.formater_type == 'UnknowType' \ and self.column.importer_type.slug: cls = self.column.importer_type.get_importer_class() formt = cls().line_format[self.column.col_number - 1] if hasattr(formt.formater, 'choices'): return [('', '--' * 8)] + list(formt.formater.choices) return [('', '--' * 8)] if self.formater_type.formater_type == 'StrToBoolean': return [('', '--' * 8), ('True', _("True")), ('False', _("False"))] if not self.associated_model or not hasattr(self.associated_model, 'get_types'): return [] return self.associated_model.get_types() class TargetKeyGroup(models.Model): """ Group of target keys for imports. """ name = models.TextField(_("Name"), unique=True) all_user_can_use = models.BooleanField(_("All users can use it"), default=False) all_user_can_modify = models.BooleanField(_("All users can modify it"), default=False) available = models.BooleanField(_("Available"), default=True) class Meta: verbose_name = _("Importer - Target key group") verbose_name_plural = _("Importer - Target key groups") def __str__(self): return self.name class TargetKey(models.Model): """ User's link between import source and ishtar database. Also temporary used for GeneralType to point missing link before adding them in ItemKey table. A targetkey connection can be create to be applied to one particular import (associated_import), one particular user (associated_user), one particular group (associated_group) or to all imports (associated_import, associated_user and associated_group are empty). """ target = models.ForeignKey(ImportTarget, related_name='keys') key = models.TextField(_("Key")) value = models.TextField(_("Value"), blank=True, null=True) is_set = models.BooleanField(_("Is set"), default=False) associated_import = models.ForeignKey('Import', blank=True, null=True) associated_user = models.ForeignKey('IshtarUser', blank=True, null=True) associated_group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) class Meta: unique_together = ('target', 'key', 'associated_user', 'associated_import',) verbose_name = _("Importer - Target key") verbose_name_plural = _("Importer - Targets keys") ordering = ('target', 'key') def __str__(self): return " - ".join([str(self.target), self.key[:50]]) def column_nb(self): # for the admin return self.target.column.col_number def importer_type(self): # for the admin return self.target.column.importer_type.name def format(self): if not self.is_set: return None if self.target.formater_type.formater_type == 'StrToBoolean': if self.value in ('False', '0'): return False elif self.value: return True return return self.value def save(self, *args, **kwargs): obj = super(TargetKey, self).save(*args, **kwargs) if not self.value: return obj v = None associated_model = self.target.associated_model if associated_model and hasattr(self.target.associated_model, "add_key"): # pk is given try: v = self.target.associated_model.objects.get( pk=str(int(self.value))) except (ValueError, self.target.associated_model.DoesNotExist): # try with txt_idx try: v = self.target.associated_model.objects.get( txt_idx=str(self.value)) except self.target.associated_model.DoesNotExist: pass if v: keys = {} if self.associated_group: keys['group'] = self.associated_group if self.associated_user: keys['user'] = self.associated_user else: keys['importer'] = self.associated_import v.add_key(self.key, **keys) return obj TARGET_MODELS = [ ('OrganizationType', _("Organization type")), ('ishtar_common.models.OrganizationType', _("Organization type")), ('ishtar_common.models.PersonType', _("Person type")), ('TitleType', _("Title")), ('SourceType', _("Source type")), ('AuthorType', _("Author type")), ('Format', _("Format")), ('archaeological_operations.models.OperationType', _("Operation type")), ('archaeological_operations.models.Period', _("Period")), ('archaeological_operations.models.ReportState', _("Report state")), ('archaeological_operations.models.RemainType', _("Remain type")), ('archaeological_operations.models.RelationType', _("Operation relation type")), ('archaeological_context_records.models.Unit', _("Unit")), ('archaeological_context_records.models.ActivityType', _("Activity type")), ('archaeological_context_records.models.DocumentationType', _("Documentation type")), ("archaeological_context_records.models.DatingQuality", _("Dating quality")), ('archaeological_finds.models.MaterialType', _("Material")), ('archaeological_finds.models.ConservatoryState', _("Conservatory state")), ('archaeological_warehouse.models.ContainerType', _("Container type")), ('archaeological_warehouse.models.WarehouseDivision', _("Warehouse division")), ('archaeological_warehouse.models.WarehouseType', _("Warehouse type")), ('archaeological_finds.models.TreatmentType', _("Treatment type")), ('archaeological_finds.models.TreatmentEmergencyType', _("Treatment emergency type")), ('archaeological_finds.models.ObjectType', _("Object type")), ('archaeological_finds.models.IntegrityType', _("Integrity type")), ('archaeological_finds.models.RemarkabilityType', _("Remarkability type")), ('archaeological_finds.models.AlterationType', _("Alteration type")), ('archaeological_finds.models.AlterationCauseType', _("Alteration cause type")), ('archaeological_finds.models.BatchType', _("Batch type")), ('archaeological_finds.models.CheckedType', _("Checked type")), ('archaeological_finds.models.MaterialTypeQualityType', _("Material type quality")), ('archaeological_context_records.models.IdentificationType', _("Identification type")), ('archaeological_context_records.models.RelationType', _("Context record relation type")), ('SpatialReferenceSystem', _("Spatial reference system")), ('SupportType', _("Support type")), ('TitleType', _("Title type")), ] TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] IMPORTER_TYPES = ( ('IntegerFormater', _("Integer")), ('FloatFormater', _("Float")), ('UnicodeFormater', _("String")), ('DateFormater', _("Date")), ('TypeFormater', _("Type")), ('YearFormater', _("Year")), ('InseeFormater', _("INSEE code")), ('StrToBoolean', _("String to boolean")), ('FileFormater', pgettext_lazy("filesystem", "File")), ('UnknowType', _("Unknow type")) ) IMPORTER_TYPES_DCT = { 'IntegerFormater': IntegerFormater, 'FloatFormater': FloatFormater, 'UnicodeFormater': UnicodeFormater, 'DateFormater': DateFormater, 'TypeFormater': TypeFormater, 'YearFormater': YearFormater, 'StrToBoolean': StrToBoolean, 'FileFormater': FileFormater, 'InseeFormater': InseeFormater, 'UnknowType': None, } DATE_FORMATS = ( ('%Y', _("4 digit year. e.g.: \"2015\"")), ('%Y/%m/%d', _("4 digit year/month/day. e.g.: \"2015/02/04\"")), ('%d/%m/%Y', _("Day/month/4 digit year. e.g.: \"04/02/2015\"")), ) IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS, 'DateFormater': DATE_FORMATS} class FormaterTypeManager(models.Manager): def get_by_natural_key(self, formater_type, options, many_split): return self.get(formater_type=formater_type, options=options, many_split=many_split) class FormaterType(models.Model): formater_type = models.CharField("Formater type", max_length=20, choices=IMPORTER_TYPES) options = models.CharField(_("Options"), max_length=500, blank=True, null=True) many_split = models.CharField(_("Split character(s)"), max_length=10, blank=True, null=True) objects = FormaterTypeManager() class Meta: verbose_name = _("Importer - Formater type") verbose_name_plural = _("Importer - Formater types") unique_together = ('formater_type', 'options', 'many_split') ordering = ('formater_type', 'options') def natural_key(self): return self.formater_type, self.options, self.many_split def __str__(self): return " - ".join( [str(dict(IMPORTER_TYPES)[self.formater_type]) if self.formater_type in IMPORTER_TYPES_DCT else ''] + [getattr(self, k) for k in ('options', 'many_split') if getattr(self, k)]) def get_choices(self): if self.format_type in IMPORTER_TYPES_CHOICES: return IMPORTER_TYPES_CHOICES[self.format_type] def get_formater_type(self, target, import_instance=None): if self.formater_type not in IMPORTER_TYPES_DCT.keys(): return kwargs = {'db_target': target, 'import_instance': import_instance} if self.many_split: kwargs['many_split'] = self.many_split if self.formater_type == 'TypeFormater': if self.options not in TARGET_MODELS_KEYS: logger.warning( "**WARN FormaterType.get_formater_type**: {} " "is not in TARGET_MODELS_KEYS".format(self.options)) return if self.options in dir(): model = dir()[self.options] else: model = import_class(self.options) return TypeFormater(model, **kwargs) elif self.formater_type == 'UnicodeFormater': if self.options: try: return UnicodeFormater(int(self.options.strip()), **kwargs) except ValueError: pass return UnicodeFormater(**kwargs) elif self.formater_type == 'DateFormater': date_formats = self.options if self.many_split: date_formats = self.options.split(kwargs.pop('many_split')) return DateFormater(date_formats, **kwargs) elif self.formater_type == 'StrToBoolean': return StrToBoolean(**kwargs) elif self.formater_type == 'UnknowType': return else: return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) IMPORT_STATE = ( ("C", _("Created")), ("AP", _("Analyse in progress")), ("A", _("Analysed")), ("HQ", _("Check modified in queue")), ("IQ", _("Import in queue")), ("HP", _("Check modified in progress")), ("IP", _("Import in progress")), ("PI", _("Partially imported")), ("FE", _("Finished with errors")), ("F", _("Finished")), ("AC", _("Archived")), ) IMPORT_STATE_DCT = dict(IMPORT_STATE) ENCODINGS = [(settings.ENCODING, settings.ENCODING), (settings.ALT_ENCODING, settings.ALT_ENCODING), ('utf-8', 'utf-8')] CSV_SEPS = ((",", ","), (";", ";"),) @task() def delayed_import(import_pk): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: return imp.importation() @task() def delayed_check(import_pk): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: return imp.check_modified() class Import(models.Model): user = models.ForeignKey('IshtarUser', blank=True, null=True, on_delete=models.SET_NULL) name = models.CharField(_("Name"), max_length=500, null=True) importer_type = models.ForeignKey(ImporterType) imported_file = models.FileField( _("Imported file"), upload_to="upload/imports/%Y/%m/", max_length=220, help_text=max_size_help()) imported_images = models.FileField( _("Associated images (zip file)"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=220, help_text=max_size_help()) associated_group = models.ForeignKey( TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL, help_text=_("If a group is selected, target key saved in this group " "will be used.") ) encoding = models.CharField(_("Encoding"), choices=ENCODINGS, default=u'utf-8', max_length=15) csv_sep = models.CharField( _("CSV separator"), choices=CSV_SEPS, default=u',', max_length=1, help_text=_("Separator for CSV file. Standard is comma but Microsoft " "Excel do not follow this standard and use semi-colon.") ) skip_lines = models.IntegerField( _("Skip lines"), default=1, help_text=_("Number of header lines in your file (can be 0).")) error_file = models.FileField( _("Error file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255, help_text=max_size_help()) result_file = models.FileField( _("Result file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255, help_text=max_size_help()) match_file = models.FileField( _("Match file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255, help_text=max_size_help()) state = models.CharField(_("State"), max_length=2, choices=IMPORT_STATE, default=u'C') conservative_import = models.BooleanField( _("Conservative import"), default=False, help_text=_(u'If set to true, do not overload existing values.')) creation_date = models.DateTimeField( _("Creation date"), auto_now_add=True, blank=True, null=True) end_date = models.DateTimeField(_("End date"), auto_now_add=True, blank=True, null=True, editable=False) seconds_remaining = models.IntegerField( _("Remaining seconds"), blank=True, null=True, editable=False) current_line = models.IntegerField(_("Current line"), blank=True, null=True) number_of_line = models.IntegerField(_("Number of line"), blank=True, null=True) imported_line_numbers = models.TextField( _("Imported line numbers"), blank=True, null=True, validators=[validate_comma_separated_integer_list] ) changed_checked = models.BooleanField(_("Changed have been checked"), default=False) changed_line_numbers = models.TextField( _("Changed line numbers"), blank=True, null=True, validators=[validate_comma_separated_integer_list] ) class Meta: verbose_name = _("Import") verbose_name_plural = _("Imports") def __str__(self): return "{} | {}".format(self.name or "-", self.importer_type) def need_matching(self): return bool(TargetKey.objects.filter(associated_import=self, is_set=False).count()) @property def errors(self): if not self.error_file: return [] errors = [] with open(self.error_file.path, 'rt') as csvfile: reader = csv.DictReader( csvfile, fieldnames=['line', 'column', 'error']) for idx, row in enumerate(reader): if not idx: # pass the header continue errors.append(row) return errors def get_number_of_lines(self): if self.number_of_line: return self.number_of_line if not self.imported_file or not self.imported_file.path: return filename = self.imported_file.path encodings = [self.encoding] encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: with open(filename, 'r', encoding=encoding) as f: reader = csv.reader(f, delimiter=self.csv_sep) nb = sum(1 for __ in reader) - self.skip_lines except UnicodeDecodeError: pass # try the next encoding except csv.Error: raise ImporterError(_("Error in the CSV file.")) self.number_of_line = nb self.save() return nb @property def progress_percent(self): if not self.current_line or not self.number_of_line: return 0 return int((float(self.current_line) / float(self.number_of_line)) * 100) def add_imported_line(self, idx_line): if not self.number_of_line: self.get_number_of_lines() if self.imported_line_numbers and \ str(idx_line) in self.imported_line_numbers.split(','): return if self.imported_line_numbers: self.imported_line_numbers += "," else: self.imported_line_numbers = "" self.imported_line_numbers += str(idx_line) self.current_line = idx_line self.save() def add_changed_line(self, idx_line): if self.changed_line_numbers and \ str(idx_line) in self.changed_line_numbers.split(','): return if self.changed_line_numbers: self.changed_line_numbers += "," else: self.changed_line_numbers = "" self.changed_line_numbers += str(idx_line) self.save() def remove_changed_line(self, idx_line): if not self.changed_line_numbers: return line_numbers = self.changed_line_numbers.split(',') if str(idx_line) not in line_numbers: return line_numbers.pop(line_numbers.index(str(idx_line))) self.changed_line_numbers = ",".join(line_numbers) self.save() def has_changes(self, idx_line): if not self.changed_checked: return True if not self.changed_line_numbers: return line_numbers = self.changed_line_numbers.split(',') return str(idx_line) in line_numbers def line_is_imported(self, idx_line): return self.imported_line_numbers and \ str(idx_line) in self.imported_line_numbers.split(',') def get_actions(self): """ Get available action relevant with the current status """ from ishtar_common.models import IshtarSiteProfile profile = IshtarSiteProfile.get_current_profile() actions = [] if self.state == 'C': actions.append(('A', _("Analyse"))) if self.state in ('A', 'PI'): actions.append(('A', _("Re-analyse"))) actions.append(('I', _("Launch import"))) if profile.experimental_feature: if self.changed_checked: actions.append(('IS', _("Step by step import"))) actions.append(('CH', _("Re-check for changes"))) else: actions.append(('CH', _("Check for changes"))) if self.state in ('F', 'FE'): actions.append(('A', _("Re-analyse"))) actions.append(('I', _("Re-import"))) if profile.experimental_feature: if self.changed_checked: actions.append(('IS', _("Step by step re-import"))) actions.append(('CH', _("Re-check for changes"))) else: actions.append(('CH', _("Check for changes"))) actions.append(('AC', _("Archive"))) if self.state == 'AC': actions.append(('A', _("Unarchive"))) actions.append(('D', _("Delete"))) return actions @property def imported_filename(self): return self.imported_file.name.split(os.sep)[-1] @property def status(self): if self.state not in IMPORT_STATE_DCT: return "" return IMPORT_STATE_DCT[self.state] def get_importer_instance(self): return self.importer_type.get_importer_class(import_instance=self)( skip_lines=self.skip_lines, import_instance=self, conservative_import=self.conservative_import) @property def data_table(self): imported_file = self.imported_file.path tmpdir = None if zipfile.is_zipfile(imported_file): z = zipfile.ZipFile(imported_file) filename = None for name in z.namelist(): # get first CSV file found if name.endswith('.csv'): filename = name break if not filename: return [] tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-') imported_file = z.extract(filename, tmpdir) encodings = [self.encoding] encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: with open(imported_file, encoding=encoding) as csv_file: vals = [ line for line in csv.reader(csv_file, delimiter=self.csv_sep) ] if tmpdir: shutil.rmtree(tmpdir) return vals except UnicodeDecodeError: pass # try the next encoding except csv.Error: raise ImporterError(_("Error in the CSV file.")) if tmpdir: shutil.rmtree(tmpdir) return [] def initialize(self, user=None, session_key=None): self.state = 'AP' self.end_date = datetime.datetime.now() self.save() try: self.get_importer_instance().initialize( self.data_table, user=user, output='db') except ImporterError as e: if session_key: put_session_message(session_key, e.msg, "danger") self.state = 'C' self.save() return self.state = 'A' self.end_date = datetime.datetime.now() self.save() def delayed_check_modified(self, session_key): if not settings.USE_BACKGROUND_TASK: return self.check_modified(session_key=session_key) put_session_message( session_key, str( _("Modification check {} added to the queue") ).format(self.name), "info") self.state = 'HQ' self.end_date = datetime.datetime.now() self.save() return delayed_check.delay(self.pk) def check_modified(self, session_key=None): self.state = 'HP' self.end_date = datetime.datetime.now() self.changed_line_numbers = "" self.changed_checked = False self.save() for idx in range(self.skip_lines, self.get_number_of_lines() + 1): try: imprt, data = self.importation( simulate=True, line_to_process=idx, return_importer_and_data=True ) except IOError as e: # error is identified as a change self.add_changed_line(idx) continue # no data is not normal and an error is identified as a change if not data or not data[0]: self.add_changed_line(idx) continue # new objects is a change if imprt.new_objects: self.add_changed_line(idx) continue # check all updated fields changed = False for path, obj, values, updated_values in imprt.updated_objects: if changed: break for k in updated_values.keys(): if changed: break current_value = getattr(obj, k) updated_value = updated_values[k] if hasattr(current_value, 'all'): current_value = list(current_value.all()) changed = False for v in updated_value: if v not in current_value: changed = True break else: if current_value != updated_value: changed = True break if changed: self.add_changed_line(idx) continue self.remove_changed_line(idx) self.changed_checked = True self.save() def delayed_importation(self, request, session_key): if not settings.USE_BACKGROUND_TASK: return self.importation(request=request, session_key=session_key) put_session_message( session_key, str(_("Import {} added to the queue")).format(self.name), "info") self.state = 'IQ' self.end_date = datetime.datetime.now() self.save() return delayed_import.delay(self.pk) def importation(self, session_key=None, line_to_process=None, simulate=False, return_importer_and_data=False, request=None): self.state = 'IP' self.end_date = datetime.datetime.now() if not line_to_process: # full import self.imported_line_numbers = '' self.current_line = 0 self.save() importer = self.get_importer_instance() try: data = importer.importation( self.data_table, user=self.user, line_to_process=line_to_process, simulate=simulate) except IOError: error_message = str(_("Error on imported file: {}")).format( self.imported_file) importer.errors = [error_message] if session_key: put_session_message(session_key, error_message, "warning") ids = get_session_var(session_key, 'current_import_id') if not ids: ids = [] ids.append(self.pk) put_session_var(session_key, 'current_import_id', ids) if line_to_process: self.state = 'PI' else: self.state = 'FE' self.save() if not return_importer_and_data: return return importer, None # result file filename = slugify(self.importer_type.name) now = datetime.datetime.now().isoformat('-').replace(':', '') result_file = filename + "_result_%s.csv" % now self.result_file.save( result_file, ContentFile(importer.get_csv_result().encode('utf-8'))) if importer.errors: if line_to_process: self.state = 'PI' else: self.state = 'FE' error_file = filename + "_errors_%s.csv" % now self.error_file.save( error_file, ContentFile(importer.get_csv_errors().encode('utf-8')) ) msg = str(_("Import {} finished with errors")).format( self.name) msg_cls = "warning" else: if line_to_process: self.state = 'PI' else: self.state = 'F' self.error_file = None msg = str(_("Import {} finished with no errors")).format( self.name) msg_cls = "primary" if session_key and request: put_session_message(session_key, msg, msg_cls) ids = request.session['current_import_id'] \ if 'current_import_id' in request.session else [] ids.append(self.pk) put_session_var(session_key, 'current_import_id', ids) if importer.match_table: match_file = filename + "_match_%s.csv" % now self.match_file.save( match_file, ContentFile(importer.get_csv_matches().encode('utf-8')) ) self.end_date = datetime.datetime.now() self.save() if return_importer_and_data: return importer, data def archive(self): self.state = 'AC' self.end_date = datetime.datetime.now() self.save() def get_all_imported(self): imported = [] for related, zorg in get_all_related_m2m_objects_with_model(self): accessor = related.get_accessor_name() imported += [(accessor, obj) for obj in getattr(self, accessor).all()] return imported def pre_delete_import(sender, **kwargs): # deleted imported items when an import is delete instance = kwargs.get('instance') if not instance: return to_delete = [] for accessor, imported in instance.get_all_imported(): to_delete.append(imported) for item in to_delete: item.delete() pre_delete.connect(pre_delete_import, sender=Import)