summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2017-08-21 18:23:29 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2017-08-21 18:23:29 +0200
commit44c98340f5978ff4818a66351235ab33df316f18 (patch)
tree0d2270c4b34eab262d9ab1f3936c5fe4f8942289 /ishtar_common
parent5a9821f832f632512182835b42741073da14ca75 (diff)
downloadIshtar-44c98340f5978ff4818a66351235ab33df316f18.tar.bz2
Ishtar-44c98340f5978ff4818a66351235ab33df316f18.zip
Models: refactoring move all import models to a specific file
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/models.py898
-rw-r--r--ishtar_common/models_imports.py912
2 files changed, 933 insertions, 877 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index dd8204f21..725c3ac83 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -70,6 +70,17 @@ from ishtar_common.data_importer import Importer, ImportFormater, \
IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \
TypeFormater, YearFormater, StrToBoolean, FileFormater
+from ishtar_common.models_imports import ImporterModel, ImporterType, \
+ ImporterDefault, ImporterDefaultValues, ImporterColumn, \
+ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \
+ Import
+
+__all__ = [
+ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues',
+ 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget',
+ 'TargetKey', 'FormaterType', 'Import'
+]
+
logger = logging.getLogger(__name__)
@@ -123,15 +134,6 @@ def check_model_access_control(request, model, available_perms=None):
return allowed, own
-class Imported(models.Model):
- imports = models.ManyToManyField(
- 'Import', blank=True,
- related_name="imported_%(app_label)s_%(class)s")
-
- class Meta:
- abstract = True
-
-
class ValueGetter(object):
_prefix = ""
GET_VALUES_EXTRA = []
@@ -798,7 +800,7 @@ class ItemKey(models.Model):
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey('content_type', 'object_id')
importer = models.ForeignKey(
- 'Import', null=True, blank=True,
+ Import, null=True, blank=True,
help_text=_(u"Specific key to an import"))
def __unicode__(self):
@@ -913,6 +915,15 @@ class BulkUpdatedItem(object):
return transaction_id, False
+class Imported(models.Model):
+ imports = models.ManyToManyField(
+ Import, blank=True,
+ related_name="imported_%(app_label)s_%(class)s")
+
+ class Meta:
+ abstract = True
+
+
class BaseHistorizedItem(Imported):
IS_BASKET = False
history_modifier = models.ForeignKey(
@@ -1880,873 +1891,6 @@ post_delete.connect(post_save_cache, sender=OrganizationType)
organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, unicode)
organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, unicode)
-IMPORTER_CLASSES = {}
-
-IMPORTER_CLASSES.update({
- 'sra-pdl-files':
- 'archaeological_files.data_importer.FileImporterSraPdL'})
-
-
-def get_model_fields(model):
- """
- Return a dict of fields from model
- To be replace in Django 1.8 with get_fields, get_field
- """
- fields = {}
- options = model._meta
- for field in sorted(options.fields + options.many_to_many):
- fields[field.name] = field
- if hasattr(model, 'get_extra_fields'):
- fields.update(model.get_extra_fields())
- return fields
-
-
-def import_class(full_path_classname):
- """
- Return the model class from the full path
- TODO: add a white list for more security
- """
- mods = full_path_classname.split('.')
- if len(mods) == 1:
- mods = ['ishtar_common', 'models', mods[0]]
- elif 'models' not in mods and 'models_finds' not in mods \
- and 'models_treatments' not in mods:
- raise SuspiciousOperation(
- u"Try to import a non model from a string")
- module = import_module('.'.join(mods[:-1]))
- return getattr(module, mods[-1])
-
-
-class ImportModelManager(models.Manager):
- def get_by_natural_key(self, klass):
- return self.get(klass=klass)
-
-
-class ImporterModel(models.Model):
- name = models.CharField(_(u"Name"), max_length=200)
- klass = models.CharField(_(u"Class name"), max_length=200, unique=True)
- objects = ImportModelManager()
-
- class Meta:
- verbose_name = _(u"Importer - Model")
- verbose_name_plural = _(u"Importer - Models")
- ordering = ('name',)
-
- def __unicode__(self):
- return self.name
-
- def natural_key(self):
- return (self.klass, )
-
-
-class ImporterTypeManager(models.Manager):
- def get_by_natural_key(self, slug):
- return self.get(slug=slug)
-
-
-class ImporterType(models.Model):
- """
- Description of a table to be mapped with ishtar database
- """
- name = models.CharField(_(u"Name"), blank=True, null=True,
- max_length=100)
- slug = models.SlugField(_(u"Slug"), unique=True, max_length=100,
- blank=True, null=True)
- description = models.CharField(_(u"Description"), blank=True, null=True,
- max_length=500)
- users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"),
- blank=True)
- associated_models = models.ForeignKey(
- ImporterModel, verbose_name=_(u"Associated model"),
- related_name='+', blank=True, null=True)
- created_models = models.ManyToManyField(
- ImporterModel, verbose_name=_(u"Models that can accept new items"),
- blank=True, help_text=_(u"Leave blank for no restrictions"),
- related_name='+')
- is_template = models.BooleanField(_(u"Is template"), default=False)
- unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"),
- blank=True, null=True, max_length=500)
- objects = ImporterTypeManager()
-
- class Meta:
- verbose_name = _(u"Importer - Type")
- verbose_name_plural = _(u"Importer - Types")
- ordering = ('name',)
-
- def natural_key(self):
- return (self.slug, )
-
- def __unicode__(self):
- return self.name
-
- def get_importer_class(self, import_instance=None):
- if self.slug and self.slug in IMPORTER_CLASSES:
- cls = import_class(IMPORTER_CLASSES[self.slug])
- return cls
- OBJECT_CLS = import_class(self.associated_models.klass)
- DEFAULTS = dict([(default.keys, default.values)
- for default in self.defaults.all()])
- LINE_FORMAT = []
- idx = 0
- for column in self.columns.order_by('col_number').all():
- idx += 1
- while column.col_number > idx:
- LINE_FORMAT.append(None)
- idx += 1
- targets = []
- formater_types = []
- nb = column.targets.count()
- if not nb:
- LINE_FORMAT.append(None)
- continue
- force_news = []
- concat_str = []
- for target in column.targets.all():
- ft = target.formater_type.get_formater_type(
- target, import_instance=import_instance)
- if not ft:
- continue
- formater_types.append(ft)
- targets.append(target.target)
- concat_str.append(target.concat_str)
- force_news.append(target.force_new)
- formater_kwargs = {}
- if column.regexp_pre_filter:
- formater_kwargs['regexp'] = re.compile(
- column.regexp_pre_filter.regexp)
- formater_kwargs['concat_str'] = concat_str
- formater_kwargs['duplicate_fields'] = [
- (field.field_name, field.force_new, field.concat,
- field.concat_str)
- for field in column.duplicate_fields.all()]
- formater_kwargs['label'] = column.label
- formater_kwargs['required'] = column.required
- formater_kwargs['force_new'] = force_news
- if column.export_field_name:
- formater_kwargs['export_field_name'] = [
- column.export_field_name]
- formater = ImportFormater(targets, formater_types,
- **formater_kwargs)
- LINE_FORMAT.append(formater)
- UNICITY_KEYS = []
- if self.unicity_keys:
- UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')]
- MODEL_CREATION_LIMIT = []
- for modls in self.created_models.all():
- MODEL_CREATION_LIMIT.append(import_class(modls.klass))
- args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description,
- 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT,
- 'UNICITY_KEYS': UNICITY_KEYS,
- 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT}
- name = str(''.join(
- x for x in slugify(self.name).replace('-', ' ').title()
- if not x.isspace()))
- newclass = type(name, (Importer,), args)
- return newclass
-
- def save(self, *args, **kwargs):
- if not self.slug:
- self.slug = create_slug(ImporterType, self.name)
- return super(ImporterType, self).save(*args, **kwargs)
-
-
-def get_associated_model(parent_model, keys):
- model = None
- if isinstance(parent_model, unicode) or \
- isinstance(parent_model, str):
- OBJECT_CLS = import_class(parent_model)
- else:
- OBJECT_CLS = parent_model
- for idx, item in enumerate(keys):
- if not idx:
- field = get_model_fields(OBJECT_CLS)[item]
- if hasattr(field, 'rel') and hasattr(field.rel, 'to'):
- model = field.rel.to
- if type(field) == ModelBase:
- model = field
- else:
- return get_associated_model(model, keys[1:])
- return model
-
-
-class ImporterDefaultManager(models.Manager):
- def get_by_natural_key(self, importer_type, target):
- return self.get(importer_type__slug=importer_type, target=target)
-
-
-class ImporterDefault(models.Model):
- """
- Targets of default values in an import
- """
- importer_type = models.ForeignKey(ImporterType, related_name='defaults')
- target = models.CharField(u"Target", max_length=500)
-
- class Meta:
- verbose_name = _(u"Importer - Default")
- verbose_name_plural = _(u"Importer - Defaults")
- unique_together = ('importer_type', 'target')
- objects = ImporterDefaultManager()
-
- def __unicode__(self):
- return u"{} - {}".format(self.importer_type, self.target)
-
- def natural_key(self):
- return self.importer_type.slug, self.target
-
- @property
- def keys(self):
- return tuple(self.target.split('__'))
-
- @property
- def associated_model(self):
- return get_associated_model(self.importer_type.associated_models.klass,
- self.keys)
-
- @property
- def values(self):
- values = {}
- for default_value in self.default_values.all():
- values[default_value.target] = default_value.get_value()
- return values
-
-
-class ImporterDefaultValuesManager(models.Manager):
- def get_by_natural_key(self, def_target_type, def_target, target):
- return self.get(default_target__importer_type__slug=def_target_type,
- default_target__target=def_target,
- target=target)
-
-
-class ImporterDefaultValues(models.Model):
- """
- Default values in an import
- """
- default_target = models.ForeignKey(ImporterDefault,
- related_name='default_values')
- target = models.CharField(u"Target", max_length=500)
- value = models.CharField(u"Value", max_length=500)
- objects = ImporterDefaultValuesManager()
-
- def __unicode__(self):
- return u"{} - {}".format(self.default_target, self.target, self.value)
-
- class Meta:
- verbose_name = _(u"Importer - Default value")
- verbose_name_plural = _(u"Importer - Default values")
-
- def natural_key(self):
- return (self.default_target.importer_type.slug,
- self.default_target.target,
- self.target)
-
- def get_value(self):
- parent_model = self.default_target.associated_model
- if not parent_model:
- return self.value
- fields = get_model_fields(parent_model)
- target = self.target.strip()
- if target not in fields:
- return
- field = fields[target]
- if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'):
- return
- model = field.rel.to
- # if value is an id
- try:
- return model.objects.get(pk=int(self.value))
- except (ValueError, model.DoesNotExist):
- pass
- # try with txt_idx
- try:
- return model.objects.get(txt_idx=self.value)
- except (ValueError, model.DoesNotExist):
- pass
- return ""
-
-
-class ImporterColumnManager(models.Manager):
- def get_by_natural_key(self, importer_type, col_number):
- return self.get(importer_type__slug=importer_type,
- col_number=col_number)
-
-
-class ImporterColumn(models.Model):
- """
- Import file column description
- """
- label = models.CharField(_(u"Label"), blank=True, null=True,
- max_length=200)
- importer_type = models.ForeignKey(ImporterType, related_name='columns')
- col_number = models.IntegerField(_(u"Column number"), default=1)
- description = models.TextField(_("Description"), blank=True, null=True)
- regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True)
- required = models.BooleanField(_(u"Required"), default=False)
- export_field_name = models.CharField(
- _(u"Export field name"), blank=True, null=True, max_length=200,
- help_text=_(u"Fill this field if the field name is ambiguous for "
- u"export. For instance: concatenated fields.")
- )
- objects = ImporterColumnManager()
-
- class Meta:
- verbose_name = _(u"Importer - Column")
- verbose_name_plural = _(u"Importer - Columns")
- ordering = ('importer_type', 'col_number')
- unique_together = ('importer_type', 'col_number')
-
- def __unicode__(self):
- return u"{} - {}".format(self.importer_type, self.col_number)
-
- def natural_key(self):
- return self.importer_type.slug, self.col_number
-
- def targets_lbl(self):
- return u', '.join([target.target for target in self.targets.all()])
-
- def duplicate_fields_lbl(self):
- return u', '.join([dp.field_name
- for dp in self.duplicate_fields.all()])
-
-
-class ImporterDuplicateFieldManager(models.Manager):
- def get_by_natural_key(self, importer_type, col_number, field_name):
- return self.get(column__importer_type__slug=importer_type,
- column__col_number=col_number,
- field_name=field_name)
-
-
-class ImporterDuplicateField(models.Model):
- """
- Direct copy of result in other fields
- """
- column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields')
- field_name = models.CharField(_(u"Field name"), blank=True, null=True,
- max_length=200)
- force_new = models.BooleanField(_(u"Force creation of new items"),
- default=False)
- concat = models.BooleanField(_(u"Concatenate with existing"),
- default=False)
- concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
- blank=True, null=True)
- objects = ImporterDuplicateFieldManager()
-
- class Meta:
- verbose_name = _(u"Importer - Duplicate field")
- verbose_name_plural = _(u"Importer - Duplicate fields")
- ordering = ('column', 'field_name')
-
- def natural_key(self):
- return self.column.importer_type, self.column.col_number, \
- self.field_name
-
-
-class NamedManager(models.Manager):
- def get_by_natural_key(self, name):
- return self.get(name=name)
-
-
-class Regexp(models.Model):
- name = models.CharField(_(u"Name"), max_length=100, unique=True)
- description = models.CharField(_(u"Description"), blank=True, null=True,
- max_length=500)
- regexp = models.CharField(_(u"Regular expression"), max_length=500)
- objects = NamedManager()
-
- class Meta:
- verbose_name = _(u"Importer - Regular expression")
- verbose_name_plural = _(u"Importer - Regular expressions")
-
- def __unicode__(self):
- return self.name
-
- def natural_key(self):
- return (self.name, )
-
-
-class ImportTargetManager(models.Manager):
- def get_by_natural_key(self, importer_type, col_number, target):
- return self.get(column__importer_type__slug=importer_type,
- column__col_number=col_number,
- target=target)
-
-
-class ImportTarget(models.Model):
- """
- Ishtar database target for a column
- """
- column = models.ForeignKey(ImporterColumn, related_name='targets')
- target = models.CharField(u"Target", max_length=500)
- regexp_filter = models.ForeignKey("Regexp", blank=True, null=True)
- formater_type = models.ForeignKey("FormaterType")
- force_new = models.BooleanField(_(u"Force creation of new items"),
- default=False)
- concat = models.BooleanField(_(u"Concatenate with existing"),
- default=False)
- concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
- blank=True, null=True)
- comment = models.TextField(_(u"Comment"), blank=True, null=True)
- objects = ImportTargetManager()
-
- class Meta:
- verbose_name = _(u"Importer - Target")
- verbose_name_plural = _(u"Importer - Targets")
- unique_together = ('column', 'target')
-
- def __unicode__(self):
- return self.target[:50] if self.target else self.comment
-
- def natural_key(self):
- return self.column.importer_type.slug, self.column.col_number, \
- self.target
-
- @property
- def associated_model(self):
- try:
- return get_associated_model(
- self.column.importer_type.associated_models.klass,
- self.target.split('__'))
- except KeyError:
- return
-
- def get_choices(self):
- if self.formater_type.formater_type == 'UnknowType' \
- and self.column.importer_type.slug:
- cls = self.column.importer_type.get_importer_class()
- formt = cls().line_format[self.column.col_number - 1]
- if hasattr(formt.formater, 'choices'):
- return [('', '--' * 8)] + list(formt.formater.choices)
- return [('', '--' * 8)]
- if self.formater_type.formater_type == 'StrToBoolean':
- return [('', '--' * 8),
- ('True', _(u"True")),
- ('False', _(u"False"))]
- if not self.associated_model or not hasattr(self.associated_model,
- 'get_types'):
- return []
- return self.associated_model.get_types()
-
-
-class TargetKey(models.Model):
- """
- User's link between import source and ishtar database.
- Also temporary used for GeneralType to point missing link before adding
- them in ItemKey table.
- A targetkey connection can be create to be applied to on particular
- import (associated_import), one particular user (associated_user) or to all
- imports (associated_import and associated_user are empty).
- """
- target = models.ForeignKey(ImportTarget, related_name='keys')
- key = models.TextField(_(u"Key"))
- value = models.TextField(_(u"Value"), blank=True, null=True)
- is_set = models.BooleanField(_(u"Is set"), default=False)
- associated_import = models.ForeignKey('Import', blank=True, null=True)
- associated_user = models.ForeignKey('IshtarUser', blank=True, null=True)
-
- class Meta:
- unique_together = ('target', 'key', 'associated_user',
- 'associated_import')
- verbose_name = _(u"Importer - Target key")
- verbose_name_plural = _(u"Importer - Targets keys")
-
- def __unicode__(self):
- return u" - ".join([unicode(self.target), self.key[:50]])
-
- def column_nb(self):
- # for the admin
- return self.target.column.col_number
-
- def importer_type(self):
- # for the admin
- return self.target.column.importer_type.name
-
- def format(self):
- if not self.is_set:
- return None
- if self.target.formater_type.formater_type == 'StrToBoolean':
- if self.value in ('False', '0'):
- return False
- elif self.value:
- return True
- return
- return self.value
-
- def save(self, *args, **kwargs):
- obj = super(TargetKey, self).save(*args, **kwargs)
- if not self.value:
- return obj
- associated_model = self.target.associated_model
- if associated_model and hasattr(self.target.associated_model,
- "add_key"):
- v = None
- # pk is given
- try:
- v = self.target.associated_model.objects.get(
- pk=unicode(int(self.value)))
- except (ValueError, self.target.associated_model.DoesNotExist):
- # try with txt_idx
- try:
- v = self.target.associated_model.objects.get(
- txt_idx=unicode(self.value))
- except self.target.associated_model.DoesNotExist:
- pass
- if v:
- v.add_key(self.key, importer=self.associated_import)
- return obj
-
-TARGET_MODELS = [
- ('OrganizationType', _(u"Organization type")),
- ('TitleType', _(u"Title")),
- ('SourceType', _(u"Source type")),
- ('AuthorType', _(u"Author type")),
- ('Format', _(u"Format")),
- ('archaeological_operations.models.OperationType', _(u"Operation type")),
- ('archaeological_operations.models.Period', _(u"Period")),
- ('archaeological_operations.models.ReportState', _(u"Report state")),
- ('archaeological_operations.models.RemainType', _(u"Remain type")),
- ('archaeological_context_records.models.Unit', _(u"Unit")),
- ('archaeological_context_records.models.ActivityType',
- _(u"Activity type")),
- ('archaeological_context_records.models.DocumentationType',
- _(u"Documentation type")),
- ('archaeological_finds.models.MaterialType', _(u"Material")),
- ('archaeological_finds.models.ConservatoryState',
- _(u"Conservatory state")),
- ('archaeological_warehouse.models.ContainerType', _(u"Container type")),
- ('archaeological_finds.models.PreservationType', _(u"Preservation type")),
- ('archaeological_finds.models.ObjectType', _(u"Object type")),
- ('archaeological_finds.models.IntegrityType', _(u"Integrity type")),
- ('archaeological_finds.models.RemarkabilityType',
- _(u"Remarkability type")),
- ('archaeological_finds.models.BatchType', _(u"Batch type")),
- ('archaeological_context_records.models.IdentificationType',
- _("Identification type")),
- ('archaeological_context_records.models.RelationType',
- _(u"Context record relation type")),
- ('SpatialReferenceSystem', _(u"Spatial reference system")),
- ('SupportType', _(u"Support type")),
- ('TitleType', _(u"Title type")),
-]
-
-TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS]
-
-IMPORTER_TYPES = (
- ('IntegerFormater', _(u"Integer")),
- ('FloatFormater', _(u"Float")),
- ('UnicodeFormater', _(u"String")),
- ('DateFormater', _(u"Date")),
- ('TypeFormater', _(u"Type")),
- ('YearFormater', _(u"Year")),
- ('StrToBoolean', _(u"String to boolean")),
- ('FileFormater', pgettext_lazy("filesystem", u"File")),
- ('UnknowType', _(u"Unknow type"))
-)
-
-IMPORTER_TYPES_DCT = {
- 'IntegerFormater': IntegerFormater,
- 'FloatFormater': FloatFormater,
- 'UnicodeFormater': UnicodeFormater,
- 'DateFormater': DateFormater,
- 'TypeFormater': TypeFormater,
- 'YearFormater': YearFormater,
- 'StrToBoolean': StrToBoolean,
- 'FileFormater': FileFormater,
- 'UnknowType': None,
-}
-
-DATE_FORMATS = (
- ('%Y', _(u"4 digit year. e.g.: \"2015\"")),
- ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")),
- ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")),
-)
-
-IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS,
- 'DateFormater': DATE_FORMATS}
-
-
-class FormaterTypeManager(models.Manager):
- def get_by_natural_key(self, formater_type, options, many_split):
- return self.get(formater_type=formater_type,
- options=options, many_split=many_split)
-
-
-class FormaterType(models.Model):
- formater_type = models.CharField(u"Formater type", max_length=20,
- choices=IMPORTER_TYPES)
- options = models.CharField(_(u"Options"), max_length=500, blank=True,
- null=True)
- many_split = models.CharField(_(u"Split character(s)"), max_length=10,
- blank=True, null=True)
- objects = FormaterTypeManager()
-
- class Meta:
- verbose_name = _(u"Importer - Formater type")
- verbose_name_plural = _(u"Importer - Formater types")
- unique_together = ('formater_type', 'options', 'many_split')
- ordering = ('formater_type', 'options')
-
- def natural_key(self):
- return self.formater_type, self.options, self.many_split
-
- def __unicode__(self):
- return u" - ".join(
- [unicode(dict(IMPORTER_TYPES)[self.formater_type])
- if self.formater_type in IMPORTER_TYPES_DCT else ''] +
- [getattr(self, k) for k in ('options', 'many_split')
- if getattr(self, k)])
-
- def get_choices(self):
- if self.format_type in IMPORTER_TYPES_CHOICES:
- return IMPORTER_TYPES_CHOICES[self.format_type]
-
- def get_formater_type(self, target, import_instance=None):
- if self.formater_type not in IMPORTER_TYPES_DCT.keys():
- return
- kwargs = {'db_target': target, 'import_instance': import_instance}
- if self.many_split:
- kwargs['many_split'] = self.many_split
- if self.formater_type == 'TypeFormater':
- if self.options not in TARGET_MODELS_KEYS:
- logger.warning(
- "**WARN FormaterType.get_formater_type**: {} "
- "is not in TARGET_MODELS_KEYS".format(self.options))
- return
- model = None
- if self.options in dir():
- model = dir()[self.options]
- else:
- model = import_class(self.options)
- return TypeFormater(model, **kwargs)
- elif self.formater_type == 'UnicodeFormater':
- if self.options:
- try:
- return UnicodeFormater(int(self.options.strip()), **kwargs)
- except ValueError:
- pass
- return UnicodeFormater(**kwargs)
- elif self.formater_type == 'DateFormater':
- date_formats = self.options
- if self.many_split:
- date_formats = self.options.split(kwargs.pop('many_split'))
- return DateFormater(date_formats, **kwargs)
- elif self.formater_type == 'StrToBoolean':
- return StrToBoolean(**kwargs)
- elif self.formater_type == 'UnknowType':
- return
- else:
- return IMPORTER_TYPES_DCT[self.formater_type](**kwargs)
-
-IMPORT_STATE = (("C", _(u"Created")),
- ("AP", _(u"Analyse in progress")),
- ("A", _(u"Analysed")),
- ("P", _(u"Import pending")),
- ("IP", _(u"Import in progress")),
- ("FE", _(u"Finished with errors")),
- ("F", _(u"Finished")),
- ("AC", _(u"Archived")),
- )
-
-IMPORT_STATE_DCT = dict(IMPORT_STATE)
-ENCODINGS = [(settings.ENCODING, settings.ENCODING),
- (settings.ALT_ENCODING, settings.ALT_ENCODING),
- ('utf-8', 'utf-8')]
-
-
-class Import(models.Model):
- user = models.ForeignKey('IshtarUser')
- name = models.CharField(_(u"Name"), max_length=500,
- blank=True, null=True)
- importer_type = models.ForeignKey(ImporterType)
- imported_file = models.FileField(
- _(u"Imported file"), upload_to="upload/imports/", max_length=220)
- imported_images = models.FileField(
- _(u"Associated images (zip file)"), upload_to="upload/imports/",
- blank=True, null=True, max_length=220)
- encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS,
- default=u'utf-8', max_length=15)
- skip_lines = models.IntegerField(_(u"Skip lines"), default=1)
- error_file = models.FileField(_(u"Error file"),
- upload_to="upload/imports/",
- blank=True, null=True, max_length=255)
- result_file = models.FileField(_(u"Result file"),
- upload_to="upload/imports/",
- blank=True, null=True, max_length=255)
- match_file = models.FileField(_(u"Match file"),
- upload_to="upload/imports/",
- blank=True, null=True, max_length=255)
- state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE,
- default=u'C')
- conservative_import = models.BooleanField(
- _(u"Conservative import"), default=False,
- help_text='If set to true, do not overload existing values')
- creation_date = models.DateTimeField(
- _(u"Creation date"), auto_now_add=True, blank=True, null=True)
- end_date = models.DateTimeField(_(u"End date"), blank=True,
- null=True, editable=False)
- seconds_remaining = models.IntegerField(
- _(u"Remaining seconds"), blank=True, null=True, editable=False)
-
- class Meta:
- verbose_name = _(u"Import")
- verbose_name_plural = _(u"Imports")
-
- def __unicode__(self):
- return u"{} | {}".format(self.name or u"-", self.importer_type)
-
- def need_matching(self):
- return bool(TargetKey.objects.filter(associated_import=self,
- is_set=False).count())
-
- @property
- def errors(self):
- if not self.error_file:
- return []
- errors = []
- with open(self.error_file.path, 'rb') as csvfile:
- reader = csv.DictReader(csvfile, fieldnames=['line', 'column',
- 'error'])
- reader.next() # pass the header
- for row in reader:
- errors.append(row)
- return errors
-
- def get_actions(self):
- """
- Get available action relevant with the current status
- """
- actions = []
- if self.state == 'C':
- actions.append(('A', _(u"Analyse")))
- if self.state == 'A':
- actions.append(('A', _(u"Re-analyse")))
- actions.append(('I', _(u"Launch import")))
- if self.state in ('F', 'FE'):
- actions.append(('A', _(u"Re-analyse")))
- actions.append(('I', _(u"Re-import")))
- actions.append(('AC', _(u"Archive")))
- if self.state == 'AC':
- actions.append(('A', _(u"Unarchive")))
- actions.append(('D', _(u"Delete")))
- return actions
-
- @property
- def imported_filename(self):
- return self.imported_file.name.split(os.sep)[-1]
-
- @property
- def status(self):
- if self.state not in IMPORT_STATE_DCT:
- return ""
- return IMPORT_STATE_DCT[self.state]
-
- def get_importer_instance(self):
- return self.importer_type.get_importer_class(import_instance=self)(
- skip_lines=self.skip_lines, import_instance=self,
- conservative_import=self.conservative_import)
-
- @property
- def data_table(self):
- imported_file = self.imported_file.path
- tmpdir = None
- if zipfile.is_zipfile(imported_file):
- z = zipfile.ZipFile(imported_file)
- filename = None
- for name in z.namelist():
- # get first CSV file found
- if name.endswith('.csv'):
- filename = name
- break
- if not filename:
- return []
- tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-')
- imported_file = z.extract(filename, tmpdir)
-
- encodings = [self.encoding]
- encodings += [coding for coding, c in ENCODINGS
- if coding != self.encoding]
- for encoding in encodings:
- try:
- with open(imported_file) as csv_file:
- vals = [line
- for line in unicodecsv.reader(csv_file,
- encoding=encoding)]
- if tmpdir:
- shutil.rmtree(tmpdir)
- return vals
- except UnicodeDecodeError:
- pass # try the next encoding
- if tmpdir:
- shutil.rmtree(tmpdir)
- return []
-
- def initialize(self):
- self.state = 'AP'
- self.save()
- self.get_importer_instance().initialize(self.data_table, output='db')
- self.state = 'A'
- self.save()
-
- def importation(self):
- self.state = 'IP'
- self.save()
- importer = self.get_importer_instance()
- importer.importation(self.data_table)
- # result file
- filename = slugify(self.importer_type.name)
- now = datetime.datetime.now().isoformat('-').replace(':', '')
- result_file = filename + "_result_%s.csv" % now
- result_file = os.sep.join([self.result_file.storage.location,
- result_file])
- with open(result_file, 'w') as fle:
- fle.write(importer.get_csv_result().encode('utf-8'))
- self.result_file = File(open(fle.name))
- if importer.errors:
- self.state = 'FE'
- error_file = filename + "_errors_%s.csv" % now
- error_file = os.sep.join([self.error_file.storage.location,
- error_file])
- with open(error_file, 'w') as fle:
- fle.write(importer.get_csv_errors().encode('utf-8'))
- self.error_file = File(open(fle.name))
- else:
- self.state = 'F'
- self.error_file = None
- if importer.match_table:
- match_file = filename + "_match_%s.csv" % now
- match_file = os.sep.join([self.match_file.storage.location,
- match_file])
- with open(match_file, 'w') as fle:
- fle.write(importer.get_csv_matches().encode('utf-8'))
- self.match_file = File(open(fle.name))
- self.save()
-
- def archive(self):
- self.state = 'AC'
- self.save()
-
- def get_all_imported(self):
- imported = []
- for related, zorg in \
- self._meta.get_all_related_m2m_objects_with_model():
- accessor = related.get_accessor_name()
- imported += [(accessor, obj)
- for obj in getattr(self, accessor).all()]
- return imported
-
-
-def pre_delete_import(sender, **kwargs):
- # deleted imported items when an import is delete
- instance = kwargs.get('instance')
- if not instance:
- return
- to_delete = []
- for accessor, imported in instance.get_all_imported():
- to_delete.append(imported)
- for item in to_delete:
- item.delete()
-
-
-pre_delete.connect(pre_delete_import, sender=Import)
-
class Organization(Address, Merge, OwnPerms, ValueGetter):
TABLE_COLS = ('name', 'organization_type', 'town')
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
new file mode 100644
index 000000000..dcb02c27e
--- /dev/null
+++ b/ishtar_common/models_imports.py
@@ -0,0 +1,912 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2017 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet>
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# See the file COPYING for details.
+
+import csv
+import datetime
+from importlib import import_module
+import os
+import logging
+import shutil
+import re
+import tempfile
+import unicodecsv
+import zipfile
+
+from django.conf import settings
+from django.contrib.gis.db import models
+from django.core.exceptions import SuspiciousOperation
+from django.core.files import File
+from django.db.models.base import ModelBase
+from django.db.models.signals import pre_delete
+from django.template.defaultfilters import slugify
+from django.utils.translation import ugettext_lazy as _, pgettext_lazy
+
+from ishtar_common.utils import create_slug
+from ishtar_common.data_importer import Importer, ImportFormater, \
+ IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \
+ TypeFormater, YearFormater, StrToBoolean, FileFormater
+
+logger = logging.getLogger(__name__)
+
+IMPORTER_CLASSES = {}
+
+IMPORTER_CLASSES.update({
+ 'sra-pdl-files':
+ 'archaeological_files.data_importer.FileImporterSraPdL'})
+
+
+def get_model_fields(model):
+ """
+ Return a dict of fields from model
+ To be replace in Django 1.8 with get_fields, get_field
+ """
+ fields = {}
+ options = model._meta
+ for field in sorted(options.fields + options.many_to_many):
+ fields[field.name] = field
+ if hasattr(model, 'get_extra_fields'):
+ fields.update(model.get_extra_fields())
+ return fields
+
+
+def import_class(full_path_classname):
+ """
+ Return the model class from the full path
+ TODO: add a white list for more security
+ """
+ mods = full_path_classname.split('.')
+ if len(mods) == 1:
+ mods = ['ishtar_common', 'models', mods[0]]
+ elif 'models' not in mods and 'models_finds' not in mods \
+ and 'models_treatments' not in mods:
+ raise SuspiciousOperation(
+ u"Try to import a non model from a string")
+ module = import_module('.'.join(mods[:-1]))
+ return getattr(module, mods[-1])
+
+
+class ImportModelManager(models.Manager):
+ def get_by_natural_key(self, klass):
+ return self.get(klass=klass)
+
+
+class ImporterModel(models.Model):
+ name = models.CharField(_(u"Name"), max_length=200)
+ klass = models.CharField(_(u"Class name"), max_length=200, unique=True)
+ objects = ImportModelManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Model")
+ verbose_name_plural = _(u"Importer - Models")
+ ordering = ('name',)
+
+ def __unicode__(self):
+ return self.name
+
+ def natural_key(self):
+ return (self.klass, )
+
+
+class ImporterTypeManager(models.Manager):
+ def get_by_natural_key(self, slug):
+ return self.get(slug=slug)
+
+
+class ImporterType(models.Model):
+ """
+ Description of a table to be mapped with ishtar database
+ """
+ name = models.CharField(_(u"Name"), blank=True, null=True,
+ max_length=100)
+ slug = models.SlugField(_(u"Slug"), unique=True, max_length=100,
+ blank=True, null=True)
+ description = models.CharField(_(u"Description"), blank=True, null=True,
+ max_length=500)
+ users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"),
+ blank=True)
+ associated_models = models.ForeignKey(
+ ImporterModel, verbose_name=_(u"Associated model"),
+ related_name='+', blank=True, null=True)
+ created_models = models.ManyToManyField(
+ ImporterModel, verbose_name=_(u"Models that can accept new items"),
+ blank=True, help_text=_(u"Leave blank for no restrictions"),
+ related_name='+')
+ is_template = models.BooleanField(_(u"Is template"), default=False)
+ unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"),
+ blank=True, null=True, max_length=500)
+ objects = ImporterTypeManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Type")
+ verbose_name_plural = _(u"Importer - Types")
+ ordering = ('name',)
+
+ def natural_key(self):
+ return (self.slug, )
+
+ def __unicode__(self):
+ return self.name
+
+ def get_importer_class(self, import_instance=None):
+ if self.slug and self.slug in IMPORTER_CLASSES:
+ cls = import_class(IMPORTER_CLASSES[self.slug])
+ return cls
+ OBJECT_CLS = import_class(self.associated_models.klass)
+ DEFAULTS = dict([(default.keys, default.values)
+ for default in self.defaults.all()])
+ LINE_FORMAT = []
+ idx = 0
+ for column in self.columns.order_by('col_number').all():
+ idx += 1
+ while column.col_number > idx:
+ LINE_FORMAT.append(None)
+ idx += 1
+ targets = []
+ formater_types = []
+ nb = column.targets.count()
+ if not nb:
+ LINE_FORMAT.append(None)
+ continue
+ force_news = []
+ concat_str = []
+ for target in column.targets.all():
+ ft = target.formater_type.get_formater_type(
+ target, import_instance=import_instance)
+ if not ft:
+ continue
+ formater_types.append(ft)
+ targets.append(target.target)
+ concat_str.append(target.concat_str)
+ force_news.append(target.force_new)
+ formater_kwargs = {}
+ if column.regexp_pre_filter:
+ formater_kwargs['regexp'] = re.compile(
+ column.regexp_pre_filter.regexp)
+ formater_kwargs['concat_str'] = concat_str
+ formater_kwargs['duplicate_fields'] = [
+ (field.field_name, field.force_new, field.concat,
+ field.concat_str)
+ for field in column.duplicate_fields.all()]
+ formater_kwargs['label'] = column.label
+ formater_kwargs['required'] = column.required
+ formater_kwargs['force_new'] = force_news
+ if column.export_field_name:
+ formater_kwargs['export_field_name'] = [
+ column.export_field_name]
+ formater = ImportFormater(targets, formater_types,
+ **formater_kwargs)
+ LINE_FORMAT.append(formater)
+ UNICITY_KEYS = []
+ if self.unicity_keys:
+ UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')]
+ MODEL_CREATION_LIMIT = []
+ for modls in self.created_models.all():
+ MODEL_CREATION_LIMIT.append(import_class(modls.klass))
+ args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description,
+ 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT,
+ 'UNICITY_KEYS': UNICITY_KEYS,
+ 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT}
+ name = str(''.join(
+ x for x in slugify(self.name).replace('-', ' ').title()
+ if not x.isspace()))
+ newclass = type(name, (Importer,), args)
+ return newclass
+
+ def save(self, *args, **kwargs):
+ if not self.slug:
+ self.slug = create_slug(ImporterType, self.name)
+ return super(ImporterType, self).save(*args, **kwargs)
+
+
+def get_associated_model(parent_model, keys):
+ model = None
+ if isinstance(parent_model, unicode) or \
+ isinstance(parent_model, str):
+ OBJECT_CLS = import_class(parent_model)
+ else:
+ OBJECT_CLS = parent_model
+ for idx, item in enumerate(keys):
+ if not idx:
+ field = get_model_fields(OBJECT_CLS)[item]
+ if hasattr(field, 'rel') and hasattr(field.rel, 'to'):
+ model = field.rel.to
+ if type(field) == ModelBase:
+ model = field
+ else:
+ return get_associated_model(model, keys[1:])
+ return model
+
+
+class ImporterDefaultManager(models.Manager):
+ def get_by_natural_key(self, importer_type, target):
+ return self.get(importer_type__slug=importer_type, target=target)
+
+
+class ImporterDefault(models.Model):
+ """
+ Targets of default values in an import
+ """
+ importer_type = models.ForeignKey(ImporterType, related_name='defaults')
+ target = models.CharField(u"Target", max_length=500)
+
+ class Meta:
+ verbose_name = _(u"Importer - Default")
+ verbose_name_plural = _(u"Importer - Defaults")
+ unique_together = ('importer_type', 'target')
+ objects = ImporterDefaultManager()
+
+ def __unicode__(self):
+ return u"{} - {}".format(self.importer_type, self.target)
+
+ def natural_key(self):
+ return self.importer_type.slug, self.target
+
+ @property
+ def keys(self):
+ return tuple(self.target.split('__'))
+
+ @property
+ def associated_model(self):
+ return get_associated_model(self.importer_type.associated_models.klass,
+ self.keys)
+
+ @property
+ def values(self):
+ values = {}
+ for default_value in self.default_values.all():
+ values[default_value.target] = default_value.get_value()
+ return values
+
+
+class ImporterDefaultValuesManager(models.Manager):
+ def get_by_natural_key(self, def_target_type, def_target, target):
+ return self.get(default_target__importer_type__slug=def_target_type,
+ default_target__target=def_target,
+ target=target)
+
+
+class ImporterDefaultValues(models.Model):
+ """
+ Default values in an import
+ """
+ default_target = models.ForeignKey(ImporterDefault,
+ related_name='default_values')
+ target = models.CharField(u"Target", max_length=500)
+ value = models.CharField(u"Value", max_length=500)
+ objects = ImporterDefaultValuesManager()
+
+ def __unicode__(self):
+ return u"{} - {}".format(self.default_target, self.target, self.value)
+
+ class Meta:
+ verbose_name = _(u"Importer - Default value")
+ verbose_name_plural = _(u"Importer - Default values")
+
+ def natural_key(self):
+ return (self.default_target.importer_type.slug,
+ self.default_target.target,
+ self.target)
+
+ def get_value(self):
+ parent_model = self.default_target.associated_model
+ if not parent_model:
+ return self.value
+ fields = get_model_fields(parent_model)
+ target = self.target.strip()
+ if target not in fields:
+ return
+ field = fields[target]
+ if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'):
+ return
+ model = field.rel.to
+ # if value is an id
+ try:
+ return model.objects.get(pk=int(self.value))
+ except (ValueError, model.DoesNotExist):
+ pass
+ # try with txt_idx
+ try:
+ return model.objects.get(txt_idx=self.value)
+ except (ValueError, model.DoesNotExist):
+ pass
+ return ""
+
+
+class ImporterColumnManager(models.Manager):
+ def get_by_natural_key(self, importer_type, col_number):
+ return self.get(importer_type__slug=importer_type,
+ col_number=col_number)
+
+
+class ImporterColumn(models.Model):
+ """
+ Import file column description
+ """
+ label = models.CharField(_(u"Label"), blank=True, null=True,
+ max_length=200)
+ importer_type = models.ForeignKey(ImporterType, related_name='columns')
+ col_number = models.IntegerField(_(u"Column number"), default=1)
+ description = models.TextField(_("Description"), blank=True, null=True)
+ regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True)
+ required = models.BooleanField(_(u"Required"), default=False)
+ export_field_name = models.CharField(
+ _(u"Export field name"), blank=True, null=True, max_length=200,
+ help_text=_(u"Fill this field if the field name is ambiguous for "
+ u"export. For instance: concatenated fields.")
+ )
+ objects = ImporterColumnManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Column")
+ verbose_name_plural = _(u"Importer - Columns")
+ ordering = ('importer_type', 'col_number')
+ unique_together = ('importer_type', 'col_number')
+
+ def __unicode__(self):
+ return u"{} - {}".format(self.importer_type, self.col_number)
+
+ def natural_key(self):
+ return self.importer_type.slug, self.col_number
+
+ def targets_lbl(self):
+ return u', '.join([target.target for target in self.targets.all()])
+
+ def duplicate_fields_lbl(self):
+ return u', '.join([dp.field_name
+ for dp in self.duplicate_fields.all()])
+
+
+class ImporterDuplicateFieldManager(models.Manager):
+ def get_by_natural_key(self, importer_type, col_number, field_name):
+ return self.get(column__importer_type__slug=importer_type,
+ column__col_number=col_number,
+ field_name=field_name)
+
+
+class ImporterDuplicateField(models.Model):
+ """
+ Direct copy of result in other fields
+ """
+ column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields')
+ field_name = models.CharField(_(u"Field name"), blank=True, null=True,
+ max_length=200)
+ force_new = models.BooleanField(_(u"Force creation of new items"),
+ default=False)
+ concat = models.BooleanField(_(u"Concatenate with existing"),
+ default=False)
+ concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
+ blank=True, null=True)
+ objects = ImporterDuplicateFieldManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Duplicate field")
+ verbose_name_plural = _(u"Importer - Duplicate fields")
+ ordering = ('column', 'field_name')
+
+ def natural_key(self):
+ return self.column.importer_type, self.column.col_number, \
+ self.field_name
+
+
+class NamedManager(models.Manager):
+ def get_by_natural_key(self, name):
+ return self.get(name=name)
+
+
+class Regexp(models.Model):
+ name = models.CharField(_(u"Name"), max_length=100, unique=True)
+ description = models.CharField(_(u"Description"), blank=True, null=True,
+ max_length=500)
+ regexp = models.CharField(_(u"Regular expression"), max_length=500)
+ objects = NamedManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Regular expression")
+ verbose_name_plural = _(u"Importer - Regular expressions")
+
+ def __unicode__(self):
+ return self.name
+
+ def natural_key(self):
+ return (self.name, )
+
+
+class ImportTargetManager(models.Manager):
+ def get_by_natural_key(self, importer_type, col_number, target):
+ return self.get(column__importer_type__slug=importer_type,
+ column__col_number=col_number,
+ target=target)
+
+
+class ImportTarget(models.Model):
+ """
+ Ishtar database target for a column
+ """
+ column = models.ForeignKey(ImporterColumn, related_name='targets')
+ target = models.CharField(u"Target", max_length=500)
+ regexp_filter = models.ForeignKey("Regexp", blank=True, null=True)
+ formater_type = models.ForeignKey("FormaterType")
+ force_new = models.BooleanField(_(u"Force creation of new items"),
+ default=False)
+ concat = models.BooleanField(_(u"Concatenate with existing"),
+ default=False)
+ concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
+ blank=True, null=True)
+ comment = models.TextField(_(u"Comment"), blank=True, null=True)
+ objects = ImportTargetManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Target")
+ verbose_name_plural = _(u"Importer - Targets")
+ unique_together = ('column', 'target')
+
+ def __unicode__(self):
+ return self.target[:50] if self.target else self.comment
+
+ def natural_key(self):
+ return self.column.importer_type.slug, self.column.col_number, \
+ self.target
+
+ @property
+ def associated_model(self):
+ try:
+ return get_associated_model(
+ self.column.importer_type.associated_models.klass,
+ self.target.split('__'))
+ except KeyError:
+ return
+
+ def get_choices(self):
+ if self.formater_type.formater_type == 'UnknowType' \
+ and self.column.importer_type.slug:
+ cls = self.column.importer_type.get_importer_class()
+ formt = cls().line_format[self.column.col_number - 1]
+ if hasattr(formt.formater, 'choices'):
+ return [('', '--' * 8)] + list(formt.formater.choices)
+ return [('', '--' * 8)]
+ if self.formater_type.formater_type == 'StrToBoolean':
+ return [('', '--' * 8),
+ ('True', _(u"True")),
+ ('False', _(u"False"))]
+ if not self.associated_model or not hasattr(self.associated_model,
+ 'get_types'):
+ return []
+ return self.associated_model.get_types()
+
+
+class TargetKey(models.Model):
+ """
+ User's link between import source and ishtar database.
+ Also temporary used for GeneralType to point missing link before adding
+ them in ItemKey table.
+ A targetkey connection can be create to be applied to on particular
+ import (associated_import), one particular user (associated_user) or to all
+ imports (associated_import and associated_user are empty).
+ """
+ target = models.ForeignKey(ImportTarget, related_name='keys')
+ key = models.TextField(_(u"Key"))
+ value = models.TextField(_(u"Value"), blank=True, null=True)
+ is_set = models.BooleanField(_(u"Is set"), default=False)
+ associated_import = models.ForeignKey('Import', blank=True, null=True)
+ associated_user = models.ForeignKey('IshtarUser', blank=True, null=True)
+
+ class Meta:
+ unique_together = ('target', 'key', 'associated_user',
+ 'associated_import')
+ verbose_name = _(u"Importer - Target key")
+ verbose_name_plural = _(u"Importer - Targets keys")
+
+ def __unicode__(self):
+ return u" - ".join([unicode(self.target), self.key[:50]])
+
+ def column_nb(self):
+ # for the admin
+ return self.target.column.col_number
+
+ def importer_type(self):
+ # for the admin
+ return self.target.column.importer_type.name
+
+ def format(self):
+ if not self.is_set:
+ return None
+ if self.target.formater_type.formater_type == 'StrToBoolean':
+ if self.value in ('False', '0'):
+ return False
+ elif self.value:
+ return True
+ return
+ return self.value
+
+ def save(self, *args, **kwargs):
+ obj = super(TargetKey, self).save(*args, **kwargs)
+ if not self.value:
+ return obj
+ associated_model = self.target.associated_model
+ if associated_model and hasattr(self.target.associated_model,
+ "add_key"):
+ v = None
+ # pk is given
+ try:
+ v = self.target.associated_model.objects.get(
+ pk=unicode(int(self.value)))
+ except (ValueError, self.target.associated_model.DoesNotExist):
+ # try with txt_idx
+ try:
+ v = self.target.associated_model.objects.get(
+ txt_idx=unicode(self.value))
+ except self.target.associated_model.DoesNotExist:
+ pass
+ if v:
+ v.add_key(self.key, importer=self.associated_import)
+ return obj
+
+TARGET_MODELS = [
+ ('OrganizationType', _(u"Organization type")),
+ ('TitleType', _(u"Title")),
+ ('SourceType', _(u"Source type")),
+ ('AuthorType', _(u"Author type")),
+ ('Format', _(u"Format")),
+ ('archaeological_operations.models.OperationType', _(u"Operation type")),
+ ('archaeological_operations.models.Period', _(u"Period")),
+ ('archaeological_operations.models.ReportState', _(u"Report state")),
+ ('archaeological_operations.models.RemainType', _(u"Remain type")),
+ ('archaeological_context_records.models.Unit', _(u"Unit")),
+ ('archaeological_context_records.models.ActivityType',
+ _(u"Activity type")),
+ ('archaeological_context_records.models.DocumentationType',
+ _(u"Documentation type")),
+ ('archaeological_finds.models.MaterialType', _(u"Material")),
+ ('archaeological_finds.models.ConservatoryState',
+ _(u"Conservatory state")),
+ ('archaeological_warehouse.models.ContainerType', _(u"Container type")),
+ ('archaeological_finds.models.PreservationType', _(u"Preservation type")),
+ ('archaeological_finds.models.ObjectType', _(u"Object type")),
+ ('archaeological_finds.models.IntegrityType', _(u"Integrity type")),
+ ('archaeological_finds.models.RemarkabilityType',
+ _(u"Remarkability type")),
+ ('archaeological_finds.models.BatchType', _(u"Batch type")),
+ ('archaeological_context_records.models.IdentificationType',
+ _("Identification type")),
+ ('archaeological_context_records.models.RelationType',
+ _(u"Context record relation type")),
+ ('SpatialReferenceSystem', _(u"Spatial reference system")),
+ ('SupportType', _(u"Support type")),
+ ('TitleType', _(u"Title type")),
+]
+
+TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS]
+
+IMPORTER_TYPES = (
+ ('IntegerFormater', _(u"Integer")),
+ ('FloatFormater', _(u"Float")),
+ ('UnicodeFormater', _(u"String")),
+ ('DateFormater', _(u"Date")),
+ ('TypeFormater', _(u"Type")),
+ ('YearFormater', _(u"Year")),
+ ('StrToBoolean', _(u"String to boolean")),
+ ('FileFormater', pgettext_lazy("filesystem", u"File")),
+ ('UnknowType', _(u"Unknow type"))
+)
+
+IMPORTER_TYPES_DCT = {
+ 'IntegerFormater': IntegerFormater,
+ 'FloatFormater': FloatFormater,
+ 'UnicodeFormater': UnicodeFormater,
+ 'DateFormater': DateFormater,
+ 'TypeFormater': TypeFormater,
+ 'YearFormater': YearFormater,
+ 'StrToBoolean': StrToBoolean,
+ 'FileFormater': FileFormater,
+ 'UnknowType': None,
+}
+
+DATE_FORMATS = (
+ ('%Y', _(u"4 digit year. e.g.: \"2015\"")),
+ ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")),
+ ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")),
+)
+
+IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS,
+ 'DateFormater': DATE_FORMATS}
+
+
+class FormaterTypeManager(models.Manager):
+ def get_by_natural_key(self, formater_type, options, many_split):
+ return self.get(formater_type=formater_type,
+ options=options, many_split=many_split)
+
+
+class FormaterType(models.Model):
+ formater_type = models.CharField(u"Formater type", max_length=20,
+ choices=IMPORTER_TYPES)
+ options = models.CharField(_(u"Options"), max_length=500, blank=True,
+ null=True)
+ many_split = models.CharField(_(u"Split character(s)"), max_length=10,
+ blank=True, null=True)
+ objects = FormaterTypeManager()
+
+ class Meta:
+ verbose_name = _(u"Importer - Formater type")
+ verbose_name_plural = _(u"Importer - Formater types")
+ unique_together = ('formater_type', 'options', 'many_split')
+ ordering = ('formater_type', 'options')
+
+ def natural_key(self):
+ return self.formater_type, self.options, self.many_split
+
+ def __unicode__(self):
+ return u" - ".join(
+ [unicode(dict(IMPORTER_TYPES)[self.formater_type])
+ if self.formater_type in IMPORTER_TYPES_DCT else ''] +
+ [getattr(self, k) for k in ('options', 'many_split')
+ if getattr(self, k)])
+
+ def get_choices(self):
+ if self.format_type in IMPORTER_TYPES_CHOICES:
+ return IMPORTER_TYPES_CHOICES[self.format_type]
+
+ def get_formater_type(self, target, import_instance=None):
+ if self.formater_type not in IMPORTER_TYPES_DCT.keys():
+ return
+ kwargs = {'db_target': target, 'import_instance': import_instance}
+ if self.many_split:
+ kwargs['many_split'] = self.many_split
+ if self.formater_type == 'TypeFormater':
+ if self.options not in TARGET_MODELS_KEYS:
+ logger.warning(
+ "**WARN FormaterType.get_formater_type**: {} "
+ "is not in TARGET_MODELS_KEYS".format(self.options))
+ return
+ model = None
+ if self.options in dir():
+ model = dir()[self.options]
+ else:
+ model = import_class(self.options)
+ return TypeFormater(model, **kwargs)
+ elif self.formater_type == 'UnicodeFormater':
+ if self.options:
+ try:
+ return UnicodeFormater(int(self.options.strip()), **kwargs)
+ except ValueError:
+ pass
+ return UnicodeFormater(**kwargs)
+ elif self.formater_type == 'DateFormater':
+ date_formats = self.options
+ if self.many_split:
+ date_formats = self.options.split(kwargs.pop('many_split'))
+ return DateFormater(date_formats, **kwargs)
+ elif self.formater_type == 'StrToBoolean':
+ return StrToBoolean(**kwargs)
+ elif self.formater_type == 'UnknowType':
+ return
+ else:
+ return IMPORTER_TYPES_DCT[self.formater_type](**kwargs)
+
+IMPORT_STATE = (("C", _(u"Created")),
+ ("AP", _(u"Analyse in progress")),
+ ("A", _(u"Analysed")),
+ ("P", _(u"Import pending")),
+ ("IP", _(u"Import in progress")),
+ ("FE", _(u"Finished with errors")),
+ ("F", _(u"Finished")),
+ ("AC", _(u"Archived")),
+ )
+
+IMPORT_STATE_DCT = dict(IMPORT_STATE)
+ENCODINGS = [(settings.ENCODING, settings.ENCODING),
+ (settings.ALT_ENCODING, settings.ALT_ENCODING),
+ ('utf-8', 'utf-8')]
+
+
+class Import(models.Model):
+ user = models.ForeignKey('IshtarUser')
+ name = models.CharField(_(u"Name"), max_length=500,
+ blank=True, null=True)
+ importer_type = models.ForeignKey(ImporterType)
+ imported_file = models.FileField(
+ _(u"Imported file"), upload_to="upload/imports/", max_length=220)
+ imported_images = models.FileField(
+ _(u"Associated images (zip file)"), upload_to="upload/imports/",
+ blank=True, null=True, max_length=220)
+ encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS,
+ default=u'utf-8', max_length=15)
+ skip_lines = models.IntegerField(_(u"Skip lines"), default=1)
+ error_file = models.FileField(_(u"Error file"),
+ upload_to="upload/imports/",
+ blank=True, null=True, max_length=255)
+ result_file = models.FileField(_(u"Result file"),
+ upload_to="upload/imports/",
+ blank=True, null=True, max_length=255)
+ match_file = models.FileField(_(u"Match file"),
+ upload_to="upload/imports/",
+ blank=True, null=True, max_length=255)
+ state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE,
+ default=u'C')
+ conservative_import = models.BooleanField(
+ _(u"Conservative import"), default=False,
+ help_text='If set to true, do not overload existing values')
+ creation_date = models.DateTimeField(
+ _(u"Creation date"), auto_now_add=True, blank=True, null=True)
+ end_date = models.DateTimeField(_(u"End date"), blank=True,
+ null=True, editable=False)
+ seconds_remaining = models.IntegerField(
+ _(u"Remaining seconds"), blank=True, null=True, editable=False)
+
+ class Meta:
+ verbose_name = _(u"Import")
+ verbose_name_plural = _(u"Imports")
+
+ def __unicode__(self):
+ return u"{} | {}".format(self.name or u"-", self.importer_type)
+
+ def need_matching(self):
+ return bool(TargetKey.objects.filter(associated_import=self,
+ is_set=False).count())
+
+ @property
+ def errors(self):
+ if not self.error_file:
+ return []
+ errors = []
+ with open(self.error_file.path, 'rb') as csvfile:
+ reader = csv.DictReader(csvfile, fieldnames=['line', 'column',
+ 'error'])
+ reader.next() # pass the header
+ for row in reader:
+ errors.append(row)
+ return errors
+
+ def get_actions(self):
+ """
+ Get available action relevant with the current status
+ """
+ actions = []
+ if self.state == 'C':
+ actions.append(('A', _(u"Analyse")))
+ if self.state == 'A':
+ actions.append(('A', _(u"Re-analyse")))
+ actions.append(('I', _(u"Launch import")))
+ if self.state in ('F', 'FE'):
+ actions.append(('A', _(u"Re-analyse")))
+ actions.append(('I', _(u"Re-import")))
+ actions.append(('AC', _(u"Archive")))
+ if self.state == 'AC':
+ actions.append(('A', _(u"Unarchive")))
+ actions.append(('D', _(u"Delete")))
+ return actions
+
+ @property
+ def imported_filename(self):
+ return self.imported_file.name.split(os.sep)[-1]
+
+ @property
+ def status(self):
+ if self.state not in IMPORT_STATE_DCT:
+ return ""
+ return IMPORT_STATE_DCT[self.state]
+
+ def get_importer_instance(self):
+ return self.importer_type.get_importer_class(import_instance=self)(
+ skip_lines=self.skip_lines, import_instance=self,
+ conservative_import=self.conservative_import)
+
+ @property
+ def data_table(self):
+ imported_file = self.imported_file.path
+ tmpdir = None
+ if zipfile.is_zipfile(imported_file):
+ z = zipfile.ZipFile(imported_file)
+ filename = None
+ for name in z.namelist():
+ # get first CSV file found
+ if name.endswith('.csv'):
+ filename = name
+ break
+ if not filename:
+ return []
+ tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-')
+ imported_file = z.extract(filename, tmpdir)
+
+ encodings = [self.encoding]
+ encodings += [coding for coding, c in ENCODINGS
+ if coding != self.encoding]
+ for encoding in encodings:
+ try:
+ with open(imported_file) as csv_file:
+ vals = [line
+ for line in unicodecsv.reader(csv_file,
+ encoding=encoding)]
+ if tmpdir:
+ shutil.rmtree(tmpdir)
+ return vals
+ except UnicodeDecodeError:
+ pass # try the next encoding
+ if tmpdir:
+ shutil.rmtree(tmpdir)
+ return []
+
+ def initialize(self):
+ self.state = 'AP'
+ self.save()
+ self.get_importer_instance().initialize(self.data_table, output='db')
+ self.state = 'A'
+ self.save()
+
+ def importation(self):
+ self.state = 'IP'
+ self.save()
+ importer = self.get_importer_instance()
+ importer.importation(self.data_table)
+ # result file
+ filename = slugify(self.importer_type.name)
+ now = datetime.datetime.now().isoformat('-').replace(':', '')
+ result_file = filename + "_result_%s.csv" % now
+ result_file = os.sep.join([self.result_file.storage.location,
+ result_file])
+ with open(result_file, 'w') as fle:
+ fle.write(importer.get_csv_result().encode('utf-8'))
+ self.result_file = File(open(fle.name))
+ if importer.errors:
+ self.state = 'FE'
+ error_file = filename + "_errors_%s.csv" % now
+ error_file = os.sep.join([self.error_file.storage.location,
+ error_file])
+ with open(error_file, 'w') as fle:
+ fle.write(importer.get_csv_errors().encode('utf-8'))
+ self.error_file = File(open(fle.name))
+ else:
+ self.state = 'F'
+ self.error_file = None
+ if importer.match_table:
+ match_file = filename + "_match_%s.csv" % now
+ match_file = os.sep.join([self.match_file.storage.location,
+ match_file])
+ with open(match_file, 'w') as fle:
+ fle.write(importer.get_csv_matches().encode('utf-8'))
+ self.match_file = File(open(fle.name))
+ self.save()
+
+ def archive(self):
+ self.state = 'AC'
+ self.save()
+
+ def get_all_imported(self):
+ imported = []
+ for related, zorg in \
+ self._meta.get_all_related_m2m_objects_with_model():
+ accessor = related.get_accessor_name()
+ imported += [(accessor, obj)
+ for obj in getattr(self, accessor).all()]
+ return imported
+
+
+def pre_delete_import(sender, **kwargs):
+ # deleted imported items when an import is delete
+ instance = kwargs.get('instance')
+ if not instance:
+ return
+ to_delete = []
+ for accessor, imported in instance.get_all_imported():
+ to_delete.append(imported)
+ for item in to_delete:
+ item.delete()
+
+
+pre_delete.connect(pre_delete_import, sender=Import)