diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-08-21 18:23:29 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-08-21 18:23:29 +0200 |
commit | 1848a2fcab0d22056a2979459093a54c71f4060d (patch) | |
tree | 0d2270c4b34eab262d9ab1f3936c5fe4f8942289 /ishtar_common/models_imports.py | |
parent | 644a56222e5c937feb1c23eef95a4906840960bb (diff) | |
download | Ishtar-1848a2fcab0d22056a2979459093a54c71f4060d.tar.bz2 Ishtar-1848a2fcab0d22056a2979459093a54c71f4060d.zip |
Models: refactoring move all import models to a specific file
Diffstat (limited to 'ishtar_common/models_imports.py')
-rw-r--r-- | ishtar_common/models_imports.py | 912 |
1 files changed, 912 insertions, 0 deletions
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py new file mode 100644 index 000000000..dcb02c27e --- /dev/null +++ b/ishtar_common/models_imports.py @@ -0,0 +1,912 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (C) 2017 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet> + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# See the file COPYING for details. + +import csv +import datetime +from importlib import import_module +import os +import logging +import shutil +import re +import tempfile +import unicodecsv +import zipfile + +from django.conf import settings +from django.contrib.gis.db import models +from django.core.exceptions import SuspiciousOperation +from django.core.files import File +from django.db.models.base import ModelBase +from django.db.models.signals import pre_delete +from django.template.defaultfilters import slugify +from django.utils.translation import ugettext_lazy as _, pgettext_lazy + +from ishtar_common.utils import create_slug +from ishtar_common.data_importer import Importer, ImportFormater, \ + IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \ + TypeFormater, YearFormater, StrToBoolean, FileFormater + +logger = logging.getLogger(__name__) + +IMPORTER_CLASSES = {} + +IMPORTER_CLASSES.update({ + 'sra-pdl-files': + 'archaeological_files.data_importer.FileImporterSraPdL'}) + + +def get_model_fields(model): + """ + Return a dict of fields from model + To be replace in Django 1.8 with get_fields, get_field + """ + fields = {} + options = model._meta + for field in sorted(options.fields + options.many_to_many): + fields[field.name] = field + if hasattr(model, 'get_extra_fields'): + fields.update(model.get_extra_fields()) + return fields + + +def import_class(full_path_classname): + """ + Return the model class from the full path + TODO: add a white list for more security + """ + mods = full_path_classname.split('.') + if len(mods) == 1: + mods = ['ishtar_common', 'models', mods[0]] + elif 'models' not in mods and 'models_finds' not in mods \ + and 'models_treatments' not in mods: + raise SuspiciousOperation( + u"Try to import a non model from a string") + module = import_module('.'.join(mods[:-1])) + return getattr(module, mods[-1]) + + +class ImportModelManager(models.Manager): + def get_by_natural_key(self, klass): + return self.get(klass=klass) + + +class ImporterModel(models.Model): + name = models.CharField(_(u"Name"), max_length=200) + klass = models.CharField(_(u"Class name"), max_length=200, unique=True) + objects = ImportModelManager() + + class Meta: + verbose_name = _(u"Importer - Model") + verbose_name_plural = _(u"Importer - Models") + ordering = ('name',) + + def __unicode__(self): + return self.name + + def natural_key(self): + return (self.klass, ) + + +class ImporterTypeManager(models.Manager): + def get_by_natural_key(self, slug): + return self.get(slug=slug) + + +class ImporterType(models.Model): + """ + Description of a table to be mapped with ishtar database + """ + name = models.CharField(_(u"Name"), blank=True, null=True, + max_length=100) + slug = models.SlugField(_(u"Slug"), unique=True, max_length=100, + blank=True, null=True) + description = models.CharField(_(u"Description"), blank=True, null=True, + max_length=500) + users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"), + blank=True) + associated_models = models.ForeignKey( + ImporterModel, verbose_name=_(u"Associated model"), + related_name='+', blank=True, null=True) + created_models = models.ManyToManyField( + ImporterModel, verbose_name=_(u"Models that can accept new items"), + blank=True, help_text=_(u"Leave blank for no restrictions"), + related_name='+') + is_template = models.BooleanField(_(u"Is template"), default=False) + unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"), + blank=True, null=True, max_length=500) + objects = ImporterTypeManager() + + class Meta: + verbose_name = _(u"Importer - Type") + verbose_name_plural = _(u"Importer - Types") + ordering = ('name',) + + def natural_key(self): + return (self.slug, ) + + def __unicode__(self): + return self.name + + def get_importer_class(self, import_instance=None): + if self.slug and self.slug in IMPORTER_CLASSES: + cls = import_class(IMPORTER_CLASSES[self.slug]) + return cls + OBJECT_CLS = import_class(self.associated_models.klass) + DEFAULTS = dict([(default.keys, default.values) + for default in self.defaults.all()]) + LINE_FORMAT = [] + idx = 0 + for column in self.columns.order_by('col_number').all(): + idx += 1 + while column.col_number > idx: + LINE_FORMAT.append(None) + idx += 1 + targets = [] + formater_types = [] + nb = column.targets.count() + if not nb: + LINE_FORMAT.append(None) + continue + force_news = [] + concat_str = [] + for target in column.targets.all(): + ft = target.formater_type.get_formater_type( + target, import_instance=import_instance) + if not ft: + continue + formater_types.append(ft) + targets.append(target.target) + concat_str.append(target.concat_str) + force_news.append(target.force_new) + formater_kwargs = {} + if column.regexp_pre_filter: + formater_kwargs['regexp'] = re.compile( + column.regexp_pre_filter.regexp) + formater_kwargs['concat_str'] = concat_str + formater_kwargs['duplicate_fields'] = [ + (field.field_name, field.force_new, field.concat, + field.concat_str) + for field in column.duplicate_fields.all()] + formater_kwargs['label'] = column.label + formater_kwargs['required'] = column.required + formater_kwargs['force_new'] = force_news + if column.export_field_name: + formater_kwargs['export_field_name'] = [ + column.export_field_name] + formater = ImportFormater(targets, formater_types, + **formater_kwargs) + LINE_FORMAT.append(formater) + UNICITY_KEYS = [] + if self.unicity_keys: + UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] + MODEL_CREATION_LIMIT = [] + for modls in self.created_models.all(): + MODEL_CREATION_LIMIT.append(import_class(modls.klass)) + args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, + 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, + 'UNICITY_KEYS': UNICITY_KEYS, + 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} + name = str(''.join( + x for x in slugify(self.name).replace('-', ' ').title() + if not x.isspace())) + newclass = type(name, (Importer,), args) + return newclass + + def save(self, *args, **kwargs): + if not self.slug: + self.slug = create_slug(ImporterType, self.name) + return super(ImporterType, self).save(*args, **kwargs) + + +def get_associated_model(parent_model, keys): + model = None + if isinstance(parent_model, unicode) or \ + isinstance(parent_model, str): + OBJECT_CLS = import_class(parent_model) + else: + OBJECT_CLS = parent_model + for idx, item in enumerate(keys): + if not idx: + field = get_model_fields(OBJECT_CLS)[item] + if hasattr(field, 'rel') and hasattr(field.rel, 'to'): + model = field.rel.to + if type(field) == ModelBase: + model = field + else: + return get_associated_model(model, keys[1:]) + return model + + +class ImporterDefaultManager(models.Manager): + def get_by_natural_key(self, importer_type, target): + return self.get(importer_type__slug=importer_type, target=target) + + +class ImporterDefault(models.Model): + """ + Targets of default values in an import + """ + importer_type = models.ForeignKey(ImporterType, related_name='defaults') + target = models.CharField(u"Target", max_length=500) + + class Meta: + verbose_name = _(u"Importer - Default") + verbose_name_plural = _(u"Importer - Defaults") + unique_together = ('importer_type', 'target') + objects = ImporterDefaultManager() + + def __unicode__(self): + return u"{} - {}".format(self.importer_type, self.target) + + def natural_key(self): + return self.importer_type.slug, self.target + + @property + def keys(self): + return tuple(self.target.split('__')) + + @property + def associated_model(self): + return get_associated_model(self.importer_type.associated_models.klass, + self.keys) + + @property + def values(self): + values = {} + for default_value in self.default_values.all(): + values[default_value.target] = default_value.get_value() + return values + + +class ImporterDefaultValuesManager(models.Manager): + def get_by_natural_key(self, def_target_type, def_target, target): + return self.get(default_target__importer_type__slug=def_target_type, + default_target__target=def_target, + target=target) + + +class ImporterDefaultValues(models.Model): + """ + Default values in an import + """ + default_target = models.ForeignKey(ImporterDefault, + related_name='default_values') + target = models.CharField(u"Target", max_length=500) + value = models.CharField(u"Value", max_length=500) + objects = ImporterDefaultValuesManager() + + def __unicode__(self): + return u"{} - {}".format(self.default_target, self.target, self.value) + + class Meta: + verbose_name = _(u"Importer - Default value") + verbose_name_plural = _(u"Importer - Default values") + + def natural_key(self): + return (self.default_target.importer_type.slug, + self.default_target.target, + self.target) + + def get_value(self): + parent_model = self.default_target.associated_model + if not parent_model: + return self.value + fields = get_model_fields(parent_model) + target = self.target.strip() + if target not in fields: + return + field = fields[target] + if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'): + return + model = field.rel.to + # if value is an id + try: + return model.objects.get(pk=int(self.value)) + except (ValueError, model.DoesNotExist): + pass + # try with txt_idx + try: + return model.objects.get(txt_idx=self.value) + except (ValueError, model.DoesNotExist): + pass + return "" + + +class ImporterColumnManager(models.Manager): + def get_by_natural_key(self, importer_type, col_number): + return self.get(importer_type__slug=importer_type, + col_number=col_number) + + +class ImporterColumn(models.Model): + """ + Import file column description + """ + label = models.CharField(_(u"Label"), blank=True, null=True, + max_length=200) + importer_type = models.ForeignKey(ImporterType, related_name='columns') + col_number = models.IntegerField(_(u"Column number"), default=1) + description = models.TextField(_("Description"), blank=True, null=True) + regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True) + required = models.BooleanField(_(u"Required"), default=False) + export_field_name = models.CharField( + _(u"Export field name"), blank=True, null=True, max_length=200, + help_text=_(u"Fill this field if the field name is ambiguous for " + u"export. For instance: concatenated fields.") + ) + objects = ImporterColumnManager() + + class Meta: + verbose_name = _(u"Importer - Column") + verbose_name_plural = _(u"Importer - Columns") + ordering = ('importer_type', 'col_number') + unique_together = ('importer_type', 'col_number') + + def __unicode__(self): + return u"{} - {}".format(self.importer_type, self.col_number) + + def natural_key(self): + return self.importer_type.slug, self.col_number + + def targets_lbl(self): + return u', '.join([target.target for target in self.targets.all()]) + + def duplicate_fields_lbl(self): + return u', '.join([dp.field_name + for dp in self.duplicate_fields.all()]) + + +class ImporterDuplicateFieldManager(models.Manager): + def get_by_natural_key(self, importer_type, col_number, field_name): + return self.get(column__importer_type__slug=importer_type, + column__col_number=col_number, + field_name=field_name) + + +class ImporterDuplicateField(models.Model): + """ + Direct copy of result in other fields + """ + column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') + field_name = models.CharField(_(u"Field name"), blank=True, null=True, + max_length=200) + force_new = models.BooleanField(_(u"Force creation of new items"), + default=False) + concat = models.BooleanField(_(u"Concatenate with existing"), + default=False) + concat_str = models.CharField(_(u"Concatenate character"), max_length=5, + blank=True, null=True) + objects = ImporterDuplicateFieldManager() + + class Meta: + verbose_name = _(u"Importer - Duplicate field") + verbose_name_plural = _(u"Importer - Duplicate fields") + ordering = ('column', 'field_name') + + def natural_key(self): + return self.column.importer_type, self.column.col_number, \ + self.field_name + + +class NamedManager(models.Manager): + def get_by_natural_key(self, name): + return self.get(name=name) + + +class Regexp(models.Model): + name = models.CharField(_(u"Name"), max_length=100, unique=True) + description = models.CharField(_(u"Description"), blank=True, null=True, + max_length=500) + regexp = models.CharField(_(u"Regular expression"), max_length=500) + objects = NamedManager() + + class Meta: + verbose_name = _(u"Importer - Regular expression") + verbose_name_plural = _(u"Importer - Regular expressions") + + def __unicode__(self): + return self.name + + def natural_key(self): + return (self.name, ) + + +class ImportTargetManager(models.Manager): + def get_by_natural_key(self, importer_type, col_number, target): + return self.get(column__importer_type__slug=importer_type, + column__col_number=col_number, + target=target) + + +class ImportTarget(models.Model): + """ + Ishtar database target for a column + """ + column = models.ForeignKey(ImporterColumn, related_name='targets') + target = models.CharField(u"Target", max_length=500) + regexp_filter = models.ForeignKey("Regexp", blank=True, null=True) + formater_type = models.ForeignKey("FormaterType") + force_new = models.BooleanField(_(u"Force creation of new items"), + default=False) + concat = models.BooleanField(_(u"Concatenate with existing"), + default=False) + concat_str = models.CharField(_(u"Concatenate character"), max_length=5, + blank=True, null=True) + comment = models.TextField(_(u"Comment"), blank=True, null=True) + objects = ImportTargetManager() + + class Meta: + verbose_name = _(u"Importer - Target") + verbose_name_plural = _(u"Importer - Targets") + unique_together = ('column', 'target') + + def __unicode__(self): + return self.target[:50] if self.target else self.comment + + def natural_key(self): + return self.column.importer_type.slug, self.column.col_number, \ + self.target + + @property + def associated_model(self): + try: + return get_associated_model( + self.column.importer_type.associated_models.klass, + self.target.split('__')) + except KeyError: + return + + def get_choices(self): + if self.formater_type.formater_type == 'UnknowType' \ + and self.column.importer_type.slug: + cls = self.column.importer_type.get_importer_class() + formt = cls().line_format[self.column.col_number - 1] + if hasattr(formt.formater, 'choices'): + return [('', '--' * 8)] + list(formt.formater.choices) + return [('', '--' * 8)] + if self.formater_type.formater_type == 'StrToBoolean': + return [('', '--' * 8), + ('True', _(u"True")), + ('False', _(u"False"))] + if not self.associated_model or not hasattr(self.associated_model, + 'get_types'): + return [] + return self.associated_model.get_types() + + +class TargetKey(models.Model): + """ + User's link between import source and ishtar database. + Also temporary used for GeneralType to point missing link before adding + them in ItemKey table. + A targetkey connection can be create to be applied to on particular + import (associated_import), one particular user (associated_user) or to all + imports (associated_import and associated_user are empty). + """ + target = models.ForeignKey(ImportTarget, related_name='keys') + key = models.TextField(_(u"Key")) + value = models.TextField(_(u"Value"), blank=True, null=True) + is_set = models.BooleanField(_(u"Is set"), default=False) + associated_import = models.ForeignKey('Import', blank=True, null=True) + associated_user = models.ForeignKey('IshtarUser', blank=True, null=True) + + class Meta: + unique_together = ('target', 'key', 'associated_user', + 'associated_import') + verbose_name = _(u"Importer - Target key") + verbose_name_plural = _(u"Importer - Targets keys") + + def __unicode__(self): + return u" - ".join([unicode(self.target), self.key[:50]]) + + def column_nb(self): + # for the admin + return self.target.column.col_number + + def importer_type(self): + # for the admin + return self.target.column.importer_type.name + + def format(self): + if not self.is_set: + return None + if self.target.formater_type.formater_type == 'StrToBoolean': + if self.value in ('False', '0'): + return False + elif self.value: + return True + return + return self.value + + def save(self, *args, **kwargs): + obj = super(TargetKey, self).save(*args, **kwargs) + if not self.value: + return obj + associated_model = self.target.associated_model + if associated_model and hasattr(self.target.associated_model, + "add_key"): + v = None + # pk is given + try: + v = self.target.associated_model.objects.get( + pk=unicode(int(self.value))) + except (ValueError, self.target.associated_model.DoesNotExist): + # try with txt_idx + try: + v = self.target.associated_model.objects.get( + txt_idx=unicode(self.value)) + except self.target.associated_model.DoesNotExist: + pass + if v: + v.add_key(self.key, importer=self.associated_import) + return obj + +TARGET_MODELS = [ + ('OrganizationType', _(u"Organization type")), + ('TitleType', _(u"Title")), + ('SourceType', _(u"Source type")), + ('AuthorType', _(u"Author type")), + ('Format', _(u"Format")), + ('archaeological_operations.models.OperationType', _(u"Operation type")), + ('archaeological_operations.models.Period', _(u"Period")), + ('archaeological_operations.models.ReportState', _(u"Report state")), + ('archaeological_operations.models.RemainType', _(u"Remain type")), + ('archaeological_context_records.models.Unit', _(u"Unit")), + ('archaeological_context_records.models.ActivityType', + _(u"Activity type")), + ('archaeological_context_records.models.DocumentationType', + _(u"Documentation type")), + ('archaeological_finds.models.MaterialType', _(u"Material")), + ('archaeological_finds.models.ConservatoryState', + _(u"Conservatory state")), + ('archaeological_warehouse.models.ContainerType', _(u"Container type")), + ('archaeological_finds.models.PreservationType', _(u"Preservation type")), + ('archaeological_finds.models.ObjectType', _(u"Object type")), + ('archaeological_finds.models.IntegrityType', _(u"Integrity type")), + ('archaeological_finds.models.RemarkabilityType', + _(u"Remarkability type")), + ('archaeological_finds.models.BatchType', _(u"Batch type")), + ('archaeological_context_records.models.IdentificationType', + _("Identification type")), + ('archaeological_context_records.models.RelationType', + _(u"Context record relation type")), + ('SpatialReferenceSystem', _(u"Spatial reference system")), + ('SupportType', _(u"Support type")), + ('TitleType', _(u"Title type")), +] + +TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] + +IMPORTER_TYPES = ( + ('IntegerFormater', _(u"Integer")), + ('FloatFormater', _(u"Float")), + ('UnicodeFormater', _(u"String")), + ('DateFormater', _(u"Date")), + ('TypeFormater', _(u"Type")), + ('YearFormater', _(u"Year")), + ('StrToBoolean', _(u"String to boolean")), + ('FileFormater', pgettext_lazy("filesystem", u"File")), + ('UnknowType', _(u"Unknow type")) +) + +IMPORTER_TYPES_DCT = { + 'IntegerFormater': IntegerFormater, + 'FloatFormater': FloatFormater, + 'UnicodeFormater': UnicodeFormater, + 'DateFormater': DateFormater, + 'TypeFormater': TypeFormater, + 'YearFormater': YearFormater, + 'StrToBoolean': StrToBoolean, + 'FileFormater': FileFormater, + 'UnknowType': None, +} + +DATE_FORMATS = ( + ('%Y', _(u"4 digit year. e.g.: \"2015\"")), + ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")), + ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")), +) + +IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS, + 'DateFormater': DATE_FORMATS} + + +class FormaterTypeManager(models.Manager): + def get_by_natural_key(self, formater_type, options, many_split): + return self.get(formater_type=formater_type, + options=options, many_split=many_split) + + +class FormaterType(models.Model): + formater_type = models.CharField(u"Formater type", max_length=20, + choices=IMPORTER_TYPES) + options = models.CharField(_(u"Options"), max_length=500, blank=True, + null=True) + many_split = models.CharField(_(u"Split character(s)"), max_length=10, + blank=True, null=True) + objects = FormaterTypeManager() + + class Meta: + verbose_name = _(u"Importer - Formater type") + verbose_name_plural = _(u"Importer - Formater types") + unique_together = ('formater_type', 'options', 'many_split') + ordering = ('formater_type', 'options') + + def natural_key(self): + return self.formater_type, self.options, self.many_split + + def __unicode__(self): + return u" - ".join( + [unicode(dict(IMPORTER_TYPES)[self.formater_type]) + if self.formater_type in IMPORTER_TYPES_DCT else ''] + + [getattr(self, k) for k in ('options', 'many_split') + if getattr(self, k)]) + + def get_choices(self): + if self.format_type in IMPORTER_TYPES_CHOICES: + return IMPORTER_TYPES_CHOICES[self.format_type] + + def get_formater_type(self, target, import_instance=None): + if self.formater_type not in IMPORTER_TYPES_DCT.keys(): + return + kwargs = {'db_target': target, 'import_instance': import_instance} + if self.many_split: + kwargs['many_split'] = self.many_split + if self.formater_type == 'TypeFormater': + if self.options not in TARGET_MODELS_KEYS: + logger.warning( + "**WARN FormaterType.get_formater_type**: {} " + "is not in TARGET_MODELS_KEYS".format(self.options)) + return + model = None + if self.options in dir(): + model = dir()[self.options] + else: + model = import_class(self.options) + return TypeFormater(model, **kwargs) + elif self.formater_type == 'UnicodeFormater': + if self.options: + try: + return UnicodeFormater(int(self.options.strip()), **kwargs) + except ValueError: + pass + return UnicodeFormater(**kwargs) + elif self.formater_type == 'DateFormater': + date_formats = self.options + if self.many_split: + date_formats = self.options.split(kwargs.pop('many_split')) + return DateFormater(date_formats, **kwargs) + elif self.formater_type == 'StrToBoolean': + return StrToBoolean(**kwargs) + elif self.formater_type == 'UnknowType': + return + else: + return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) + +IMPORT_STATE = (("C", _(u"Created")), + ("AP", _(u"Analyse in progress")), + ("A", _(u"Analysed")), + ("P", _(u"Import pending")), + ("IP", _(u"Import in progress")), + ("FE", _(u"Finished with errors")), + ("F", _(u"Finished")), + ("AC", _(u"Archived")), + ) + +IMPORT_STATE_DCT = dict(IMPORT_STATE) +ENCODINGS = [(settings.ENCODING, settings.ENCODING), + (settings.ALT_ENCODING, settings.ALT_ENCODING), + ('utf-8', 'utf-8')] + + +class Import(models.Model): + user = models.ForeignKey('IshtarUser') + name = models.CharField(_(u"Name"), max_length=500, + blank=True, null=True) + importer_type = models.ForeignKey(ImporterType) + imported_file = models.FileField( + _(u"Imported file"), upload_to="upload/imports/", max_length=220) + imported_images = models.FileField( + _(u"Associated images (zip file)"), upload_to="upload/imports/", + blank=True, null=True, max_length=220) + encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS, + default=u'utf-8', max_length=15) + skip_lines = models.IntegerField(_(u"Skip lines"), default=1) + error_file = models.FileField(_(u"Error file"), + upload_to="upload/imports/", + blank=True, null=True, max_length=255) + result_file = models.FileField(_(u"Result file"), + upload_to="upload/imports/", + blank=True, null=True, max_length=255) + match_file = models.FileField(_(u"Match file"), + upload_to="upload/imports/", + blank=True, null=True, max_length=255) + state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE, + default=u'C') + conservative_import = models.BooleanField( + _(u"Conservative import"), default=False, + help_text='If set to true, do not overload existing values') + creation_date = models.DateTimeField( + _(u"Creation date"), auto_now_add=True, blank=True, null=True) + end_date = models.DateTimeField(_(u"End date"), blank=True, + null=True, editable=False) + seconds_remaining = models.IntegerField( + _(u"Remaining seconds"), blank=True, null=True, editable=False) + + class Meta: + verbose_name = _(u"Import") + verbose_name_plural = _(u"Imports") + + def __unicode__(self): + return u"{} | {}".format(self.name or u"-", self.importer_type) + + def need_matching(self): + return bool(TargetKey.objects.filter(associated_import=self, + is_set=False).count()) + + @property + def errors(self): + if not self.error_file: + return [] + errors = [] + with open(self.error_file.path, 'rb') as csvfile: + reader = csv.DictReader(csvfile, fieldnames=['line', 'column', + 'error']) + reader.next() # pass the header + for row in reader: + errors.append(row) + return errors + + def get_actions(self): + """ + Get available action relevant with the current status + """ + actions = [] + if self.state == 'C': + actions.append(('A', _(u"Analyse"))) + if self.state == 'A': + actions.append(('A', _(u"Re-analyse"))) + actions.append(('I', _(u"Launch import"))) + if self.state in ('F', 'FE'): + actions.append(('A', _(u"Re-analyse"))) + actions.append(('I', _(u"Re-import"))) + actions.append(('AC', _(u"Archive"))) + if self.state == 'AC': + actions.append(('A', _(u"Unarchive"))) + actions.append(('D', _(u"Delete"))) + return actions + + @property + def imported_filename(self): + return self.imported_file.name.split(os.sep)[-1] + + @property + def status(self): + if self.state not in IMPORT_STATE_DCT: + return "" + return IMPORT_STATE_DCT[self.state] + + def get_importer_instance(self): + return self.importer_type.get_importer_class(import_instance=self)( + skip_lines=self.skip_lines, import_instance=self, + conservative_import=self.conservative_import) + + @property + def data_table(self): + imported_file = self.imported_file.path + tmpdir = None + if zipfile.is_zipfile(imported_file): + z = zipfile.ZipFile(imported_file) + filename = None + for name in z.namelist(): + # get first CSV file found + if name.endswith('.csv'): + filename = name + break + if not filename: + return [] + tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-') + imported_file = z.extract(filename, tmpdir) + + encodings = [self.encoding] + encodings += [coding for coding, c in ENCODINGS + if coding != self.encoding] + for encoding in encodings: + try: + with open(imported_file) as csv_file: + vals = [line + for line in unicodecsv.reader(csv_file, + encoding=encoding)] + if tmpdir: + shutil.rmtree(tmpdir) + return vals + except UnicodeDecodeError: + pass # try the next encoding + if tmpdir: + shutil.rmtree(tmpdir) + return [] + + def initialize(self): + self.state = 'AP' + self.save() + self.get_importer_instance().initialize(self.data_table, output='db') + self.state = 'A' + self.save() + + def importation(self): + self.state = 'IP' + self.save() + importer = self.get_importer_instance() + importer.importation(self.data_table) + # result file + filename = slugify(self.importer_type.name) + now = datetime.datetime.now().isoformat('-').replace(':', '') + result_file = filename + "_result_%s.csv" % now + result_file = os.sep.join([self.result_file.storage.location, + result_file]) + with open(result_file, 'w') as fle: + fle.write(importer.get_csv_result().encode('utf-8')) + self.result_file = File(open(fle.name)) + if importer.errors: + self.state = 'FE' + error_file = filename + "_errors_%s.csv" % now + error_file = os.sep.join([self.error_file.storage.location, + error_file]) + with open(error_file, 'w') as fle: + fle.write(importer.get_csv_errors().encode('utf-8')) + self.error_file = File(open(fle.name)) + else: + self.state = 'F' + self.error_file = None + if importer.match_table: + match_file = filename + "_match_%s.csv" % now + match_file = os.sep.join([self.match_file.storage.location, + match_file]) + with open(match_file, 'w') as fle: + fle.write(importer.get_csv_matches().encode('utf-8')) + self.match_file = File(open(fle.name)) + self.save() + + def archive(self): + self.state = 'AC' + self.save() + + def get_all_imported(self): + imported = [] + for related, zorg in \ + self._meta.get_all_related_m2m_objects_with_model(): + accessor = related.get_accessor_name() + imported += [(accessor, obj) + for obj in getattr(self, accessor).all()] + return imported + + +def pre_delete_import(sender, **kwargs): + # deleted imported items when an import is delete + instance = kwargs.get('instance') + if not instance: + return + to_delete = [] + for accessor, imported in instance.get_all_imported(): + to_delete.append(imported) + for item in to_delete: + item.delete() + + +pre_delete.connect(pre_delete_import, sender=Import) |