#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import datetime from importlib import import_module import os import logging import shutil import re import tempfile import unicodecsv import zipfile from django.conf import settings from django.contrib.gis.db import models from django.core.exceptions import SuspiciousOperation from django.core.files.base import ContentFile from django.core.validators import validate_comma_separated_integer_list from django.db.models.base import ModelBase from django.db.models.signals import pre_delete from django.template.defaultfilters import slugify from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _, pgettext_lazy from ishtar_common.utils import create_slug, \ get_all_related_m2m_objects_with_model, put_session_message, \ put_session_var, get_session_var, num2col from ishtar_common.data_importer import Importer, ImportFormater, \ IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \ TypeFormater, YearFormater, StrToBoolean, FileFormater, InseeFormater, \ ImporterError if settings.USE_BACKGROUND_TASK: from background_task import background else: background = lambda x: x logger = logging.getLogger(__name__) IMPORTER_CLASSES = {} IMPORTER_CLASSES.update({ 'sra-pdl-files': 'archaeological_files.data_importer.FileImporterSraPdL'}) def get_model_fields(model): """ Return a dict of fields from model """ fields = {} if not model: return fields for field in model._meta.get_fields(): fields[field.name] = field return fields def import_class(full_path_classname): """ Return the model class from the full path TODO: add a white list for more security """ mods = full_path_classname.split('.') if len(mods) == 1: mods = ['ishtar_common', 'models', mods[0]] elif 'models' not in mods and 'models_finds' not in mods \ and 'models_treatments' not in mods: raise SuspiciousOperation( u"Try to import a non model from a string") module = import_module('.'.join(mods[:-1])) return getattr(module, mods[-1]) class ImportModelManager(models.Manager): def get_by_natural_key(self, klass): return self.get(klass=klass) class ImporterModel(models.Model): name = models.CharField(_(u"Name"), max_length=200) klass = models.CharField(_(u"Class name"), max_length=200, unique=True) objects = ImportModelManager() class Meta: verbose_name = _(u"Importer - Model") verbose_name_plural = _(u"Importer - Models") ordering = ('name',) def __unicode__(self): return self.name def natural_key(self): return (self.klass, ) class ImporterTypeManager(models.Manager): def get_by_natural_key(self, slug): return self.get(slug=slug) class ImporterType(models.Model): """ Description of a table to be mapped with ishtar database """ name = models.CharField(_(u"Name"), max_length=200) slug = models.SlugField(_(u"Slug"), unique=True, max_length=100) description = models.CharField(_(u"Description"), blank=True, null=True, max_length=500) users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"), blank=True) associated_models = models.ForeignKey( ImporterModel, verbose_name=_(u"Associated model"), related_name='+', blank=True, null=True) created_models = models.ManyToManyField( ImporterModel, verbose_name=_(u"Models that can accept new items"), blank=True, help_text=_(u"Leave blank for no restrictions"), related_name='+') is_template = models.BooleanField(_(u"Is template"), default=False) unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"), blank=True, null=True, max_length=500) available = models.BooleanField(_(u"Available"), default=True) objects = ImporterTypeManager() class Meta: verbose_name = _(u"Importer - Type") verbose_name_plural = _(u"Importer - Types") ordering = ('name',) def natural_key(self): return (self.slug, ) def __unicode__(self): return self.name def get_importer_class(self, import_instance=None): if self.slug and self.slug in IMPORTER_CLASSES: cls = import_class(IMPORTER_CLASSES[self.slug]) return cls OBJECT_CLS = import_class(self.associated_models.klass) DEFAULTS = dict([(default.keys, default.values) for default in self.defaults.all()]) LINE_FORMAT = [] LINE_EXPORT_FORMAT = [] idx = 0 for column in self.columns.order_by('col_number').all(): idx += 1 while column.col_number > idx: LINE_FORMAT.append(None) LINE_EXPORT_FORMAT.append(None) idx += 1 targets = [] formater_types = [] nb = column.targets.count() if not nb: LINE_FORMAT.append(None) if column.export_field_name: LINE_EXPORT_FORMAT.append( ImportFormater(column.export_field_name, label=column.label) ) continue force_news = [] concat_str = [] for target in column.targets.all(): ft = target.formater_type.get_formater_type( target, import_instance=import_instance) if not ft: continue formater_types.append(ft) targets.append(target.target) concat_str.append(target.concat_str) force_news.append(target.force_new) formater_kwargs = {} if column.regexp_pre_filter: formater_kwargs['regexp'] = re.compile( column.regexp_pre_filter.regexp) formater_kwargs['concat_str'] = concat_str formater_kwargs['duplicate_fields'] = [ (field.field_name, field.force_new, field.concat, field.concat_str) for field in column.duplicate_fields.all()] formater_kwargs['label'] = column.label formater_kwargs['required'] = column.required formater_kwargs['force_new'] = force_news formater_kwargs['comment'] = column.description if column.export_field_name: formater_kwargs['export_field_name'] = [ column.export_field_name] formater = ImportFormater(targets, formater_types, **formater_kwargs) LINE_FORMAT.append(formater) LINE_EXPORT_FORMAT.append(formater) UNICITY_KEYS = [] if self.unicity_keys: UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] MODEL_CREATION_LIMIT = [] for modls in self.created_models.all(): MODEL_CREATION_LIMIT.append(import_class(modls.klass)) args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, 'UNICITY_KEYS': UNICITY_KEYS, 'LINE_EXPORT_FORMAT': LINE_EXPORT_FORMAT, 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} name = str(''.join( x for x in slugify(self.name).replace('-', ' ').title() if not x.isspace())) newclass = type(name, (Importer,), args) return newclass def save(self, *args, **kwargs): if not self.slug: self.slug = create_slug(ImporterType, self.name) return super(ImporterType, self).save(*args, **kwargs) def get_associated_model(parent_model, keys): model = None if isinstance(parent_model, unicode) or \ isinstance(parent_model, str): OBJECT_CLS = import_class(parent_model) else: OBJECT_CLS = parent_model for idx, item in enumerate(keys): if not idx: field = get_model_fields(OBJECT_CLS)[item] if hasattr(field, 'rel') and hasattr(field.rel, 'to'): model = field.rel.to if type(field) == ModelBase: model = field else: return get_associated_model(model, keys[1:]) return model class ImporterDefaultManager(models.Manager): def get_by_natural_key(self, importer_type, target): return self.get(importer_type__slug=importer_type, target=target) class ImporterDefault(models.Model): """ Targets of default values in an import """ importer_type = models.ForeignKey(ImporterType, related_name='defaults') target = models.CharField(u"Target", max_length=500) class Meta: verbose_name = _(u"Importer - Default") verbose_name_plural = _(u"Importer - Defaults") unique_together = ('importer_type', 'target') objects = ImporterDefaultManager() def __unicode__(self): return u"{} - {}".format(self.importer_type, self.target) def natural_key(self): return self.importer_type.slug, self.target @property def keys(self): return tuple(self.target.split('__')) @property def associated_model(self): return get_associated_model(self.importer_type.associated_models.klass, self.keys) @property def values(self): values = {} for default_value in self.default_values.all(): values[default_value.target] = default_value.get_value() return values class ImporterDefaultValuesManager(models.Manager): def get_by_natural_key(self, def_target_type, def_target, target): return self.get(default_target__importer_type__slug=def_target_type, default_target__target=def_target, target=target) class ImporterDefaultValues(models.Model): """ Default values in an import """ default_target = models.ForeignKey(ImporterDefault, related_name='default_values') target = models.CharField(u"Target", max_length=500) value = models.CharField(u"Value", max_length=500) objects = ImporterDefaultValuesManager() class Meta: verbose_name = _(u"Importer - Default value") verbose_name_plural = _(u"Importer - Default values") unique_together = ('default_target', 'target') def natural_key(self): return (self.default_target.importer_type.slug, self.default_target.target, self.target) def __unicode__(self): return u"{} - {}".format(self.default_target, self.target, self.value) def get_value(self): parent_model = self.default_target.associated_model if not parent_model: return self.value fields = get_model_fields(parent_model) target = self.target.strip() if target not in fields: return field = fields[target] if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'): return model = field.rel.to # if value is an id try: return model.objects.get(pk=int(self.value)) except (ValueError, model.DoesNotExist): pass # try with txt_idx try: return model.objects.get(txt_idx=self.value) except (ValueError, model.DoesNotExist): pass return "" class ImporterColumnManager(models.Manager): def get_by_natural_key(self, importer_type, col_number): return self.get(importer_type__slug=importer_type, col_number=col_number) class ImporterColumn(models.Model): """ Import file column description """ label = models.CharField(_(u"Label"), blank=True, null=True, max_length=200) importer_type = models.ForeignKey(ImporterType, related_name='columns') col_number = models.IntegerField(_(u"Column number"), default=1) description = models.TextField(_("Description"), blank=True, null=True) regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True) required = models.BooleanField(_(u"Required"), default=False) export_field_name = models.CharField( _(u"Export field name"), blank=True, null=True, max_length=200, help_text=_(u"Fill this field if the field name is ambiguous for " u"export. For instance: concatenated fields.") ) objects = ImporterColumnManager() class Meta: verbose_name = _(u"Importer - Column") verbose_name_plural = _(u"Importer - Columns") ordering = ('importer_type', 'col_number') unique_together = ('importer_type', 'col_number') def __unicode__(self): return u"{} - {}".format(self.importer_type, self.col_number) @property def col_string(self): return num2col(self.col_number) def natural_key(self): return self.importer_type.slug, self.col_number def targets_lbl(self): return u', '.join([target.target for target in self.targets.all()]) def duplicate_fields_lbl(self): return u', '.join([dp.field_name for dp in self.duplicate_fields.all()]) class ImporterDuplicateFieldManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, field_name): return self.get(column__importer_type__slug=importer_type, column__col_number=col_number, field_name=field_name) class ImporterDuplicateField(models.Model): """ Direct copy of result in other fields """ column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') field_name = models.CharField(_(u"Field name"), blank=True, null=True, max_length=200) force_new = models.BooleanField(_(u"Force creation of new items"), default=False) concat = models.BooleanField(_(u"Concatenate with existing"), default=False) concat_str = models.CharField(_(u"Concatenate character"), max_length=5, blank=True, null=True) objects = ImporterDuplicateFieldManager() class Meta: verbose_name = _(u"Importer - Duplicate field") verbose_name_plural = _(u"Importer - Duplicate fields") ordering = ('column', 'field_name') unique_together = ('column', 'field_name') def natural_key(self): return self.column.importer_type.slug, self.column.col_number, \ self.field_name class NamedManager(models.Manager): def get_by_natural_key(self, name): return self.get(name=name) class Regexp(models.Model): name = models.CharField(_(u"Name"), max_length=100, unique=True) description = models.CharField(_(u"Description"), blank=True, null=True, max_length=500) regexp = models.CharField(_(u"Regular expression"), max_length=500) objects = NamedManager() class Meta: verbose_name = _(u"Importer - Regular expression") verbose_name_plural = _(u"Importer - Regular expressions") def __unicode__(self): return self.name def natural_key(self): return (self.name, ) class ImportTargetManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, target): return self.get(column__importer_type__slug=importer_type, column__col_number=col_number, target=target) class ImportTarget(models.Model): """ Ishtar database target for a column """ column = models.ForeignKey(ImporterColumn, related_name='targets') target = models.CharField(u"Target", max_length=500) regexp_filter = models.ForeignKey("Regexp", blank=True, null=True) formater_type = models.ForeignKey("FormaterType") force_new = models.BooleanField(_(u"Force creation of new items"), default=False) concat = models.BooleanField(_(u"Concatenate with existing"), default=False) concat_str = models.CharField(_(u"Concatenate character"), max_length=5, blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) objects = ImportTargetManager() class Meta: verbose_name = _(u"Importer - Target") verbose_name_plural = _(u"Importer - Targets") unique_together = ('column', 'target') def __unicode__(self): return self.target[:50] if self.target else self.comment @cached_property def verbose_name(self): if not self.column.description: return self.target[:50] desc = self.column.description desc = desc[0].lower() + desc[1:] return u"{} - {}".format(self.target[:50], desc) def natural_key(self): return self.column.importer_type.slug, self.column.col_number, \ self.target @property def associated_model(self): try: return get_associated_model( self.column.importer_type.associated_models.klass, self.target.split('__')) except KeyError: return def get_choices(self): if self.formater_type.formater_type == 'UnknowType' \ and self.column.importer_type.slug: cls = self.column.importer_type.get_importer_class() formt = cls().line_format[self.column.col_number - 1] if hasattr(formt.formater, 'choices'): return [('', '--' * 8)] + list(formt.formater.choices) return [('', '--' * 8)] if self.formater_type.formater_type == 'StrToBoolean': return [('', '--' * 8), ('True', _(u"True")), ('False', _(u"False"))] if not self.associated_model or not hasattr(self.associated_model, 'get_types'): return [] return self.associated_model.get_types() class TargetKeyGroup(models.Model): """ Group of target keys for imports. """ name = models.TextField(_(u"Name"), unique=True) all_user_can_use = models.BooleanField(_(u"All users can use it"), default=False) all_user_can_modify = models.BooleanField(_(u"All users can modify it"), default=False) available = models.BooleanField(_(u"Available"), default=True) class Meta: verbose_name = _(u"Importer - Target key group") verbose_name_plural = _(u"Importer - Target key groups") def __unicode__(self): return self.name class TargetKey(models.Model): """ User's link between import source and ishtar database. Also temporary used for GeneralType to point missing link before adding them in ItemKey table. A targetkey connection can be create to be applied to one particular import (associated_import), one particular user (associated_user), one particular group (associated_group) or to all imports (associated_import, associated_user and associated_group are empty). """ target = models.ForeignKey(ImportTarget, related_name='keys') key = models.TextField(_(u"Key")) value = models.TextField(_(u"Value"), blank=True, null=True) is_set = models.BooleanField(_(u"Is set"), default=False) associated_import = models.ForeignKey('Import', blank=True, null=True) associated_user = models.ForeignKey('IshtarUser', blank=True, null=True) associated_group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) class Meta: unique_together = ('target', 'key', 'associated_user', 'associated_import',) verbose_name = _(u"Importer - Target key") verbose_name_plural = _(u"Importer - Targets keys") ordering = ('target', 'key') def __unicode__(self): return u" - ".join([unicode(self.target), self.key[:50]]) def column_nb(self): # for the admin return self.target.column.col_number def importer_type(self): # for the admin return self.target.column.importer_type.name def format(self): if not self.is_set: return None if self.target.formater_type.formater_type == 'StrToBoolean': if self.value in ('False', '0'): return False elif self.value: return True return return self.value def save(self, *args, **kwargs): obj = super(TargetKey, self).save(*args, **kwargs) if not self.value: return obj associated_model = self.target.associated_model if associated_model and hasattr(self.target.associated_model, "add_key"): v = None # pk is given try: v = self.target.associated_model.objects.get( pk=unicode(int(self.value))) except (ValueError, self.target.associated_model.DoesNotExist): # try with txt_idx try: v = self.target.associated_model.objects.get( txt_idx=unicode(self.value)) except self.target.associated_model.DoesNotExist: pass if v: keys = {} if self.associated_group: keys['group'] = self.associated_group if self.associated_user: keys['user'] = self.associated_user else: keys['importer'] = self.associated_import v.add_key(self.key, **keys) return obj TARGET_MODELS = [ ('OrganizationType', _(u"Organization type")), ('ishtar_common.models.OrganizationType', _(u"Organization type")), ('TitleType', _(u"Title")), ('SourceType', _(u"Source type")), ('AuthorType', _(u"Author type")), ('Format', _(u"Format")), ('archaeological_operations.models.OperationType', _(u"Operation type")), ('archaeological_operations.models.Period', _(u"Period")), ('archaeological_operations.models.ReportState', _(u"Report state")), ('archaeological_operations.models.RemainType', _(u"Remain type")), ('archaeological_context_records.models.Unit', _(u"Unit")), ('archaeological_context_records.models.ActivityType', _(u"Activity type")), ('archaeological_context_records.models.DocumentationType', _(u"Documentation type")), ('archaeological_finds.models.MaterialType', _(u"Material")), ('archaeological_finds.models.ConservatoryState', _(u"Conservatory state")), ('archaeological_warehouse.models.ContainerType', _(u"Container type")), ('archaeological_warehouse.models.WarehouseDivision', _(u"Warehouse division")), ('archaeological_warehouse.models.WarehouseType', _(u"Warehouse type")), ('archaeological_finds.models.TreatmentType', _(u"Treatment type")), ('archaeological_finds.models.TreatmentEmergencyType', _(u"Treatment emergency type")), ('archaeological_finds.models.ObjectType', _(u"Object type")), ('archaeological_finds.models.IntegrityType', _(u"Integrity type")), ('archaeological_finds.models.RemarkabilityType', _(u"Remarkability type")), ('archaeological_finds.models.AlterationType', _(u"Alteration type")), ('archaeological_finds.models.AlterationCauseType', _(u"Alteration cause type")), ('archaeological_finds.models.BatchType', _(u"Batch type")), ('archaeological_finds.models.CheckedType', _(u"Checked type")), ('archaeological_context_records.models.IdentificationType', _("Identification type")), ('archaeological_context_records.models.RelationType', _(u"Context record relation type")), ('SpatialReferenceSystem', _(u"Spatial reference system")), ('SupportType', _(u"Support type")), ('TitleType', _(u"Title type")), ] TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] IMPORTER_TYPES = ( ('IntegerFormater', _(u"Integer")), ('FloatFormater', _(u"Float")), ('UnicodeFormater', _(u"String")), ('DateFormater', _(u"Date")), ('TypeFormater', _(u"Type")), ('YearFormater', _(u"Year")), ('InseeFormater', _(u"INSEE code")), ('StrToBoolean', _(u"String to boolean")), ('FileFormater', pgettext_lazy("filesystem", u"File")), ('UnknowType', _(u"Unknow type")) ) IMPORTER_TYPES_DCT = { 'IntegerFormater': IntegerFormater, 'FloatFormater': FloatFormater, 'UnicodeFormater': UnicodeFormater, 'DateFormater': DateFormater, 'TypeFormater': TypeFormater, 'YearFormater': YearFormater, 'StrToBoolean': StrToBoolean, 'FileFormater': FileFormater, 'InseeFormater': InseeFormater, 'UnknowType': None, } DATE_FORMATS = ( ('%Y', _(u"4 digit year. e.g.: \"2015\"")), ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")), ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")), ) IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS, 'DateFormater': DATE_FORMATS} class FormaterTypeManager(models.Manager): def get_by_natural_key(self, formater_type, options, many_split): return self.get(formater_type=formater_type, options=options, many_split=many_split) class FormaterType(models.Model): formater_type = models.CharField(u"Formater type", max_length=20, choices=IMPORTER_TYPES) options = models.CharField(_(u"Options"), max_length=500, blank=True, null=True) many_split = models.CharField(_(u"Split character(s)"), max_length=10, blank=True, null=True) objects = FormaterTypeManager() class Meta: verbose_name = _(u"Importer - Formater type") verbose_name_plural = _(u"Importer - Formater types") unique_together = ('formater_type', 'options', 'many_split') ordering = ('formater_type', 'options') def natural_key(self): return self.formater_type, self.options, self.many_split def __unicode__(self): return u" - ".join( [unicode(dict(IMPORTER_TYPES)[self.formater_type]) if self.formater_type in IMPORTER_TYPES_DCT else ''] + [getattr(self, k) for k in ('options', 'many_split') if getattr(self, k)]) def get_choices(self): if self.format_type in IMPORTER_TYPES_CHOICES: return IMPORTER_TYPES_CHOICES[self.format_type] def get_formater_type(self, target, import_instance=None): if self.formater_type not in IMPORTER_TYPES_DCT.keys(): return kwargs = {'db_target': target, 'import_instance': import_instance} if self.many_split: kwargs['many_split'] = self.many_split if self.formater_type == 'TypeFormater': if self.options not in TARGET_MODELS_KEYS: logger.warning( "**WARN FormaterType.get_formater_type**: {} " "is not in TARGET_MODELS_KEYS".format(self.options)) return if self.options in dir(): model = dir()[self.options] else: model = import_class(self.options) return TypeFormater(model, **kwargs) elif self.formater_type == 'UnicodeFormater': if self.options: try: return UnicodeFormater(int(self.options.strip()), **kwargs) except ValueError: pass return UnicodeFormater(**kwargs) elif self.formater_type == 'DateFormater': date_formats = self.options if self.many_split: date_formats = self.options.split(kwargs.pop('many_split')) return DateFormater(date_formats, **kwargs) elif self.formater_type == 'StrToBoolean': return StrToBoolean(**kwargs) elif self.formater_type == 'UnknowType': return else: return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) IMPORT_STATE = ( ("C", _(u"Created")), ("AP", _(u"Analyse in progress")), ("A", _(u"Analysed")), ("HQ", _(u"Check modified in queue")), ("IQ", _(u"Import in queue")), ("HP", _(u"Check modified in progress")), ("IP", _(u"Import in progress")), ("PI", _(u"Partially imported")), ("FE", _(u"Finished with errors")), ("F", _(u"Finished")), ("AC", _(u"Archived")), ) IMPORT_STATE_DCT = dict(IMPORT_STATE) ENCODINGS = [(settings.ENCODING, settings.ENCODING), (settings.ALT_ENCODING, settings.ALT_ENCODING), ('utf-8', 'utf-8')] delayed_import = None delayed_check = None if settings.USE_BACKGROUND_TASK: @background(schedule=1) def delayed_import(import_pk, session_key): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: pass imp.importation(session_key=session_key) @background(schedule=1) def delayed_check(import_pk, session_key): try: imp = Import.objects.get(pk=import_pk) except Import.DoesNotExist: pass imp.check_modified(session_key=session_key) class Import(models.Model): user = models.ForeignKey('IshtarUser', blank=True, null=True, on_delete=models.SET_NULL) name = models.CharField(_(u"Name"), max_length=500, null=True) importer_type = models.ForeignKey(ImporterType) imported_file = models.FileField( _(u"Imported file"), upload_to="upload/imports/%Y/%m/", max_length=220) imported_images = models.FileField( _(u"Associated images (zip file)"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=220) associated_group = models.ForeignKey( TargetKeyGroup, blank=True, null=True, help_text=_(u"If a group is selected, target key saved in this group " u"will be used.") ) encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS, default=u'utf-8', max_length=15) skip_lines = models.IntegerField( _(u"Skip lines"), default=1, help_text=_(u"Number of header lines in your file (can be 0).")) error_file = models.FileField(_(u"Error file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255) result_file = models.FileField(_(u"Result file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255) match_file = models.FileField(_(u"Match file"), upload_to="upload/imports/%Y/%m/", blank=True, null=True, max_length=255) state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE, default=u'C') conservative_import = models.BooleanField( _(u"Conservative import"), default=False, help_text=_(u'If set to true, do not overload existing values.')) creation_date = models.DateTimeField( _(u"Creation date"), auto_now_add=True, blank=True, null=True) end_date = models.DateTimeField(_(u"End date"), auto_now_add=True, blank=True, null=True, editable=False) seconds_remaining = models.IntegerField( _(u"Remaining seconds"), blank=True, null=True, editable=False) # used by step by step import current_line = models.IntegerField(_(u"Current line"), blank=True, null=True) number_of_line = models.IntegerField(_(u"Number of line"), blank=True, null=True) imported_line_numbers = models.TextField( _(u"Imported line numbers"), blank=True, null=True, validators=[validate_comma_separated_integer_list] ) changed_checked = models.BooleanField(_(u"Changed have been checked"), default=False) changed_line_numbers = models.TextField( _(u"Changed line numbers"), blank=True, null=True, validators=[validate_comma_separated_integer_list] ) class Meta: verbose_name = _(u"Import") verbose_name_plural = _(u"Imports") def __unicode__(self): return u"{} | {}".format(self.name or u"-", self.importer_type) def need_matching(self): return bool(TargetKey.objects.filter(associated_import=self, is_set=False).count()) @property def errors(self): if not self.error_file: return [] errors = [] with open(self.error_file.path, 'rb') as csvfile: reader = csv.DictReader(csvfile, fieldnames=['line', 'column', 'error']) reader.next() # pass the header for row in reader: errors.append(row) return errors def get_number_of_lines(self): if self.number_of_line: return self.number_of_line if not self.imported_file or not self.imported_file.path: return filename = self.imported_file.path with open(filename, 'r') as f: reader = unicodecsv.reader( f, encoding=self.encoding) nb = sum(1 for row in reader) - self.skip_lines self.number_of_line = nb self.save() return nb def add_imported_line(self, idx_line): if self.imported_line_numbers and \ str(idx_line) in self.imported_line_numbers.split(','): return if self.imported_line_numbers: self.imported_line_numbers += "," else: self.imported_line_numbers = "" self.imported_line_numbers += str(idx_line) self.save() def add_changed_line(self, idx_line): if self.changed_line_numbers and \ str(idx_line) in self.changed_line_numbers.split(','): return if self.changed_line_numbers: self.changed_line_numbers += "," else: self.changed_line_numbers = "" self.changed_line_numbers += str(idx_line) self.save() def remove_changed_line(self, idx_line): if not self.changed_line_numbers: return line_numbers = self.changed_line_numbers.split(',') if str(idx_line) not in line_numbers: return line_numbers.pop(line_numbers.index(str(idx_line))) self.changed_line_numbers = ",".join(line_numbers) self.save() def has_changes(self, idx_line): if not self.changed_checked: return True if not self.changed_line_numbers: return line_numbers = self.changed_line_numbers.split(',') return str(idx_line) in line_numbers def line_is_imported(self, idx_line): return self.imported_line_numbers and \ str(idx_line) in self.imported_line_numbers.split(',') def get_actions(self): """ Get available action relevant with the current status """ from ishtar_common.models import IshtarSiteProfile profile = IshtarSiteProfile.get_current_profile() actions = [] if self.state == 'C': actions.append(('A', _(u"Analyse"))) if self.state in ('A', 'PI'): actions.append(('A', _(u"Re-analyse"))) actions.append(('I', _(u"Launch import"))) if profile.experimental_feature: if self.changed_checked: actions.append(('IS', _(u"Step by step import"))) actions.append(('CH', _(u"Re-check for changes"))) else: actions.append(('CH', _(u"Check for changes"))) if self.state in ('F', 'FE'): actions.append(('A', _(u"Re-analyse"))) actions.append(('I', _(u"Re-import"))) if profile.experimental_feature: if self.changed_checked: actions.append(('IS', _(u"Step by step re-import"))) actions.append(('CH', _(u"Re-check for changes"))) else: actions.append(('CH', _(u"Check for changes"))) actions.append(('AC', _(u"Archive"))) if self.state == 'AC': actions.append(('A', _(u"Unarchive"))) actions.append(('D', _(u"Delete"))) return actions @property def imported_filename(self): return self.imported_file.name.split(os.sep)[-1] @property def status(self): if self.state not in IMPORT_STATE_DCT: return "" return IMPORT_STATE_DCT[self.state] def get_importer_instance(self): return self.importer_type.get_importer_class(import_instance=self)( skip_lines=self.skip_lines, import_instance=self, conservative_import=self.conservative_import) @property def data_table(self): imported_file = self.imported_file.path tmpdir = None if zipfile.is_zipfile(imported_file): z = zipfile.ZipFile(imported_file) filename = None for name in z.namelist(): # get first CSV file found if name.endswith('.csv'): filename = name break if not filename: return [] tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-') imported_file = z.extract(filename, tmpdir) encodings = [self.encoding] encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: with open(imported_file) as csv_file: vals = [line for line in unicodecsv.reader(csv_file, encoding=encoding)] if tmpdir: shutil.rmtree(tmpdir) return vals except UnicodeDecodeError: pass # try the next encoding except unicodecsv.Error: raise ImporterError(_(u"Error in the CSV file.")) if tmpdir: shutil.rmtree(tmpdir) return [] def initialize(self, user=None, session_key=None): self.state = 'AP' self.end_date = datetime.datetime.now() self.save() try: self.get_importer_instance().initialize( self.data_table, user=user, output='db') except ImporterError as e: if session_key: put_session_message(session_key, e.msg, "danger") self.state = 'C' self.save() return self.state = 'A' self.end_date = datetime.datetime.now() self.save() def delayed_check_modified(self, session_key): if not settings.USE_BACKGROUND_TASK: return self.check_modified(session_key=session_key) put_session_message( session_key, unicode( _(u"Modification check {} added to the queue") ).format(self.name), "info") self.state = 'HQ' self.end_date = datetime.datetime.now() self.save() return delayed_check(self.pk, session_key) def check_modified(self, session_key=None): self.state = 'HP' self.end_date = datetime.datetime.now() self.changed_line_numbers = "" self.changed_checked = False self.save() for idx in range(self.skip_lines, self.get_number_of_lines() + 1): try: imprt, data = self.importation( simulate=True, line_to_process=idx, return_importer_and_data=True ) except IOError as e: # error is identified as a change self.add_changed_line(idx) continue # no data is not normal and an error is identified as a change if not data or not data[0]: self.add_changed_line(idx) continue # new objects is a change if imprt.new_objects: self.add_changed_line(idx) continue # check all updated fields changed = False for path, obj, values, updated_values in imprt.updated_objects: if changed: break for k in updated_values.keys(): if changed: break current_value = getattr(obj, k) updated_value = updated_values[k] if hasattr(current_value, 'all'): current_value = list(current_value.all()) changed = False for v in updated_value: if v not in current_value: changed = True break else: if current_value != updated_value: changed = True break if changed: self.add_changed_line(idx) continue self.remove_changed_line(idx) self.changed_checked = True self.save() def delayed_importation(self, session_key): if not settings.USE_BACKGROUND_TASK: return self.importation(session_key=session_key) put_session_message( session_key, unicode(_(u"Import {} added to the queue")).format(self.name), "info") self.state = 'IQ' self.end_date = datetime.datetime.now() self.save() return delayed_import(self.pk, session_key) def importation(self, session_key=None, line_to_process=None, simulate=False, return_importer_and_data=False): self.state = 'IP' self.end_date = datetime.datetime.now() self.save() importer = self.get_importer_instance() try: data = importer.importation( self.data_table, user=self.user, line_to_process=line_to_process, simulate=simulate) except IOError: error_message = unicode(_(u"Error on imported file: {}")).format( self.imported_file) importer.errors = [error_message] if session_key: put_session_message(session_key, error_message, "warning") ids = get_session_var(session_key, 'current_import_id') if not ids: ids = [] ids.append(self.pk) put_session_var(session_key, 'current_import_id', ids) if line_to_process: self.state = 'PI' else: self.state = 'FE' self.save() if not return_importer_and_data: return return importer, None # result file filename = slugify(self.importer_type.name) now = datetime.datetime.now().isoformat('-').replace(':', '') result_file = filename + "_result_%s.csv" % now self.result_file.save( result_file, ContentFile(importer.get_csv_result().encode('utf-8'))) if importer.errors: if line_to_process: self.state = 'PI' else: self.state = 'FE' error_file = filename + "_errors_%s.csv" % now self.error_file.save( error_file, ContentFile(importer.get_csv_errors().encode('utf-8')) ) msg = unicode(_(u"Import {} finished with errors")).format( self.name) msg_cls = "warning" else: if line_to_process: self.state = 'PI' else: self.state = 'F' self.error_file = None msg = unicode(_(u"Import {} finished with no errors")).format( self.name) msg_cls = "primary" if session_key: put_session_message(session_key, msg, msg_cls) ids = self.request.session['current_import_id'] \ if 'current_import_id' in self.request.session else [] ids.append(self.pk) put_session_var(session_key, 'current_import_id', ids) if importer.match_table: match_file = filename + "_match_%s.csv" % now self.match_file.save( match_file, ContentFile(importer.get_csv_matches().encode('utf-8')) ) self.end_date = datetime.datetime.now() self.save() if return_importer_and_data: return importer, data def archive(self): self.state = 'AC' self.end_date = datetime.datetime.now() self.save() def get_all_imported(self): imported = [] for related, zorg in get_all_related_m2m_objects_with_model(self): accessor = related.get_accessor_name() imported += [(accessor, obj) for obj in getattr(self, accessor).all()] return imported def pre_delete_import(sender, **kwargs): # deleted imported items when an import is delete instance = kwargs.get('instance') if not instance: return to_delete = [] for accessor, imported in instance.get_all_imported(): to_delete.append(imported) for item in to_delete: item.delete() pre_delete.connect(pre_delete_import, sender=Import)