diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-08-21 18:23:29 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-08-21 18:23:29 +0200 |
commit | 44c98340f5978ff4818a66351235ab33df316f18 (patch) | |
tree | 0d2270c4b34eab262d9ab1f3936c5fe4f8942289 /ishtar_common | |
parent | 5a9821f832f632512182835b42741073da14ca75 (diff) | |
download | Ishtar-44c98340f5978ff4818a66351235ab33df316f18.tar.bz2 Ishtar-44c98340f5978ff4818a66351235ab33df316f18.zip |
Models: refactoring move all import models to a specific file
Diffstat (limited to 'ishtar_common')
-rw-r--r-- | ishtar_common/models.py | 898 | ||||
-rw-r--r-- | ishtar_common/models_imports.py | 912 |
2 files changed, 933 insertions, 877 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py index dd8204f21..725c3ac83 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -70,6 +70,17 @@ from ishtar_common.data_importer import Importer, ImportFormater, \ IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \ TypeFormater, YearFormater, StrToBoolean, FileFormater +from ishtar_common.models_imports import ImporterModel, ImporterType, \ + ImporterDefault, ImporterDefaultValues, ImporterColumn, \ + ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ + Import + +__all__ = [ + 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', + 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget', + 'TargetKey', 'FormaterType', 'Import' +] + logger = logging.getLogger(__name__) @@ -123,15 +134,6 @@ def check_model_access_control(request, model, available_perms=None): return allowed, own -class Imported(models.Model): - imports = models.ManyToManyField( - 'Import', blank=True, - related_name="imported_%(app_label)s_%(class)s") - - class Meta: - abstract = True - - class ValueGetter(object): _prefix = "" GET_VALUES_EXTRA = [] @@ -798,7 +800,7 @@ class ItemKey(models.Model): object_id = models.PositiveIntegerField() content_object = GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey( - 'Import', null=True, blank=True, + Import, null=True, blank=True, help_text=_(u"Specific key to an import")) def __unicode__(self): @@ -913,6 +915,15 @@ class BulkUpdatedItem(object): return transaction_id, False +class Imported(models.Model): + imports = models.ManyToManyField( + Import, blank=True, + related_name="imported_%(app_label)s_%(class)s") + + class Meta: + abstract = True + + class BaseHistorizedItem(Imported): IS_BASKET = False history_modifier = models.ForeignKey( @@ -1880,873 +1891,6 @@ post_delete.connect(post_save_cache, sender=OrganizationType) organization_type_pk_lazy = 
lazy(OrganizationType.get_or_create_pk, unicode) organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, unicode) -IMPORTER_CLASSES = {} - -IMPORTER_CLASSES.update({ - 'sra-pdl-files': - 'archaeological_files.data_importer.FileImporterSraPdL'}) - - -def get_model_fields(model): - """ - Return a dict of fields from model - To be replace in Django 1.8 with get_fields, get_field - """ - fields = {} - options = model._meta - for field in sorted(options.fields + options.many_to_many): - fields[field.name] = field - if hasattr(model, 'get_extra_fields'): - fields.update(model.get_extra_fields()) - return fields - - -def import_class(full_path_classname): - """ - Return the model class from the full path - TODO: add a white list for more security - """ - mods = full_path_classname.split('.') - if len(mods) == 1: - mods = ['ishtar_common', 'models', mods[0]] - elif 'models' not in mods and 'models_finds' not in mods \ - and 'models_treatments' not in mods: - raise SuspiciousOperation( - u"Try to import a non model from a string") - module = import_module('.'.join(mods[:-1])) - return getattr(module, mods[-1]) - - -class ImportModelManager(models.Manager): - def get_by_natural_key(self, klass): - return self.get(klass=klass) - - -class ImporterModel(models.Model): - name = models.CharField(_(u"Name"), max_length=200) - klass = models.CharField(_(u"Class name"), max_length=200, unique=True) - objects = ImportModelManager() - - class Meta: - verbose_name = _(u"Importer - Model") - verbose_name_plural = _(u"Importer - Models") - ordering = ('name',) - - def __unicode__(self): - return self.name - - def natural_key(self): - return (self.klass, ) - - -class ImporterTypeManager(models.Manager): - def get_by_natural_key(self, slug): - return self.get(slug=slug) - - -class ImporterType(models.Model): - """ - Description of a table to be mapped with ishtar database - """ - name = models.CharField(_(u"Name"), blank=True, null=True, - max_length=100) - slug = 
models.SlugField(_(u"Slug"), unique=True, max_length=100, - blank=True, null=True) - description = models.CharField(_(u"Description"), blank=True, null=True, - max_length=500) - users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"), - blank=True) - associated_models = models.ForeignKey( - ImporterModel, verbose_name=_(u"Associated model"), - related_name='+', blank=True, null=True) - created_models = models.ManyToManyField( - ImporterModel, verbose_name=_(u"Models that can accept new items"), - blank=True, help_text=_(u"Leave blank for no restrictions"), - related_name='+') - is_template = models.BooleanField(_(u"Is template"), default=False) - unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"), - blank=True, null=True, max_length=500) - objects = ImporterTypeManager() - - class Meta: - verbose_name = _(u"Importer - Type") - verbose_name_plural = _(u"Importer - Types") - ordering = ('name',) - - def natural_key(self): - return (self.slug, ) - - def __unicode__(self): - return self.name - - def get_importer_class(self, import_instance=None): - if self.slug and self.slug in IMPORTER_CLASSES: - cls = import_class(IMPORTER_CLASSES[self.slug]) - return cls - OBJECT_CLS = import_class(self.associated_models.klass) - DEFAULTS = dict([(default.keys, default.values) - for default in self.defaults.all()]) - LINE_FORMAT = [] - idx = 0 - for column in self.columns.order_by('col_number').all(): - idx += 1 - while column.col_number > idx: - LINE_FORMAT.append(None) - idx += 1 - targets = [] - formater_types = [] - nb = column.targets.count() - if not nb: - LINE_FORMAT.append(None) - continue - force_news = [] - concat_str = [] - for target in column.targets.all(): - ft = target.formater_type.get_formater_type( - target, import_instance=import_instance) - if not ft: - continue - formater_types.append(ft) - targets.append(target.target) - concat_str.append(target.concat_str) - force_news.append(target.force_new) - formater_kwargs = {} - if 
column.regexp_pre_filter: - formater_kwargs['regexp'] = re.compile( - column.regexp_pre_filter.regexp) - formater_kwargs['concat_str'] = concat_str - formater_kwargs['duplicate_fields'] = [ - (field.field_name, field.force_new, field.concat, - field.concat_str) - for field in column.duplicate_fields.all()] - formater_kwargs['label'] = column.label - formater_kwargs['required'] = column.required - formater_kwargs['force_new'] = force_news - if column.export_field_name: - formater_kwargs['export_field_name'] = [ - column.export_field_name] - formater = ImportFormater(targets, formater_types, - **formater_kwargs) - LINE_FORMAT.append(formater) - UNICITY_KEYS = [] - if self.unicity_keys: - UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] - MODEL_CREATION_LIMIT = [] - for modls in self.created_models.all(): - MODEL_CREATION_LIMIT.append(import_class(modls.klass)) - args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, - 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, - 'UNICITY_KEYS': UNICITY_KEYS, - 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} - name = str(''.join( - x for x in slugify(self.name).replace('-', ' ').title() - if not x.isspace())) - newclass = type(name, (Importer,), args) - return newclass - - def save(self, *args, **kwargs): - if not self.slug: - self.slug = create_slug(ImporterType, self.name) - return super(ImporterType, self).save(*args, **kwargs) - - -def get_associated_model(parent_model, keys): - model = None - if isinstance(parent_model, unicode) or \ - isinstance(parent_model, str): - OBJECT_CLS = import_class(parent_model) - else: - OBJECT_CLS = parent_model - for idx, item in enumerate(keys): - if not idx: - field = get_model_fields(OBJECT_CLS)[item] - if hasattr(field, 'rel') and hasattr(field.rel, 'to'): - model = field.rel.to - if type(field) == ModelBase: - model = field - else: - return get_associated_model(model, keys[1:]) - return model - - -class ImporterDefaultManager(models.Manager): - def 
get_by_natural_key(self, importer_type, target): - return self.get(importer_type__slug=importer_type, target=target) - - -class ImporterDefault(models.Model): - """ - Targets of default values in an import - """ - importer_type = models.ForeignKey(ImporterType, related_name='defaults') - target = models.CharField(u"Target", max_length=500) - - class Meta: - verbose_name = _(u"Importer - Default") - verbose_name_plural = _(u"Importer - Defaults") - unique_together = ('importer_type', 'target') - objects = ImporterDefaultManager() - - def __unicode__(self): - return u"{} - {}".format(self.importer_type, self.target) - - def natural_key(self): - return self.importer_type.slug, self.target - - @property - def keys(self): - return tuple(self.target.split('__')) - - @property - def associated_model(self): - return get_associated_model(self.importer_type.associated_models.klass, - self.keys) - - @property - def values(self): - values = {} - for default_value in self.default_values.all(): - values[default_value.target] = default_value.get_value() - return values - - -class ImporterDefaultValuesManager(models.Manager): - def get_by_natural_key(self, def_target_type, def_target, target): - return self.get(default_target__importer_type__slug=def_target_type, - default_target__target=def_target, - target=target) - - -class ImporterDefaultValues(models.Model): - """ - Default values in an import - """ - default_target = models.ForeignKey(ImporterDefault, - related_name='default_values') - target = models.CharField(u"Target", max_length=500) - value = models.CharField(u"Value", max_length=500) - objects = ImporterDefaultValuesManager() - - def __unicode__(self): - return u"{} - {}".format(self.default_target, self.target, self.value) - - class Meta: - verbose_name = _(u"Importer - Default value") - verbose_name_plural = _(u"Importer - Default values") - - def natural_key(self): - return (self.default_target.importer_type.slug, - self.default_target.target, - self.target) - - def 
get_value(self): - parent_model = self.default_target.associated_model - if not parent_model: - return self.value - fields = get_model_fields(parent_model) - target = self.target.strip() - if target not in fields: - return - field = fields[target] - if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'): - return - model = field.rel.to - # if value is an id - try: - return model.objects.get(pk=int(self.value)) - except (ValueError, model.DoesNotExist): - pass - # try with txt_idx - try: - return model.objects.get(txt_idx=self.value) - except (ValueError, model.DoesNotExist): - pass - return "" - - -class ImporterColumnManager(models.Manager): - def get_by_natural_key(self, importer_type, col_number): - return self.get(importer_type__slug=importer_type, - col_number=col_number) - - -class ImporterColumn(models.Model): - """ - Import file column description - """ - label = models.CharField(_(u"Label"), blank=True, null=True, - max_length=200) - importer_type = models.ForeignKey(ImporterType, related_name='columns') - col_number = models.IntegerField(_(u"Column number"), default=1) - description = models.TextField(_("Description"), blank=True, null=True) - regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True) - required = models.BooleanField(_(u"Required"), default=False) - export_field_name = models.CharField( - _(u"Export field name"), blank=True, null=True, max_length=200, - help_text=_(u"Fill this field if the field name is ambiguous for " - u"export. 
For instance: concatenated fields.") - ) - objects = ImporterColumnManager() - - class Meta: - verbose_name = _(u"Importer - Column") - verbose_name_plural = _(u"Importer - Columns") - ordering = ('importer_type', 'col_number') - unique_together = ('importer_type', 'col_number') - - def __unicode__(self): - return u"{} - {}".format(self.importer_type, self.col_number) - - def natural_key(self): - return self.importer_type.slug, self.col_number - - def targets_lbl(self): - return u', '.join([target.target for target in self.targets.all()]) - - def duplicate_fields_lbl(self): - return u', '.join([dp.field_name - for dp in self.duplicate_fields.all()]) - - -class ImporterDuplicateFieldManager(models.Manager): - def get_by_natural_key(self, importer_type, col_number, field_name): - return self.get(column__importer_type__slug=importer_type, - column__col_number=col_number, - field_name=field_name) - - -class ImporterDuplicateField(models.Model): - """ - Direct copy of result in other fields - """ - column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') - field_name = models.CharField(_(u"Field name"), blank=True, null=True, - max_length=200) - force_new = models.BooleanField(_(u"Force creation of new items"), - default=False) - concat = models.BooleanField(_(u"Concatenate with existing"), - default=False) - concat_str = models.CharField(_(u"Concatenate character"), max_length=5, - blank=True, null=True) - objects = ImporterDuplicateFieldManager() - - class Meta: - verbose_name = _(u"Importer - Duplicate field") - verbose_name_plural = _(u"Importer - Duplicate fields") - ordering = ('column', 'field_name') - - def natural_key(self): - return self.column.importer_type, self.column.col_number, \ - self.field_name - - -class NamedManager(models.Manager): - def get_by_natural_key(self, name): - return self.get(name=name) - - -class Regexp(models.Model): - name = models.CharField(_(u"Name"), max_length=100, unique=True) - description = 
models.CharField(_(u"Description"), blank=True, null=True, - max_length=500) - regexp = models.CharField(_(u"Regular expression"), max_length=500) - objects = NamedManager() - - class Meta: - verbose_name = _(u"Importer - Regular expression") - verbose_name_plural = _(u"Importer - Regular expressions") - - def __unicode__(self): - return self.name - - def natural_key(self): - return (self.name, ) - - -class ImportTargetManager(models.Manager): - def get_by_natural_key(self, importer_type, col_number, target): - return self.get(column__importer_type__slug=importer_type, - column__col_number=col_number, - target=target) - - -class ImportTarget(models.Model): - """ - Ishtar database target for a column - """ - column = models.ForeignKey(ImporterColumn, related_name='targets') - target = models.CharField(u"Target", max_length=500) - regexp_filter = models.ForeignKey("Regexp", blank=True, null=True) - formater_type = models.ForeignKey("FormaterType") - force_new = models.BooleanField(_(u"Force creation of new items"), - default=False) - concat = models.BooleanField(_(u"Concatenate with existing"), - default=False) - concat_str = models.CharField(_(u"Concatenate character"), max_length=5, - blank=True, null=True) - comment = models.TextField(_(u"Comment"), blank=True, null=True) - objects = ImportTargetManager() - - class Meta: - verbose_name = _(u"Importer - Target") - verbose_name_plural = _(u"Importer - Targets") - unique_together = ('column', 'target') - - def __unicode__(self): - return self.target[:50] if self.target else self.comment - - def natural_key(self): - return self.column.importer_type.slug, self.column.col_number, \ - self.target - - @property - def associated_model(self): - try: - return get_associated_model( - self.column.importer_type.associated_models.klass, - self.target.split('__')) - except KeyError: - return - - def get_choices(self): - if self.formater_type.formater_type == 'UnknowType' \ - and self.column.importer_type.slug: - cls = 
self.column.importer_type.get_importer_class() - formt = cls().line_format[self.column.col_number - 1] - if hasattr(formt.formater, 'choices'): - return [('', '--' * 8)] + list(formt.formater.choices) - return [('', '--' * 8)] - if self.formater_type.formater_type == 'StrToBoolean': - return [('', '--' * 8), - ('True', _(u"True")), - ('False', _(u"False"))] - if not self.associated_model or not hasattr(self.associated_model, - 'get_types'): - return [] - return self.associated_model.get_types() - - -class TargetKey(models.Model): - """ - User's link between import source and ishtar database. - Also temporary used for GeneralType to point missing link before adding - them in ItemKey table. - A targetkey connection can be create to be applied to on particular - import (associated_import), one particular user (associated_user) or to all - imports (associated_import and associated_user are empty). - """ - target = models.ForeignKey(ImportTarget, related_name='keys') - key = models.TextField(_(u"Key")) - value = models.TextField(_(u"Value"), blank=True, null=True) - is_set = models.BooleanField(_(u"Is set"), default=False) - associated_import = models.ForeignKey('Import', blank=True, null=True) - associated_user = models.ForeignKey('IshtarUser', blank=True, null=True) - - class Meta: - unique_together = ('target', 'key', 'associated_user', - 'associated_import') - verbose_name = _(u"Importer - Target key") - verbose_name_plural = _(u"Importer - Targets keys") - - def __unicode__(self): - return u" - ".join([unicode(self.target), self.key[:50]]) - - def column_nb(self): - # for the admin - return self.target.column.col_number - - def importer_type(self): - # for the admin - return self.target.column.importer_type.name - - def format(self): - if not self.is_set: - return None - if self.target.formater_type.formater_type == 'StrToBoolean': - if self.value in ('False', '0'): - return False - elif self.value: - return True - return - return self.value - - def save(self, 
*args, **kwargs): - obj = super(TargetKey, self).save(*args, **kwargs) - if not self.value: - return obj - associated_model = self.target.associated_model - if associated_model and hasattr(self.target.associated_model, - "add_key"): - v = None - # pk is given - try: - v = self.target.associated_model.objects.get( - pk=unicode(int(self.value))) - except (ValueError, self.target.associated_model.DoesNotExist): - # try with txt_idx - try: - v = self.target.associated_model.objects.get( - txt_idx=unicode(self.value)) - except self.target.associated_model.DoesNotExist: - pass - if v: - v.add_key(self.key, importer=self.associated_import) - return obj - -TARGET_MODELS = [ - ('OrganizationType', _(u"Organization type")), - ('TitleType', _(u"Title")), - ('SourceType', _(u"Source type")), - ('AuthorType', _(u"Author type")), - ('Format', _(u"Format")), - ('archaeological_operations.models.OperationType', _(u"Operation type")), - ('archaeological_operations.models.Period', _(u"Period")), - ('archaeological_operations.models.ReportState', _(u"Report state")), - ('archaeological_operations.models.RemainType', _(u"Remain type")), - ('archaeological_context_records.models.Unit', _(u"Unit")), - ('archaeological_context_records.models.ActivityType', - _(u"Activity type")), - ('archaeological_context_records.models.DocumentationType', - _(u"Documentation type")), - ('archaeological_finds.models.MaterialType', _(u"Material")), - ('archaeological_finds.models.ConservatoryState', - _(u"Conservatory state")), - ('archaeological_warehouse.models.ContainerType', _(u"Container type")), - ('archaeological_finds.models.PreservationType', _(u"Preservation type")), - ('archaeological_finds.models.ObjectType', _(u"Object type")), - ('archaeological_finds.models.IntegrityType', _(u"Integrity type")), - ('archaeological_finds.models.RemarkabilityType', - _(u"Remarkability type")), - ('archaeological_finds.models.BatchType', _(u"Batch type")), - 
('archaeological_context_records.models.IdentificationType', - _("Identification type")), - ('archaeological_context_records.models.RelationType', - _(u"Context record relation type")), - ('SpatialReferenceSystem', _(u"Spatial reference system")), - ('SupportType', _(u"Support type")), - ('TitleType', _(u"Title type")), -] - -TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] - -IMPORTER_TYPES = ( - ('IntegerFormater', _(u"Integer")), - ('FloatFormater', _(u"Float")), - ('UnicodeFormater', _(u"String")), - ('DateFormater', _(u"Date")), - ('TypeFormater', _(u"Type")), - ('YearFormater', _(u"Year")), - ('StrToBoolean', _(u"String to boolean")), - ('FileFormater', pgettext_lazy("filesystem", u"File")), - ('UnknowType', _(u"Unknow type")) -) - -IMPORTER_TYPES_DCT = { - 'IntegerFormater': IntegerFormater, - 'FloatFormater': FloatFormater, - 'UnicodeFormater': UnicodeFormater, - 'DateFormater': DateFormater, - 'TypeFormater': TypeFormater, - 'YearFormater': YearFormater, - 'StrToBoolean': StrToBoolean, - 'FileFormater': FileFormater, - 'UnknowType': None, -} - -DATE_FORMATS = ( - ('%Y', _(u"4 digit year. e.g.: \"2015\"")), - ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")), - ('%d/%m/%Y', _(u"Day/month/4 digit year. 
e.g.: \"04/02/2015\"")), -) - -IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS, - 'DateFormater': DATE_FORMATS} - - -class FormaterTypeManager(models.Manager): - def get_by_natural_key(self, formater_type, options, many_split): - return self.get(formater_type=formater_type, - options=options, many_split=many_split) - - -class FormaterType(models.Model): - formater_type = models.CharField(u"Formater type", max_length=20, - choices=IMPORTER_TYPES) - options = models.CharField(_(u"Options"), max_length=500, blank=True, - null=True) - many_split = models.CharField(_(u"Split character(s)"), max_length=10, - blank=True, null=True) - objects = FormaterTypeManager() - - class Meta: - verbose_name = _(u"Importer - Formater type") - verbose_name_plural = _(u"Importer - Formater types") - unique_together = ('formater_type', 'options', 'many_split') - ordering = ('formater_type', 'options') - - def natural_key(self): - return self.formater_type, self.options, self.many_split - - def __unicode__(self): - return u" - ".join( - [unicode(dict(IMPORTER_TYPES)[self.formater_type]) - if self.formater_type in IMPORTER_TYPES_DCT else ''] + - [getattr(self, k) for k in ('options', 'many_split') - if getattr(self, k)]) - - def get_choices(self): - if self.format_type in IMPORTER_TYPES_CHOICES: - return IMPORTER_TYPES_CHOICES[self.format_type] - - def get_formater_type(self, target, import_instance=None): - if self.formater_type not in IMPORTER_TYPES_DCT.keys(): - return - kwargs = {'db_target': target, 'import_instance': import_instance} - if self.many_split: - kwargs['many_split'] = self.many_split - if self.formater_type == 'TypeFormater': - if self.options not in TARGET_MODELS_KEYS: - logger.warning( - "**WARN FormaterType.get_formater_type**: {} " - "is not in TARGET_MODELS_KEYS".format(self.options)) - return - model = None - if self.options in dir(): - model = dir()[self.options] - else: - model = import_class(self.options) - return TypeFormater(model, **kwargs) - elif 
self.formater_type == 'UnicodeFormater': - if self.options: - try: - return UnicodeFormater(int(self.options.strip()), **kwargs) - except ValueError: - pass - return UnicodeFormater(**kwargs) - elif self.formater_type == 'DateFormater': - date_formats = self.options - if self.many_split: - date_formats = self.options.split(kwargs.pop('many_split')) - return DateFormater(date_formats, **kwargs) - elif self.formater_type == 'StrToBoolean': - return StrToBoolean(**kwargs) - elif self.formater_type == 'UnknowType': - return - else: - return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) - -IMPORT_STATE = (("C", _(u"Created")), - ("AP", _(u"Analyse in progress")), - ("A", _(u"Analysed")), - ("P", _(u"Import pending")), - ("IP", _(u"Import in progress")), - ("FE", _(u"Finished with errors")), - ("F", _(u"Finished")), - ("AC", _(u"Archived")), - ) - -IMPORT_STATE_DCT = dict(IMPORT_STATE) -ENCODINGS = [(settings.ENCODING, settings.ENCODING), - (settings.ALT_ENCODING, settings.ALT_ENCODING), - ('utf-8', 'utf-8')] - - -class Import(models.Model): - user = models.ForeignKey('IshtarUser') - name = models.CharField(_(u"Name"), max_length=500, - blank=True, null=True) - importer_type = models.ForeignKey(ImporterType) - imported_file = models.FileField( - _(u"Imported file"), upload_to="upload/imports/", max_length=220) - imported_images = models.FileField( - _(u"Associated images (zip file)"), upload_to="upload/imports/", - blank=True, null=True, max_length=220) - encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS, - default=u'utf-8', max_length=15) - skip_lines = models.IntegerField(_(u"Skip lines"), default=1) - error_file = models.FileField(_(u"Error file"), - upload_to="upload/imports/", - blank=True, null=True, max_length=255) - result_file = models.FileField(_(u"Result file"), - upload_to="upload/imports/", - blank=True, null=True, max_length=255) - match_file = models.FileField(_(u"Match file"), - upload_to="upload/imports/", - blank=True, null=True, 
max_length=255) - state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE, - default=u'C') - conservative_import = models.BooleanField( - _(u"Conservative import"), default=False, - help_text='If set to true, do not overload existing values') - creation_date = models.DateTimeField( - _(u"Creation date"), auto_now_add=True, blank=True, null=True) - end_date = models.DateTimeField(_(u"End date"), blank=True, - null=True, editable=False) - seconds_remaining = models.IntegerField( - _(u"Remaining seconds"), blank=True, null=True, editable=False) - - class Meta: - verbose_name = _(u"Import") - verbose_name_plural = _(u"Imports") - - def __unicode__(self): - return u"{} | {}".format(self.name or u"-", self.importer_type) - - def need_matching(self): - return bool(TargetKey.objects.filter(associated_import=self, - is_set=False).count()) - - @property - def errors(self): - if not self.error_file: - return [] - errors = [] - with open(self.error_file.path, 'rb') as csvfile: - reader = csv.DictReader(csvfile, fieldnames=['line', 'column', - 'error']) - reader.next() # pass the header - for row in reader: - errors.append(row) - return errors - - def get_actions(self): - """ - Get available action relevant with the current status - """ - actions = [] - if self.state == 'C': - actions.append(('A', _(u"Analyse"))) - if self.state == 'A': - actions.append(('A', _(u"Re-analyse"))) - actions.append(('I', _(u"Launch import"))) - if self.state in ('F', 'FE'): - actions.append(('A', _(u"Re-analyse"))) - actions.append(('I', _(u"Re-import"))) - actions.append(('AC', _(u"Archive"))) - if self.state == 'AC': - actions.append(('A', _(u"Unarchive"))) - actions.append(('D', _(u"Delete"))) - return actions - - @property - def imported_filename(self): - return self.imported_file.name.split(os.sep)[-1] - - @property - def status(self): - if self.state not in IMPORT_STATE_DCT: - return "" - return IMPORT_STATE_DCT[self.state] - - def get_importer_instance(self): - return 
self.importer_type.get_importer_class(import_instance=self)( - skip_lines=self.skip_lines, import_instance=self, - conservative_import=self.conservative_import) - - @property - def data_table(self): - imported_file = self.imported_file.path - tmpdir = None - if zipfile.is_zipfile(imported_file): - z = zipfile.ZipFile(imported_file) - filename = None - for name in z.namelist(): - # get first CSV file found - if name.endswith('.csv'): - filename = name - break - if not filename: - return [] - tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-') - imported_file = z.extract(filename, tmpdir) - - encodings = [self.encoding] - encodings += [coding for coding, c in ENCODINGS - if coding != self.encoding] - for encoding in encodings: - try: - with open(imported_file) as csv_file: - vals = [line - for line in unicodecsv.reader(csv_file, - encoding=encoding)] - if tmpdir: - shutil.rmtree(tmpdir) - return vals - except UnicodeDecodeError: - pass # try the next encoding - if tmpdir: - shutil.rmtree(tmpdir) - return [] - - def initialize(self): - self.state = 'AP' - self.save() - self.get_importer_instance().initialize(self.data_table, output='db') - self.state = 'A' - self.save() - - def importation(self): - self.state = 'IP' - self.save() - importer = self.get_importer_instance() - importer.importation(self.data_table) - # result file - filename = slugify(self.importer_type.name) - now = datetime.datetime.now().isoformat('-').replace(':', '') - result_file = filename + "_result_%s.csv" % now - result_file = os.sep.join([self.result_file.storage.location, - result_file]) - with open(result_file, 'w') as fle: - fle.write(importer.get_csv_result().encode('utf-8')) - self.result_file = File(open(fle.name)) - if importer.errors: - self.state = 'FE' - error_file = filename + "_errors_%s.csv" % now - error_file = os.sep.join([self.error_file.storage.location, - error_file]) - with open(error_file, 'w') as fle: - fle.write(importer.get_csv_errors().encode('utf-8')) - self.error_file = 
File(open(fle.name)) - else: - self.state = 'F' - self.error_file = None - if importer.match_table: - match_file = filename + "_match_%s.csv" % now - match_file = os.sep.join([self.match_file.storage.location, - match_file]) - with open(match_file, 'w') as fle: - fle.write(importer.get_csv_matches().encode('utf-8')) - self.match_file = File(open(fle.name)) - self.save() - - def archive(self): - self.state = 'AC' - self.save() - - def get_all_imported(self): - imported = [] - for related, zorg in \ - self._meta.get_all_related_m2m_objects_with_model(): - accessor = related.get_accessor_name() - imported += [(accessor, obj) - for obj in getattr(self, accessor).all()] - return imported - - -def pre_delete_import(sender, **kwargs): - # deleted imported items when an import is delete - instance = kwargs.get('instance') - if not instance: - return - to_delete = [] - for accessor, imported in instance.get_all_imported(): - to_delete.append(imported) - for item in to_delete: - item.delete() - - -pre_delete.connect(pre_delete_import, sender=Import) - class Organization(Address, Merge, OwnPerms, ValueGetter): TABLE_COLS = ('name', 'organization_type', 'town') diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py new file mode 100644 index 000000000..dcb02c27e --- /dev/null +++ b/ishtar_common/models_imports.py @@ -0,0 +1,912 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (C) 2017 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet> + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. 
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

import csv
import datetime
from importlib import import_module
import os
import logging
import shutil
import re
import tempfile
import unicodecsv
import zipfile

from django.conf import settings
from django.contrib.gis.db import models
from django.core.exceptions import SuspiciousOperation
from django.core.files import File
from django.db.models.base import ModelBase
from django.db.models.signals import pre_delete
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext_lazy as _, pgettext_lazy

from ishtar_common.utils import create_slug
from ishtar_common.data_importer import Importer, ImportFormater, \
    IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \
    TypeFormater, YearFormater, StrToBoolean, FileFormater

logger = logging.getLogger(__name__)

# Importer type slug -> dotted path of a hand-written importer class.
# Slugs listed here bypass the database-driven importer definition (see
# ImporterType.get_importer_class).
IMPORTER_CLASSES = {}

IMPORTER_CLASSES.update({
    'sra-pdl-files':
    'archaeological_files.data_importer.FileImporterSraPdL'})


def get_model_fields(model):
    """
    Return a dict {field name: field} for the given model.

    Concrete and many-to-many fields are collected from ``model._meta``;
    when the model defines ``get_extra_fields`` its result is merged in.

    To be replaced in Django 1.8 with get_fields, get_field
    """
    fields = {}
    options = model._meta
    for field in sorted(options.fields + options.many_to_many):
        fields[field.name] = field
    if hasattr(model, 'get_extra_fields'):
        fields.update(model.get_extra_fields())
    return fields


def import_class(full_path_classname):
    """
    Return the class designated by a dotted path.

    A bare class name (no dot) is looked up in ``ishtar_common.models``.
    Otherwise the path must contain a ``models``, ``models_finds`` or
    ``models_treatments`` component, else SuspiciousOperation is raised.

    TODO: add a white list for more security
    """
    mods = full_path_classname.split('.')
    if len(mods) == 1:
        mods = ['ishtar_common', 'models', mods[0]]
    elif 'models' not in mods and 'models_finds' not in mods \
            and 'models_treatments' not in mods:
        raise SuspiciousOperation(
            u"Try to import a non model from a string")
    module = import_module('.'.join(mods[:-1]))
    return getattr(module, mods[-1])


class ImportModelManager(models.Manager):
    """Manager resolving ImporterModel natural keys (the class path)."""

    def get_by_natural_key(self, klass):
        return self.get(klass=klass)


class ImporterModel(models.Model):
    """A model class (stored as a dotted path) an importer can target."""
    name = models.CharField(_(u"Name"), max_length=200)
    klass = models.CharField(_(u"Class name"), max_length=200, unique=True)
    objects = ImportModelManager()

    class Meta:
        verbose_name = _(u"Importer - Model")
        verbose_name_plural = _(u"Importer - Models")
        ordering = ('name',)

    def __unicode__(self):
        return self.name

    def natural_key(self):
        return (self.klass, )


class ImporterTypeManager(models.Manager):
    """Manager resolving ImporterType natural keys (the slug)."""

    def get_by_natural_key(self, slug):
        return self.get(slug=slug)


class ImporterType(models.Model):
    """
    Description of a table to be mapped with ishtar database
    """
    name = models.CharField(_(u"Name"), blank=True, null=True,
                            max_length=100)
    slug = models.SlugField(_(u"Slug"), unique=True, max_length=100,
                            blank=True, null=True)
    description = models.CharField(_(u"Description"), blank=True, null=True,
                                   max_length=500)
    users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"),
                                   blank=True)
    associated_models = models.ForeignKey(
        ImporterModel, verbose_name=_(u"Associated model"),
        related_name='+', blank=True, null=True)
    created_models = models.ManyToManyField(
        ImporterModel, verbose_name=_(u"Models that can accept new items"),
        blank=True, help_text=_(u"Leave blank for no restrictions"),
        related_name='+')
    is_template = models.BooleanField(_(u"Is template"), default=False)
    unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"),
                                    blank=True, null=True, max_length=500)
    objects = ImporterTypeManager()

    class Meta:
        verbose_name = _(u"Importer - Type")
        verbose_name_plural = _(u"Importer - Types")
        ordering = ('name',)

    def natural_key(self):
        return (self.slug, )

    def __unicode__(self):
        # "name" is nullable: never hand None back to unicode()
        return self.name or u""

    def get_importer_class(self, import_instance=None):
        """
        Return the Importer class matching this importer type.

        When the slug is registered in IMPORTER_CLASSES the hand-written
        class is imported and returned. Otherwise an Importer subclass is
        built on the fly from the columns, targets, defaults and unicity
        keys stored in the database.

        :param import_instance: passed through to each formater type
        """
        if self.slug and self.slug in IMPORTER_CLASSES:
            return import_class(IMPORTER_CLASSES[self.slug])

        OBJECT_CLS = import_class(self.associated_models.klass)
        DEFAULTS = dict([(default.keys, default.values)
                         for default in self.defaults.all()])
        LINE_FORMAT = []
        idx = 0
        for column in self.columns.order_by('col_number').all():
            idx += 1
            # pad with None for column numbers that are not described
            while column.col_number > idx:
                LINE_FORMAT.append(None)
                idx += 1
            # evaluate the related targets once (was: count() then all())
            column_targets = list(column.targets.all())
            if not column_targets:
                LINE_FORMAT.append(None)
                continue
            targets = []
            formater_types = []
            force_news = []
            concat_str = []
            for target in column_targets:
                ft = target.formater_type.get_formater_type(
                    target, import_instance=import_instance)
                if not ft:
                    continue
                formater_types.append(ft)
                targets.append(target.target)
                concat_str.append(target.concat_str)
                force_news.append(target.force_new)
            formater_kwargs = {}
            if column.regexp_pre_filter:
                formater_kwargs['regexp'] = re.compile(
                    column.regexp_pre_filter.regexp)
            formater_kwargs['concat_str'] = concat_str
            formater_kwargs['duplicate_fields'] = [
                (field.field_name, field.force_new, field.concat,
                 field.concat_str)
                for field in column.duplicate_fields.all()]
            formater_kwargs['label'] = column.label
            formater_kwargs['required'] = column.required
            formater_kwargs['force_new'] = force_news
            if column.export_field_name:
                formater_kwargs['export_field_name'] = [
                    column.export_field_name]
            formater = ImportFormater(targets, formater_types,
                                      **formater_kwargs)
            LINE_FORMAT.append(formater)

        UNICITY_KEYS = []
        if self.unicity_keys:
            UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')]
        MODEL_CREATION_LIMIT = [import_class(modls.klass)
                                for modls in self.created_models.all()]
        args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description,
                'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT,
                'UNICITY_KEYS': UNICITY_KEYS,
                'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT}
        # class name: CamelCase version of the slugified importer name
        name = str(''.join(
            x for x in slugify(self.name).replace('-', ' ').title()
            if not x.isspace()))
        return type(name, (Importer,), args)

    def save(self, *args, **kwargs):
        # generate the slug from the name on first save
        if not self.slug:
            self.slug = create_slug(ImporterType, self.name)
        return super(ImporterType, self).save(*args, **kwargs)


def get_associated_model(parent_model, keys):
    """
    Follow a chain of field names and return the model reached.

    :param parent_model: a model class or a dotted path accepted by
        import_class
    :param keys: sequence of field names (a "a__b__c" target once split)
    :return: the model targeted by the last key, or None when the chain is
        empty or goes through a field that is not a relation
    :raise KeyError: when a key is not a field of the current model
    """
    if isinstance(parent_model, (unicode, str)):
        object_cls = import_class(parent_model)
    else:
        object_cls = parent_model
    if not keys:
        return None
    field = get_model_fields(object_cls)[keys[0]]
    model = None
    if hasattr(field, 'rel') and hasattr(field.rel, 'to'):
        model = field.rel.to
    # get_extra_fields may expose a model class directly
    if isinstance(field, ModelBase):
        model = field
    if len(keys) == 1:
        return model
    if model is None:
        # the chain goes through a non-relational field: nothing to follow
        # (previously crashed with an AttributeError on None._meta)
        return None
    return get_associated_model(model, keys[1:])


class ImporterDefaultManager(models.Manager):
    """Manager resolving ImporterDefault natural keys."""

    def get_by_natural_key(self, importer_type, target):
        return self.get(importer_type__slug=importer_type, target=target)


class ImporterDefault(models.Model):
    """
    Targets of default values in an import
    """
    importer_type = models.ForeignKey(ImporterType, related_name='defaults')
    target = models.CharField(u"Target", max_length=500)

    class Meta:
        verbose_name = _(u"Importer - Default")
        verbose_name_plural = _(u"Importer - Defaults")
        unique_together = ('importer_type', 'target')

    # the manager is a class attribute, not a Meta option
    objects = ImporterDefaultManager()

    def __unicode__(self):
        return u"{} - {}".format(self.importer_type, self.target)

    def natural_key(self):
        return self.importer_type.slug, self.target

    @property
    def keys(self):
        """Target split on "__", as a tuple."""
        return tuple(self.target.split('__'))

    @property
    def associated_model(self):
        """Model reached by following the target from the importer model."""
        return get_associated_model(self.importer_type.associated_models.klass,
                                    self.keys)

    @property
    def values(self):
        """Dict {target: resolved value} of the attached default values."""
        values = {}
        for default_value in self.default_values.all():
            values[default_value.target] = default_value.get_value()
        return values

class ImporterDefaultValuesManager(models.Manager):
    """Manager resolving ImporterDefaultValues natural keys."""

    def get_by_natural_key(self, def_target_type, def_target, target):
        return self.get(default_target__importer_type__slug=def_target_type,
                        default_target__target=def_target,
                        target=target)


class ImporterDefaultValues(models.Model):
    """
    Default values in an import
    """
    default_target = models.ForeignKey(ImporterDefault,
                                       related_name='default_values')
    target = models.CharField(u"Target", max_length=500)
    value = models.CharField(u"Value", max_length=500)
    objects = ImporterDefaultValuesManager()

    def __unicode__(self):
        # NOTE(review): the original also passed self.value to format()
        # without a placeholder for it; the displayed label is kept as is.
        return u"{} - {}".format(self.default_target, self.target)

    class Meta:
        verbose_name = _(u"Importer - Default value")
        verbose_name_plural = _(u"Importer - Default values")

    def natural_key(self):
        return (self.default_target.importer_type.slug,
                self.default_target.target,
                self.target)

    def get_value(self):
        """
        Return the value to use as default.

        Without an associated model the raw string is returned. Otherwise
        the target must be a relation of that model (else None is
        returned) and the value is resolved to an instance of the related
        model, first as a primary key then as a "txt_idx". An empty string
        is returned when nothing matches.
        """
        parent_model = self.default_target.associated_model
        if not parent_model:
            return self.value
        fields = get_model_fields(parent_model)
        target = self.target.strip()
        if target not in fields:
            return
        field = fields[target]
        if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'):
            return
        model = field.rel.to
        # if value is an id
        try:
            return model.objects.get(pk=int(self.value))
        except (ValueError, model.DoesNotExist):
            pass
        # try with txt_idx
        try:
            return model.objects.get(txt_idx=self.value)
        except (ValueError, model.DoesNotExist):
            pass
        return ""


class ImporterColumnManager(models.Manager):
    """Manager resolving ImporterColumn natural keys."""

    def get_by_natural_key(self, importer_type, col_number):
        return self.get(importer_type__slug=importer_type,
                        col_number=col_number)


class ImporterColumn(models.Model):
    """
    Import file column description
    """
    label = models.CharField(_(u"Label"), blank=True, null=True,
                             max_length=200)
    importer_type = models.ForeignKey(ImporterType, related_name='columns')
    col_number = models.IntegerField(_(u"Column number"), default=1)
    description = models.TextField(_("Description"), blank=True, null=True)
    regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True)
    required = models.BooleanField(_(u"Required"), default=False)
    export_field_name = models.CharField(
        _(u"Export field name"), blank=True, null=True, max_length=200,
        help_text=_(u"Fill this field if the field name is ambiguous for "
                    u"export. For instance: concatenated fields.")
    )
    objects = ImporterColumnManager()

    class Meta:
        verbose_name = _(u"Importer - Column")
        verbose_name_plural = _(u"Importer - Columns")
        ordering = ('importer_type', 'col_number')
        unique_together = ('importer_type', 'col_number')

    def __unicode__(self):
        return u"{} - {}".format(self.importer_type, self.col_number)

    def natural_key(self):
        return self.importer_type.slug, self.col_number

    def targets_lbl(self):
        """Comma separated list of the targets of this column."""
        return u', '.join([target.target for target in self.targets.all()])

    def duplicate_fields_lbl(self):
        """Comma separated list of the duplicate fields of this column."""
        return u', '.join([dp.field_name
                           for dp in self.duplicate_fields.all()])


class ImporterDuplicateFieldManager(models.Manager):
    """Manager resolving ImporterDuplicateField natural keys."""

    def get_by_natural_key(self, importer_type, col_number, field_name):
        return self.get(column__importer_type__slug=importer_type,
                        column__col_number=col_number,
                        field_name=field_name)


class ImporterDuplicateField(models.Model):
    """
    Direct copy of result in other fields
    """
    column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields')
    field_name = models.CharField(_(u"Field name"), blank=True, null=True,
                                  max_length=200)
    force_new = models.BooleanField(_(u"Force creation of new items"),
                                    default=False)
    concat = models.BooleanField(_(u"Concatenate with existing"),
                                 default=False)
    concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
                                  blank=True, null=True)
    objects = ImporterDuplicateFieldManager()

    class Meta:
        verbose_name = _(u"Importer - Duplicate field")
        verbose_name_plural = _(u"Importer - Duplicate fields")
        ordering = ('column', 'field_name')

    def natural_key(self):
        # the manager looks the importer type up by slug: return the slug,
        # not the ImporterType instance
        return self.column.importer_type.slug, self.column.col_number, \
            self.field_name


class NamedManager(models.Manager):
    """Manager resolving natural keys made of the "name" field."""

    def get_by_natural_key(self, name):
        return self.get(name=name)


class Regexp(models.Model):
    """Named regular expression usable as a column or target filter."""
    name = models.CharField(_(u"Name"), max_length=100, unique=True)
    description = models.CharField(_(u"Description"), blank=True, null=True,
                                   max_length=500)
    regexp = models.CharField(_(u"Regular expression"), max_length=500)
    objects = NamedManager()

    class Meta:
        verbose_name = _(u"Importer - Regular expression")
        verbose_name_plural = _(u"Importer - Regular expressions")

    def __unicode__(self):
        return self.name

    def natural_key(self):
        return (self.name, )


class ImportTargetManager(models.Manager):
    """Manager resolving ImportTarget natural keys."""

    def get_by_natural_key(self, importer_type, col_number, target):
        return self.get(column__importer_type__slug=importer_type,
                        column__col_number=col_number,
                        target=target)


class ImportTarget(models.Model):
    """
    Ishtar database target for a column
    """
    column = models.ForeignKey(ImporterColumn, related_name='targets')
    target = models.CharField(u"Target", max_length=500)
    regexp_filter = models.ForeignKey("Regexp", blank=True, null=True)
    formater_type = models.ForeignKey("FormaterType")
    force_new = models.BooleanField(_(u"Force creation of new items"),
                                    default=False)
    concat = models.BooleanField(_(u"Concatenate with existing"),
                                 default=False)
    concat_str = models.CharField(_(u"Concatenate character"), max_length=5,
                                  blank=True, null=True)
    comment = models.TextField(_(u"Comment"), blank=True, null=True)
    objects = ImportTargetManager()

    class Meta:
        verbose_name = _(u"Importer - Target")
        verbose_name_plural = _(u"Importer - Targets")
        unique_together = ('column', 'target')

    def __unicode__(self):
        if self.target:
            return self.target[:50]
        # "comment" is nullable: never hand None back to unicode()
        return self.comment or u""

    def natural_key(self):
        return self.column.importer_type.slug, self.column.col_number, \
            self.target

    @property
    def associated_model(self):
        """
        Model reached by following the target from the importer model, or
        None when a key of the target is not a known field.
        """
        try:
            return get_associated_model(
                self.column.importer_type.associated_models.klass,
                self.target.split('__'))
        except KeyError:
            return

    def get_choices(self):
        """
        Return the (value, label) choices available for this target.

        The source depends on the formater type: the formater of the
        matching column of the importer class for "UnknowType", a fixed
        True/False list for "StrToBoolean", otherwise ``get_types()`` of
        the associated model when it provides it (else an empty list).
        """
        empty_choice = ('', '--' * 8)
        formater_type = self.formater_type.formater_type
        if formater_type == 'UnknowType' \
                and self.column.importer_type.slug:
            cls = self.column.importer_type.get_importer_class()
            formt = cls().line_format[self.column.col_number - 1]
            if hasattr(formt.formater, 'choices'):
                return [empty_choice] + list(formt.formater.choices)
            return [empty_choice]
        if formater_type == 'StrToBoolean':
            return [empty_choice,
                    ('True', _(u"True")),
                    ('False', _(u"False"))]
        # the property is costly (field introspection): evaluate it once
        associated_model = self.associated_model
        if not associated_model or not hasattr(associated_model,
                                               'get_types'):
            return []
        return associated_model.get_types()


class TargetKey(models.Model):
    """
    User's link between import source and ishtar database.
    Also temporarily used for GeneralType to point missing links before
    adding them in the ItemKey table.
    A targetkey connection can be created to be applied to one particular
    import (associated_import), one particular user (associated_user) or to
    all imports (associated_import and associated_user are empty).
    """
    target = models.ForeignKey(ImportTarget, related_name='keys')
    key = models.TextField(_(u"Key"))
    value = models.TextField(_(u"Value"), blank=True, null=True)
    is_set = models.BooleanField(_(u"Is set"), default=False)
    associated_import = models.ForeignKey('Import', blank=True, null=True)
    associated_user = models.ForeignKey('IshtarUser', blank=True, null=True)

    class Meta:
        unique_together = ('target', 'key', 'associated_user',
                           'associated_import')
        verbose_name = _(u"Importer - Target key")
        verbose_name_plural = _(u"Importer - Targets keys")

    def __unicode__(self):
        return u" - ".join([unicode(self.target), self.key[:50]])

    def column_nb(self):
        # for the admin
        return self.target.column.col_number

    def importer_type(self):
        # for the admin
        return self.target.column.importer_type.name

    def format(self):
        """
        Return the value of this key, None when the key is not set.

        For a "StrToBoolean" target the stored string is converted:
        'False' and '0' give False, any other non-empty value gives True
        and an empty value gives None.
        """
        if not self.is_set:
            return None
        if self.target.formater_type.formater_type == 'StrToBoolean':
            if self.value in ('False', '0'):
                return False
            elif self.value:
                return True
            return
        return self.value

    def save(self, *args, **kwargs):
        """
        Save the key then, when a value is set and the associated model
        provides "add_key", register the key on the matching item (looked
        up by primary key, then by "txt_idx").
        """
        obj = super(TargetKey, self).save(*args, **kwargs)
        if not self.value:
            return obj
        # the property is costly (field introspection): evaluate it once
        associated_model = self.target.associated_model
        if associated_model and hasattr(associated_model, "add_key"):
            v = None
            # pk is given
            try:
                v = associated_model.objects.get(
                    pk=unicode(int(self.value)))
            except (ValueError, associated_model.DoesNotExist):
                # try with txt_idx
                try:
                    v = associated_model.objects.get(
                        txt_idx=unicode(self.value))
                except associated_model.DoesNotExist:
                    pass
            if v:
                v.add_key(self.key, importer=self.associated_import)
        return obj

# Models usable as "options" of a TypeFormater: (class path, label).
# A bare name is resolved in ishtar_common.models by import_class.
# NOTE(review): 'TitleType' is listed twice (first and last entries of the
# ishtar_common group) with different labels - confirm which one to keep.
TARGET_MODELS = [
    ('OrganizationType', _(u"Organization type")),
    ('TitleType', _(u"Title")),
    ('SourceType', _(u"Source type")),
    ('AuthorType', _(u"Author type")),
    ('Format', _(u"Format")),
    ('archaeological_operations.models.OperationType', _(u"Operation type")),
    ('archaeological_operations.models.Period', _(u"Period")),
    ('archaeological_operations.models.ReportState', _(u"Report state")),
    ('archaeological_operations.models.RemainType', _(u"Remain type")),
    ('archaeological_context_records.models.Unit', _(u"Unit")),
    ('archaeological_context_records.models.ActivityType',
     _(u"Activity type")),
    ('archaeological_context_records.models.DocumentationType',
     _(u"Documentation type")),
    ('archaeological_finds.models.MaterialType', _(u"Material")),
    ('archaeological_finds.models.ConservatoryState',
     _(u"Conservatory state")),
    ('archaeological_warehouse.models.ContainerType', _(u"Container type")),
    ('archaeological_finds.models.PreservationType', _(u"Preservation type")),
    ('archaeological_finds.models.ObjectType', _(u"Object type")),
    ('archaeological_finds.models.IntegrityType', _(u"Integrity type")),
    ('archaeological_finds.models.RemarkabilityType',
     _(u"Remarkability type")),
    ('archaeological_finds.models.BatchType', _(u"Batch type")),
    ('archaeological_context_records.models.IdentificationType',
     _("Identification type")),
    ('archaeological_context_records.models.RelationType',
     _(u"Context record relation type")),
    ('SpatialReferenceSystem', _(u"Spatial reference system")),
    ('SupportType', _(u"Support type")),
    ('TitleType', _(u"Title type")),
]

TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS]

# choices of FormaterType.formater_type
IMPORTER_TYPES = (
    ('IntegerFormater', _(u"Integer")),
    ('FloatFormater', _(u"Float")),
    ('UnicodeFormater', _(u"String")),
    ('DateFormater', _(u"Date")),
    ('TypeFormater', _(u"Type")),
    ('YearFormater', _(u"Year")),
    ('StrToBoolean', _(u"String to boolean")),
    ('FileFormater', pgettext_lazy("filesystem", u"File")),
    ('UnknowType', _(u"Unknow type"))
)

# formater type key -> formater class (None: no formater)
IMPORTER_TYPES_DCT = {
    'IntegerFormater': IntegerFormater,
    'FloatFormater': FloatFormater,
    'UnicodeFormater': UnicodeFormater,
    'DateFormater': DateFormater,
    'TypeFormater': TypeFormater,
    'YearFormater': YearFormater,
    'StrToBoolean': StrToBoolean,
    'FileFormater': FileFormater,
    'UnknowType': None,
}

DATE_FORMATS = (
    ('%Y', _(u"4 digit year. e.g.: \"2015\"")),
    ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")),
    ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")),
)

# formater type key -> choices proposed for its "options"
IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS,
                          'DateFormater': DATE_FORMATS}


class FormaterTypeManager(models.Manager):
    """Manager resolving FormaterType natural keys."""

    def get_by_natural_key(self, formater_type, options, many_split):
        return self.get(formater_type=formater_type,
                        options=options, many_split=many_split)


class FormaterType(models.Model):
    """A formater class together with its options and split characters."""
    formater_type = models.CharField(u"Formater type", max_length=20,
                                     choices=IMPORTER_TYPES)
    options = models.CharField(_(u"Options"), max_length=500, blank=True,
                               null=True)
    many_split = models.CharField(_(u"Split character(s)"), max_length=10,
                                  blank=True, null=True)
    objects = FormaterTypeManager()

    class Meta:
        verbose_name = _(u"Importer - Formater type")
        verbose_name_plural = _(u"Importer - Formater types")
        unique_together = ('formater_type', 'options', 'many_split')
        ordering = ('formater_type', 'options')

    def natural_key(self):
        return self.formater_type, self.options, self.many_split

    def __unicode__(self):
        return u" - ".join(
            [unicode(dict(IMPORTER_TYPES)[self.formater_type])
             if self.formater_type in IMPORTER_TYPES_DCT else ''] +
            [getattr(self, k) for k in ('options', 'many_split')
             if getattr(self, k)])

    def get_choices(self):
        """
        Return the choices available for "options" with this formater
        type, None when the formater type has no predefined choices.
        """
        # was reading the non-existent attribute "format_type"
        if self.formater_type in IMPORTER_TYPES_CHOICES:
            return IMPORTER_TYPES_CHOICES[self.formater_type]

    def get_formater_type(self, target, import_instance=None):
        """
        Instantiate the formater described by this formater type.

        :param target: passed to the formater as "db_target"
        :param import_instance: passed to the formater as "import_instance"
        :return: a formater instance, or None for an unknown formater
            type, for "UnknowType" and for a TypeFormater whose options
            are not listed in TARGET_MODELS_KEYS
        """
        if self.formater_type not in IMPORTER_TYPES_DCT.keys():
            return
        kwargs = {'db_target': target, 'import_instance': import_instance}
        if self.many_split:
            kwargs['many_split'] = self.many_split
        if self.formater_type == 'TypeFormater':
            if self.options not in TARGET_MODELS_KEYS:
                logger.warning(
                    "**WARN FormaterType.get_formater_type**: {} "
                    "is not in TARGET_MODELS_KEYS".format(self.options))
                return
            # The original first tried "dir()[self.options]": dir() is a
            # list of local names, so that branch could only end in a
            # TypeError. import_class resolves bare names as well.
            model = import_class(self.options)
            return TypeFormater(model, **kwargs)
        elif self.formater_type == 'UnicodeFormater':
            if self.options:
                # options, when numeric, is given as first argument
                try:
                    return UnicodeFormater(int(self.options.strip()), **kwargs)
                except ValueError:
                    pass
            return UnicodeFormater(**kwargs)
        elif self.formater_type == 'DateFormater':
            date_formats = self.options
            if self.many_split:
                # here the split characters separate the accepted date
                # formats: they are not forwarded to the formater
                date_formats = self.options.split(kwargs.pop('many_split'))
            return DateFormater(date_formats, **kwargs)
        elif self.formater_type == 'StrToBoolean':
            return StrToBoolean(**kwargs)
        elif self.formater_type == 'UnknowType':
            return
        else:
            return IMPORTER_TYPES_DCT[self.formater_type](**kwargs)

IMPORT_STATE = (("C", _(u"Created")),
                ("AP", _(u"Analyse in progress")),
                ("A", _(u"Analysed")),
                ("P", _(u"Import pending")),
                ("IP", _(u"Import in progress")),
                ("FE", _(u"Finished with errors")),
                ("F", _(u"Finished")),
                ("AC", _(u"Archived")),
                )

IMPORT_STATE_DCT = dict(IMPORT_STATE)
ENCODINGS = [(settings.ENCODING, settings.ENCODING),
             (settings.ALT_ENCODING, settings.ALT_ENCODING),
             ('utf-8', 'utf-8')]


class Import(models.Model):
    """An imported file, its importer type, its state and its reports."""
    user = models.ForeignKey('IshtarUser')
    name = models.CharField(_(u"Name"), max_length=500,
                            blank=True, null=True)
    importer_type = models.ForeignKey(ImporterType)
    imported_file = models.FileField(
        _(u"Imported file"), upload_to="upload/imports/", max_length=220)
    imported_images = models.FileField(
        _(u"Associated images (zip file)"), upload_to="upload/imports/",
        blank=True, null=True, max_length=220)
    encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS,
                                default=u'utf-8', max_length=15)
    skip_lines = models.IntegerField(_(u"Skip lines"), default=1)
    error_file = models.FileField(_(u"Error file"),
                                  upload_to="upload/imports/",
                                  blank=True, null=True, max_length=255)
    result_file = models.FileField(_(u"Result file"),
                                   upload_to="upload/imports/",
                                   blank=True, null=True, max_length=255)
    match_file = models.FileField(_(u"Match file"),
                                  upload_to="upload/imports/",
                                  blank=True, null=True, max_length=255)
    state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE,
                             default=u'C')
    conservative_import = models.BooleanField(
        _(u"Conservative import"), default=False,
        help_text='If set to true, do not overload existing values')
    creation_date = models.DateTimeField(
        _(u"Creation date"), auto_now_add=True, blank=True, null=True)
    end_date = models.DateTimeField(_(u"End date"), blank=True,
                                    null=True, editable=False)
    seconds_remaining = models.IntegerField(
        _(u"Remaining seconds"), blank=True, null=True, editable=False)

    class Meta:
        verbose_name = _(u"Import")
        verbose_name_plural = _(u"Imports")

    def __unicode__(self):
        return u"{} | {}".format(self.name or u"-", self.importer_type)

    def need_matching(self):
        """True when at least one target key of this import is not set."""
        return TargetKey.objects.filter(associated_import=self,
                                        is_set=False).exists()

    @property
    def errors(self):
        """
        Rows of the error file as dicts with the keys 'line', 'column'
        and 'error' (header line skipped); an empty list without error
        file.
        """
        if not self.error_file:
            return []
        with open(self.error_file.path, 'rb') as csvfile:
            reader = csv.DictReader(csvfile, fieldnames=['line', 'column',
                                                         'error'])
            # pass the header (tolerates an empty file)
            next(reader, None)
            return list(reader)

    def get_actions(self):
        """
        Get available action relevant with the current status
        """
        actions = []
        if self.state == 'C':
            actions.append(('A', _(u"Analyse")))
        if self.state == 'A':
            actions.append(('A', _(u"Re-analyse")))
            actions.append(('I', _(u"Launch import")))
        if self.state in ('F', 'FE'):
            actions.append(('A', _(u"Re-analyse")))
            actions.append(('I', _(u"Re-import")))
            actions.append(('AC', _(u"Archive")))
        if self.state == 'AC':
            actions.append(('A', _(u"Unarchive")))
        actions.append(('D', _(u"Delete")))
        return actions

    @property
    def imported_filename(self):
        """Name of the imported file without its directory."""
        return self.imported_file.name.split(os.sep)[-1]

    @property
    def status(self):
        """Label of the current state, "" for an unknown state."""
        return IMPORT_STATE_DCT.get(self.state, "")

    def get_importer_instance(self):
        """Instantiate the importer class configured for this import."""
        return self.importer_type.get_importer_class(import_instance=self)(
            skip_lines=self.skip_lines, import_instance=self,
            conservative_import=self.conservative_import)

    @property
    def data_table(self):
        """
        Content of the imported CSV file as a list of rows.

        When the imported file is a zip archive the first member ending
        with ".csv" is extracted to a temporary directory and read (an
        empty list is returned when there is none). The configured
        encoding is tried first, then the other entries of ENCODINGS; an
        empty list is returned when none of them can decode the file.
        """
        imported_file = self.imported_file.path
        tmpdir = None
        try:
            if zipfile.is_zipfile(imported_file):
                with zipfile.ZipFile(imported_file) as archive:
                    # get first CSV file found
                    filename = next(
                        (name for name in archive.namelist()
                         if name.endswith('.csv')), None)
                    if not filename:
                        return []
                    tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-')
                    imported_file = archive.extract(filename, tmpdir)

            encodings = [self.encoding]
            encodings += [coding for coding, c in ENCODINGS
                          if coding != self.encoding]
            for encoding in encodings:
                try:
                    with open(imported_file) as csv_file:
                        return list(unicodecsv.reader(csv_file,
                                                      encoding=encoding))
                except UnicodeDecodeError:
                    pass  # try the next encoding
            return []
        finally:
            # always drop the extraction directory, whatever happened
            if tmpdir:
                shutil.rmtree(tmpdir)

    def initialize(self):
        """Run the analysis step of the importer on the imported data."""
        self.state = 'AP'
        self.save()
        self.get_importer_instance().initialize(self.data_table, output='db')
        self.state = 'A'
        self.save()

    def _write_report(self, field_name, file_name, content):
        """Write a CSV report in the storage of a file field and attach it."""
        storage_dir = getattr(self, field_name).storage.location
        path = os.sep.join([storage_dir, file_name])
        with open(path, 'w') as fle:
            fle.write(content.encode('utf-8'))
        # NOTE(review): as in the original code this handle is never
        # closed explicitly; presumably the file field reads it when the
        # instance is saved - confirm before closing it here.
        setattr(self, field_name, File(open(path)))

    def importation(self):
        """
        Run the import and store its reports.

        The result file is always written; the error file only when the
        importer reported errors (it is reset otherwise) and the match
        file only when the importer has a match table. The state ends as
        'FE' (errors) or 'F'.
        """
        self.state = 'IP'
        self.save()
        importer = self.get_importer_instance()
        importer.importation(self.data_table)
        filename = slugify(self.importer_type.name)
        now = datetime.datetime.now().isoformat('-').replace(':', '')
        # result file
        self._write_report('result_file', filename + "_result_%s.csv" % now,
                           importer.get_csv_result())
        if importer.errors:
            self.state = 'FE'
            self._write_report('error_file',
                               filename + "_errors_%s.csv" % now,
                               importer.get_csv_errors())
        else:
            self.state = 'F'
            self.error_file = None
        if importer.match_table:
            self._write_report('match_file',
                               filename + "_match_%s.csv" % now,
                               importer.get_csv_matches())
        self.save()

    def archive(self):
        """Mark the import as archived."""
        self.state = 'AC'
        self.save()

    def get_all_imported(self):
        """
        Return a list of (accessor name, object) for every object linked
        to this import through a reverse many-to-many relation.
        """
        imported = []
        for related, zorg in \
                self._meta.get_all_related_m2m_objects_with_model():
            accessor = related.get_accessor_name()
            imported += [(accessor, obj)
                         for obj in getattr(self, accessor).all()]
        return imported


def pre_delete_import(sender, **kwargs):
    """Signal handler: delete the imported items when an import is deleted."""
    instance = kwargs.get('instance')
    if not instance:
        return
    # collect everything first, then delete
    to_delete = [imported
                 for accessor, imported in instance.get_all_imported()]
    for item in to_delete:
        item.delete()


pre_delete.connect(pre_delete_import, sender=Import)