summaryrefslogtreecommitdiff
path: root/ishtar_common/models_imports.py
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2017-08-21 18:23:29 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2017-08-21 18:23:29 +0200
commit1848a2fcab0d22056a2979459093a54c71f4060d (patch)
tree0d2270c4b34eab262d9ab1f3936c5fe4f8942289 /ishtar_common/models_imports.py
parent644a56222e5c937feb1c23eef95a4906840960bb (diff)
downloadIshtar-1848a2fcab0d22056a2979459093a54c71f4060d.tar.bz2
Ishtar-1848a2fcab0d22056a2979459093a54c71f4060d.zip
Models: refactoring move all import models to a specific file
Diffstat (limited to 'ishtar_common/models_imports.py')
-rw-r--r--ishtar_common/models_imports.py912
1 files changed, 912 insertions, 0 deletions
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
new file mode 100644
index 000000000..dcb02c27e
--- /dev/null
+++ b/ishtar_common/models_imports.py
@@ -0,0 +1,912 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2017 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet>
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# See the file COPYING for details.
+
+import csv
+import datetime
+from importlib import import_module
+import os
+import logging
+import shutil
+import re
+import tempfile
+import unicodecsv
+import zipfile
+
+from django.conf import settings
+from django.contrib.gis.db import models
+from django.core.exceptions import SuspiciousOperation
+from django.core.files import File
+from django.db.models.base import ModelBase
+from django.db.models.signals import pre_delete
+from django.template.defaultfilters import slugify
+from django.utils.translation import ugettext_lazy as _, pgettext_lazy
+
+from ishtar_common.utils import create_slug
+from ishtar_common.data_importer import Importer, ImportFormater, \
+ IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \
+ TypeFormater, YearFormater, StrToBoolean, FileFormater
+
+logger = logging.getLogger(__name__)
+
+IMPORTER_CLASSES = {}
+
+IMPORTER_CLASSES.update({
+ 'sra-pdl-files':
+ 'archaeological_files.data_importer.FileImporterSraPdL'})
+
+
+def get_model_fields(model):
+ """
+ Return a dict of fields from model
+ To be replace in Django 1.8 with get_fields, get_field
+ """
+ fields = {}
+ options = model._meta
+ for field in sorted(options.fields + options.many_to_many):
+ fields[field.name] = field
+ if hasattr(model, 'get_extra_fields'):
+ fields.update(model.get_extra_fields())
+ return fields
+
+
+def import_class(full_path_classname):
+ """
+ Return the model class from the full path
+ TODO: add a white list for more security
+ """
+ mods = full_path_classname.split('.')
+ if len(mods) == 1:
+ mods = ['ishtar_common', 'models', mods[0]]
+ elif 'models' not in mods and 'models_finds' not in mods \
+ and 'models_treatments' not in mods:
+ raise SuspiciousOperation(
+ u"Try to import a non model from a string")
+ module = import_module('.'.join(mods[:-1]))
+ return getattr(module, mods[-1])
+
+
+class ImportModelManager(models.Manager):
+ def get_by_natural_key(self, klass):
+ return self.get(klass=klass)
+
+
+class ImporterModel(models.Model):
+ name = models.CharField(_(u"Name"), max_length=200)
+ klass = models.CharField(_(u"Class name"), max_length=200, unique=True)
+ objects = ImportModelManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Model")
+ verbose_name_plural = _(u"Importer - Models")
+ ordering = ('name',)
+
+ def __unicode__(self):
+ return self.name
+
+ def natural_key(self):
+ return (self.klass, )
+
+
+class ImporterTypeManager(models.Manager):
+ def get_by_natural_key(self, slug):
+ return self.get(slug=slug)
+
+
+class ImporterType(models.Model):
+ """
+ Description of a table to be mapped with ishtar database
+ """
+ name = models.CharField(_(u"Name"), blank=True, null=True,
+ max_length=100)
+ slug = models.SlugField(_(u"Slug"), unique=True, max_length=100,
+ blank=True, null=True)
+ description = models.CharField(_(u"Description"), blank=True, null=True,
+ max_length=500)
+ users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"),
+ blank=True)
+ associated_models = models.ForeignKey(
+ ImporterModel, verbose_name=_(u"Associated model"),
+ related_name='+', blank=True, null=True)
+ created_models = models.ManyToManyField(
+ ImporterModel, verbose_name=_(u"Models that can accept new items"),
+ blank=True, help_text=_(u"Leave blank for no restrictions"),
+ related_name='+')
+ is_template = models.BooleanField(_(u"Is template"), default=False)
+ unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"),
+ blank=True, null=True, max_length=500)
+ objects = ImporterTypeManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Type")
+ verbose_name_plural = _(u"Importer - Types")
+ ordering = ('name',)
+
+ def natural_key(self):
+ return (self.slug, )
+
+ def __unicode__(self):
+ return self.name
+
+ def get_importer_class(self, import_instance=None):
+ if self.slug and self.slug in IMPORTER_CLASSES:
+ cls = import_class(IMPORTER_CLASSES[self.slug])
+ return cls
+ OBJECT_CLS = import_class(self.associated_models.klass)
+ DEFAULTS = dict([(default.keys, default.values)
+ for default in self.defaults.all()])
+ LINE_FORMAT = []
+ idx = 0
+ for column in self.columns.order_by('col_number').all():
+ idx += 1
+ while column.col_number > idx:
+ LINE_FORMAT.append(None)
+ idx += 1
+ targets = []
+ formater_types = []
+ nb = column.targets.count()
+ if not nb:
+ LINE_FORMAT.append(None)
+ continue
+ force_news = []
+ concat_str = []
+ for target in column.targets.all():
+ ft = target.formater_type.get_formater_type(
+ target, import_instance=import_instance)
+ if not ft:
+ continue
+ formater_types.append(ft)
+ targets.append(target.target)
+ concat_str.append(target.concat_str)
+ force_news.append(target.force_new)
+ formater_kwargs = {}
+ if column.regexp_pre_filter:
+ formater_kwargs['regexp'] = re.compile(
+ column.regexp_pre_filter.regexp)
+ formater_kwargs['concat_str'] = concat_str
+ formater_kwargs['duplicate_fields'] = [
+ (field.field_name, field.force_new, field.concat,
+ field.concat_str)
+ for field in column.duplicate_fields.all()]
+ formater_kwargs['label'] = column.label
+ formater_kwargs['required'] = column.required
+ formater_kwargs['force_new'] = force_news
+ if column.export_field_name:
+ formater_kwargs['export_field_name'] = [
+ column.export_field_name]
+ formater = ImportFormater(targets, formater_types,
+ **formater_kwargs)
+ LINE_FORMAT.append(formater)
+ UNICITY_KEYS = []
+ if self.unicity_keys:
+ UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')]
+ MODEL_CREATION_LIMIT = []
+ for modls in self.created_models.all():
+ MODEL_CREATION_LIMIT.append(import_class(modls.klass))
+ args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description,
+ 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT,
+ 'UNICITY_KEYS': UNICITY_KEYS,
+ 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT}
+ name = str(''.join(
+ x for x in slugify(self.name).replace('-', ' ').title()
+ if not x.isspace()))
+ newclass = type(name, (Importer,), args)
+ return newclass
+
+ def save(self, *args, **kwargs):
+ if not self.slug:
+ self.slug = create_slug(ImporterType, self.name)
+ return super(ImporterType, self).save(*args, **kwargs)
+
+
+def get_associated_model(parent_model, keys):
+ model = None
+ if isinstance(parent_model, unicode) or \
+ isinstance(parent_model, str):
+ OBJECT_CLS = import_class(parent_model)
+ else:
+ OBJECT_CLS = parent_model
+ for idx, item in enumerate(keys):
+ if not idx:
+ field = get_model_fields(OBJECT_CLS)[item]
+ if hasattr(field, 'rel') and hasattr(field.rel, 'to'):
+ model = field.rel.to
+ if type(field) == ModelBase:
+ model = field
+ else:
+ return get_associated_model(model, keys[1:])
+ return model
+
+
+class ImporterDefaultManager(models.Manager):
+ def get_by_natural_key(self, importer_type, target):
+ return self.get(importer_type__slug=importer_type, target=target)
+
+
+class ImporterDefault(models.Model):
+ """
+ Targets of default values in an import
+ """
+ importer_type = models.ForeignKey(ImporterType, related_name='defaults')
+ target = models.CharField(u"Target", max_length=500)
+
+ class Meta:
+ verbose_name = _(u"Importer - Default")
+ verbose_name_plural = _(u"Importer - Defaults")
+ unique_together = ('importer_type', 'target')
+ objects = ImporterDefaultManager()
+
+ def __unicode__(self):
+ return u"{} - {}".format(self.importer_type, self.target)
+
+ def natural_key(self):
+ return self.importer_type.slug, self.target
+
+ @property
+ def keys(self):
+ return tuple(self.target.split('__'))
+
+ @property
+ def associated_model(self):
+ return get_associated_model(self.importer_type.associated_models.klass,
+ self.keys)
+
+ @property
+ def values(self):
+ values = {}
+ for default_value in self.default_values.all():
+ values[default_value.target] = default_value.get_value()
+ return values
+
+
+class ImporterDefaultValuesManager(models.Manager):
+ def get_by_natural_key(self, def_target_type, def_target, target):
+ return self.get(default_target__importer_type__slug=def_target_type,
+ default_target__target=def_target,
+ target=target)
+
+
+class ImporterDefaultValues(models.Model):
+ """
+ Default values in an import
+ """
+ default_target = models.ForeignKey(ImporterDefault,
+ related_name='default_values')
+ target = models.CharField(u"Target", max_length=500)
+ value = models.CharField(u"Value", max_length=500)
+ objects = ImporterDefaultValuesManager()
+
+ def __unicode__(self):
+ return u"{} - {}".format(self.default_target, self.target, self.value)
+
+ class Meta:
+ verbose_name = _(u"Importer - Default value")
+ verbose_name_plural = _(u"Importer - Default values")
+
+ def natural_key(self):
+ return (self.default_target.importer_type.slug,
+ self.default_target.target,
+ self.target)
+
+ def get_value(self):
+ parent_model = self.default_target.associated_model
+ if not parent_model:
+ return self.value
+ fields = get_model_fields(parent_model)
+ target = self.target.strip()
+ if target not in fields:
+ return
+ field = fields[target]
+ if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'):
+ return
+ model = field.rel.to
+ # if value is an id
+ try:
+ return model.objects.get(pk=int(self.value))
+ except (ValueError, model.DoesNotExist):
+ pass
+ # try with txt_idx
+ try:
+ return model.objects.get(txt_idx=self.value)
+ except (ValueError, model.DoesNotExist):
+ pass
+ return ""
+
+
+class ImporterColumnManager(models.Manager):
+ def get_by_natural_key(self, importer_type, col_number):
+ return self.get(importer_type__slug=importer_type,
+ col_number=col_number)
+
+
+class ImporterColumn(models.Model):
+ """
+ Import file column description
+ """
+ label = models.CharField(_(u"Label"), blank=True, null=True,
+ max_length=200)
+ importer_type = models.ForeignKey(ImporterType, related_name='columns')
+ col_number = models.IntegerField(_(u"Column number"), default=1)
+ description = models.TextField(_("Description"), blank=True, null=True)
+ regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True)
+ required = models.BooleanField(_(u"Required"), default=False)
+ export_field_name = models.CharField(
+ _(u"Export field name"), blank=True, null=True, max_length=200,
+ help_text=_(u"Fill this field if the field name is ambiguous for "
+ u"export. For instance: concatenated fields.")
+ )
+ objects = ImporterColumnManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Column")
+ verbose_name_plural = _(u"Importer - Columns")
+ ordering = ('importer_type', 'col_number')
+ unique_together = ('importer_type', 'col_number')
+
+ def __unicode__(self):
+ return u"{} - {}".format(self.importer_type, self.col_number)
+
+ def natural_key(self):
+ return self.importer_type.slug, self.col_number
+
+ def targets_lbl(self):
+ return u', '.join([target.target for target in self.targets.all()])
+
+ def duplicate_fields_lbl(self):
+ return u', '.join([dp.field_name
+ for dp in self.duplicate_fields.all()])
+
+
+class ImporterDuplicateFieldManager(models.Manager):
+ def get_by_natural_key(self, importer_type, col_number, field_name):
+ return self.get(column__importer_type__slug=importer_type,
+ column__col_number=col_number,
+ field_name=field_name)
+
+
+class ImporterDuplicateField(models.Model):
+ """
+ Direct copy of result in other fields
+ """
+ column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields')
+ field_name = models.CharField(_(u"Field name"), blank=True, null=True,
+ max_length=200)
+ force_new = models.BooleanField(_(u"Force creation of new items"),
+ default=False)
+ concat = models.BooleanField(_(u"Concatenate with existing"),
+ default=False)
+ concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
+ blank=True, null=True)
+ objects = ImporterDuplicateFieldManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Duplicate field")
+ verbose_name_plural = _(u"Importer - Duplicate fields")
+ ordering = ('column', 'field_name')
+
+ def natural_key(self):
+ return self.column.importer_type, self.column.col_number, \
+ self.field_name
+
+
+class NamedManager(models.Manager):
+ def get_by_natural_key(self, name):
+ return self.get(name=name)
+
+
+class Regexp(models.Model):
+ name = models.CharField(_(u"Name"), max_length=100, unique=True)
+ description = models.CharField(_(u"Description"), blank=True, null=True,
+ max_length=500)
+ regexp = models.CharField(_(u"Regular expression"), max_length=500)
+ objects = NamedManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Regular expression")
+ verbose_name_plural = _(u"Importer - Regular expressions")
+
+ def __unicode__(self):
+ return self.name
+
+ def natural_key(self):
+ return (self.name, )
+
+
+class ImportTargetManager(models.Manager):
+ def get_by_natural_key(self, importer_type, col_number, target):
+ return self.get(column__importer_type__slug=importer_type,
+ column__col_number=col_number,
+ target=target)
+
+
+class ImportTarget(models.Model):
+ """
+ Ishtar database target for a column
+ """
+ column = models.ForeignKey(ImporterColumn, related_name='targets')
+ target = models.CharField(u"Target", max_length=500)
+ regexp_filter = models.ForeignKey("Regexp", blank=True, null=True)
+ formater_type = models.ForeignKey("FormaterType")
+ force_new = models.BooleanField(_(u"Force creation of new items"),
+ default=False)
+ concat = models.BooleanField(_(u"Concatenate with existing"),
+ default=False)
+ concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
+ blank=True, null=True)
+ comment = models.TextField(_(u"Comment"), blank=True, null=True)
+ objects = ImportTargetManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Target")
+ verbose_name_plural = _(u"Importer - Targets")
+ unique_together = ('column', 'target')
+
+ def __unicode__(self):
+ return self.target[:50] if self.target else self.comment
+
+ def natural_key(self):
+ return self.column.importer_type.slug, self.column.col_number, \
+ self.target
+
+ @property
+ def associated_model(self):
+ try:
+ return get_associated_model(
+ self.column.importer_type.associated_models.klass,
+ self.target.split('__'))
+ except KeyError:
+ return
+
+ def get_choices(self):
+ if self.formater_type.formater_type == 'UnknowType' \
+ and self.column.importer_type.slug:
+ cls = self.column.importer_type.get_importer_class()
+ formt = cls().line_format[self.column.col_number - 1]
+ if hasattr(formt.formater, 'choices'):
+ return [('', '--' * 8)] + list(formt.formater.choices)
+ return [('', '--' * 8)]
+ if self.formater_type.formater_type == 'StrToBoolean':
+ return [('', '--' * 8),
+ ('True', _(u"True")),
+ ('False', _(u"False"))]
+ if not self.associated_model or not hasattr(self.associated_model,
+ 'get_types'):
+ return []
+ return self.associated_model.get_types()
+
+
+class TargetKey(models.Model):
+ """
+ User's link between import source and ishtar database.
+ Also temporary used for GeneralType to point missing link before adding
+ them in ItemKey table.
+ A targetkey connection can be create to be applied to on particular
+ import (associated_import), one particular user (associated_user) or to all
+ imports (associated_import and associated_user are empty).
+ """
+ target = models.ForeignKey(ImportTarget, related_name='keys')
+ key = models.TextField(_(u"Key"))
+ value = models.TextField(_(u"Value"), blank=True, null=True)
+ is_set = models.BooleanField(_(u"Is set"), default=False)
+ associated_import = models.ForeignKey('Import', blank=True, null=True)
+ associated_user = models.ForeignKey('IshtarUser', blank=True, null=True)
+
+ class Meta:
+ unique_together = ('target', 'key', 'associated_user',
+ 'associated_import')
+ verbose_name = _(u"Importer - Target key")
+ verbose_name_plural = _(u"Importer - Targets keys")
+
+ def __unicode__(self):
+ return u" - ".join([unicode(self.target), self.key[:50]])
+
+ def column_nb(self):
+ # for the admin
+ return self.target.column.col_number
+
+ def importer_type(self):
+ # for the admin
+ return self.target.column.importer_type.name
+
+ def format(self):
+ if not self.is_set:
+ return None
+ if self.target.formater_type.formater_type == 'StrToBoolean':
+ if self.value in ('False', '0'):
+ return False
+ elif self.value:
+ return True
+ return
+ return self.value
+
+ def save(self, *args, **kwargs):
+ obj = super(TargetKey, self).save(*args, **kwargs)
+ if not self.value:
+ return obj
+ associated_model = self.target.associated_model
+ if associated_model and hasattr(self.target.associated_model,
+ "add_key"):
+ v = None
+ # pk is given
+ try:
+ v = self.target.associated_model.objects.get(
+ pk=unicode(int(self.value)))
+ except (ValueError, self.target.associated_model.DoesNotExist):
+ # try with txt_idx
+ try:
+ v = self.target.associated_model.objects.get(
+ txt_idx=unicode(self.value))
+ except self.target.associated_model.DoesNotExist:
+ pass
+ if v:
+ v.add_key(self.key, importer=self.associated_import)
+ return obj
+
+TARGET_MODELS = [
+ ('OrganizationType', _(u"Organization type")),
+ ('TitleType', _(u"Title")),
+ ('SourceType', _(u"Source type")),
+ ('AuthorType', _(u"Author type")),
+ ('Format', _(u"Format")),
+ ('archaeological_operations.models.OperationType', _(u"Operation type")),
+ ('archaeological_operations.models.Period', _(u"Period")),
+ ('archaeological_operations.models.ReportState', _(u"Report state")),
+ ('archaeological_operations.models.RemainType', _(u"Remain type")),
+ ('archaeological_context_records.models.Unit', _(u"Unit")),
+ ('archaeological_context_records.models.ActivityType',
+ _(u"Activity type")),
+ ('archaeological_context_records.models.DocumentationType',
+ _(u"Documentation type")),
+ ('archaeological_finds.models.MaterialType', _(u"Material")),
+ ('archaeological_finds.models.ConservatoryState',
+ _(u"Conservatory state")),
+ ('archaeological_warehouse.models.ContainerType', _(u"Container type")),
+ ('archaeological_finds.models.PreservationType', _(u"Preservation type")),
+ ('archaeological_finds.models.ObjectType', _(u"Object type")),
+ ('archaeological_finds.models.IntegrityType', _(u"Integrity type")),
+ ('archaeological_finds.models.RemarkabilityType',
+ _(u"Remarkability type")),
+ ('archaeological_finds.models.BatchType', _(u"Batch type")),
+ ('archaeological_context_records.models.IdentificationType',
+ _("Identification type")),
+ ('archaeological_context_records.models.RelationType',
+ _(u"Context record relation type")),
+ ('SpatialReferenceSystem', _(u"Spatial reference system")),
+ ('SupportType', _(u"Support type")),
+ ('TitleType', _(u"Title type")),
+]
+
+TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS]
+
+IMPORTER_TYPES = (
+ ('IntegerFormater', _(u"Integer")),
+ ('FloatFormater', _(u"Float")),
+ ('UnicodeFormater', _(u"String")),
+ ('DateFormater', _(u"Date")),
+ ('TypeFormater', _(u"Type")),
+ ('YearFormater', _(u"Year")),
+ ('StrToBoolean', _(u"String to boolean")),
+ ('FileFormater', pgettext_lazy("filesystem", u"File")),
+ ('UnknowType', _(u"Unknow type"))
+)
+
+IMPORTER_TYPES_DCT = {
+ 'IntegerFormater': IntegerFormater,
+ 'FloatFormater': FloatFormater,
+ 'UnicodeFormater': UnicodeFormater,
+ 'DateFormater': DateFormater,
+ 'TypeFormater': TypeFormater,
+ 'YearFormater': YearFormater,
+ 'StrToBoolean': StrToBoolean,
+ 'FileFormater': FileFormater,
+ 'UnknowType': None,
+}
+
+DATE_FORMATS = (
+ ('%Y', _(u"4 digit year. e.g.: \"2015\"")),
+ ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")),
+ ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")),
+)
+
+IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS,
+ 'DateFormater': DATE_FORMATS}
+
+
+class FormaterTypeManager(models.Manager):
+ def get_by_natural_key(self, formater_type, options, many_split):
+ return self.get(formater_type=formater_type,
+ options=options, many_split=many_split)
+
+
+class FormaterType(models.Model):
+ formater_type = models.CharField(u"Formater type", max_length=20,
+ choices=IMPORTER_TYPES)
+ options = models.CharField(_(u"Options"), max_length=500, blank=True,
+ null=True)
+ many_split = models.CharField(_(u"Split character(s)"), max_length=10,
+ blank=True, null=True)
+ objects = FormaterTypeManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Formater type")
+ verbose_name_plural = _(u"Importer - Formater types")
+ unique_together = ('formater_type', 'options', 'many_split')
+ ordering = ('formater_type', 'options')
+
+ def natural_key(self):
+ return self.formater_type, self.options, self.many_split
+
+ def __unicode__(self):
+ return u" - ".join(
+ [unicode(dict(IMPORTER_TYPES)[self.formater_type])
+ if self.formater_type in IMPORTER_TYPES_DCT else ''] +
+ [getattr(self, k) for k in ('options', 'many_split')
+ if getattr(self, k)])
+
+ def get_choices(self):
+ if self.format_type in IMPORTER_TYPES_CHOICES:
+ return IMPORTER_TYPES_CHOICES[self.format_type]
+
+ def get_formater_type(self, target, import_instance=None):
+ if self.formater_type not in IMPORTER_TYPES_DCT.keys():
+ return
+ kwargs = {'db_target': target, 'import_instance': import_instance}
+ if self.many_split:
+ kwargs['many_split'] = self.many_split
+ if self.formater_type == 'TypeFormater':
+ if self.options not in TARGET_MODELS_KEYS:
+ logger.warning(
+ "**WARN FormaterType.get_formater_type**: {} "
+ "is not in TARGET_MODELS_KEYS".format(self.options))
+ return
+ model = None
+ if self.options in dir():
+ model = dir()[self.options]
+ else:
+ model = import_class(self.options)
+ return TypeFormater(model, **kwargs)
+ elif self.formater_type == 'UnicodeFormater':
+ if self.options:
+ try:
+ return UnicodeFormater(int(self.options.strip()), **kwargs)
+ except ValueError:
+ pass
+ return UnicodeFormater(**kwargs)
+ elif self.formater_type == 'DateFormater':
+ date_formats = self.options
+ if self.many_split:
+ date_formats = self.options.split(kwargs.pop('many_split'))
+ return DateFormater(date_formats, **kwargs)
+ elif self.formater_type == 'StrToBoolean':
+ return StrToBoolean(**kwargs)
+ elif self.formater_type == 'UnknowType':
+ return
+ else:
+ return IMPORTER_TYPES_DCT[self.formater_type](**kwargs)
+
+IMPORT_STATE = (("C", _(u"Created")),
+ ("AP", _(u"Analyse in progress")),
+ ("A", _(u"Analysed")),
+ ("P", _(u"Import pending")),
+ ("IP", _(u"Import in progress")),
+ ("FE", _(u"Finished with errors")),
+ ("F", _(u"Finished")),
+ ("AC", _(u"Archived")),
+ )
+
+IMPORT_STATE_DCT = dict(IMPORT_STATE)
+ENCODINGS = [(settings.ENCODING, settings.ENCODING),
+ (settings.ALT_ENCODING, settings.ALT_ENCODING),
+ ('utf-8', 'utf-8')]
+
+
+class Import(models.Model):
+ user = models.ForeignKey('IshtarUser')
+ name = models.CharField(_(u"Name"), max_length=500,
+ blank=True, null=True)
+ importer_type = models.ForeignKey(ImporterType)
+ imported_file = models.FileField(
+ _(u"Imported file"), upload_to="upload/imports/", max_length=220)
+ imported_images = models.FileField(
+ _(u"Associated images (zip file)"), upload_to="upload/imports/",
+ blank=True, null=True, max_length=220)
+ encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS,
+ default=u'utf-8', max_length=15)
+ skip_lines = models.IntegerField(_(u"Skip lines"), default=1)
+ error_file = models.FileField(_(u"Error file"),
+ upload_to="upload/imports/",
+ blank=True, null=True, max_length=255)
+ result_file = models.FileField(_(u"Result file"),
+ upload_to="upload/imports/",
+ blank=True, null=True, max_length=255)
+ match_file = models.FileField(_(u"Match file"),
+ upload_to="upload/imports/",
+ blank=True, null=True, max_length=255)
+ state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE,
+ default=u'C')
+ conservative_import = models.BooleanField(
+ _(u"Conservative import"), default=False,
+ help_text='If set to true, do not overload existing values')
+ creation_date = models.DateTimeField(
+ _(u"Creation date"), auto_now_add=True, blank=True, null=True)
+ end_date = models.DateTimeField(_(u"End date"), blank=True,
+ null=True, editable=False)
+ seconds_remaining = models.IntegerField(
+ _(u"Remaining seconds"), blank=True, null=True, editable=False)
+
+ class Meta:
+ verbose_name = _(u"Import")
+ verbose_name_plural = _(u"Imports")
+
+ def __unicode__(self):
+ return u"{} | {}".format(self.name or u"-", self.importer_type)
+
+ def need_matching(self):
+ return bool(TargetKey.objects.filter(associated_import=self,
+ is_set=False).count())
+
+ @property
+ def errors(self):
+ if not self.error_file:
+ return []
+ errors = []
+ with open(self.error_file.path, 'rb') as csvfile:
+ reader = csv.DictReader(csvfile, fieldnames=['line', 'column',
+ 'error'])
+ reader.next() # pass the header
+ for row in reader:
+ errors.append(row)
+ return errors
+
+ def get_actions(self):
+ """
+ Get available action relevant with the current status
+ """
+ actions = []
+ if self.state == 'C':
+ actions.append(('A', _(u"Analyse")))
+ if self.state == 'A':
+ actions.append(('A', _(u"Re-analyse")))
+ actions.append(('I', _(u"Launch import")))
+ if self.state in ('F', 'FE'):
+ actions.append(('A', _(u"Re-analyse")))
+ actions.append(('I', _(u"Re-import")))
+ actions.append(('AC', _(u"Archive")))
+ if self.state == 'AC':
+ actions.append(('A', _(u"Unarchive")))
+ actions.append(('D', _(u"Delete")))
+ return actions
+
+ @property
+ def imported_filename(self):
+ return self.imported_file.name.split(os.sep)[-1]
+
+ @property
+ def status(self):
+ if self.state not in IMPORT_STATE_DCT:
+ return ""
+ return IMPORT_STATE_DCT[self.state]
+
+ def get_importer_instance(self):
+ return self.importer_type.get_importer_class(import_instance=self)(
+ skip_lines=self.skip_lines, import_instance=self,
+ conservative_import=self.conservative_import)
+
+ @property
+ def data_table(self):
+ imported_file = self.imported_file.path
+ tmpdir = None
+ if zipfile.is_zipfile(imported_file):
+ z = zipfile.ZipFile(imported_file)
+ filename = None
+ for name in z.namelist():
+ # get first CSV file found
+ if name.endswith('.csv'):
+ filename = name
+ break
+ if not filename:
+ return []
+ tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-')
+ imported_file = z.extract(filename, tmpdir)
+
+ encodings = [self.encoding]
+ encodings += [coding for coding, c in ENCODINGS
+ if coding != self.encoding]
+ for encoding in encodings:
+ try:
+ with open(imported_file) as csv_file:
+ vals = [line
+ for line in unicodecsv.reader(csv_file,
+ encoding=encoding)]
+ if tmpdir:
+ shutil.rmtree(tmpdir)
+ return vals
+ except UnicodeDecodeError:
+ pass # try the next encoding
+ if tmpdir:
+ shutil.rmtree(tmpdir)
+ return []
+
+ def initialize(self):
+ self.state = 'AP'
+ self.save()
+ self.get_importer_instance().initialize(self.data_table, output='db')
+ self.state = 'A'
+ self.save()
+
+ def importation(self):
+ self.state = 'IP'
+ self.save()
+ importer = self.get_importer_instance()
+ importer.importation(self.data_table)
+ # result file
+ filename = slugify(self.importer_type.name)
+ now = datetime.datetime.now().isoformat('-').replace(':', '')
+ result_file = filename + "_result_%s.csv" % now
+ result_file = os.sep.join([self.result_file.storage.location,
+ result_file])
+ with open(result_file, 'w') as fle:
+ fle.write(importer.get_csv_result().encode('utf-8'))
+ self.result_file = File(open(fle.name))
+ if importer.errors:
+ self.state = 'FE'
+ error_file = filename + "_errors_%s.csv" % now
+ error_file = os.sep.join([self.error_file.storage.location,
+ error_file])
+ with open(error_file, 'w') as fle:
+ fle.write(importer.get_csv_errors().encode('utf-8'))
+ self.error_file = File(open(fle.name))
+ else:
+ self.state = 'F'
+ self.error_file = None
+ if importer.match_table:
+ match_file = filename + "_match_%s.csv" % now
+ match_file = os.sep.join([self.match_file.storage.location,
+ match_file])
+ with open(match_file, 'w') as fle:
+ fle.write(importer.get_csv_matches().encode('utf-8'))
+ self.match_file = File(open(fle.name))
+ self.save()
+
+ def archive(self):
+ self.state = 'AC'
+ self.save()
+
+ def get_all_imported(self):
+ imported = []
+ for related, zorg in \
+ self._meta.get_all_related_m2m_objects_with_model():
+ accessor = related.get_accessor_name()
+ imported += [(accessor, obj)
+ for obj in getattr(self, accessor).all()]
+ return imported
+
+
+def pre_delete_import(sender, **kwargs):
+ # deleted imported items when an import is delete
+ instance = kwargs.get('instance')
+ if not instance:
+ return
+ to_delete = []
+ for accessor, imported in instance.get_all_imported():
+ to_delete.append(imported)
+ for item in to_delete:
+ item.delete()
+
+
+pre_delete.connect(pre_delete_import, sender=Import)