diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-08-21 18:23:29 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-08-21 18:23:29 +0200 |
commit | 1848a2fcab0d22056a2979459093a54c71f4060d (patch) | |
tree | 0d2270c4b34eab262d9ab1f3936c5fe4f8942289 /ishtar_common/models.py | |
parent | 644a56222e5c937feb1c23eef95a4906840960bb (diff) | |
download | Ishtar-1848a2fcab0d22056a2979459093a54c71f4060d.tar.bz2 Ishtar-1848a2fcab0d22056a2979459093a54c71f4060d.zip |
Models: refactoring move all import models to a specific file
Diffstat (limited to 'ishtar_common/models.py')
-rw-r--r-- | ishtar_common/models.py | 898 |
1 files changed, 21 insertions, 877 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py index dd8204f21..725c3ac83 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -70,6 +70,17 @@ from ishtar_common.data_importer import Importer, ImportFormater, \ IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \ TypeFormater, YearFormater, StrToBoolean, FileFormater +from ishtar_common.models_imports import ImporterModel, ImporterType, \ + ImporterDefault, ImporterDefaultValues, ImporterColumn, \ + ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ + Import + +__all__ = [ + 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', + 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget', + 'TargetKey', 'FormaterType', 'Import' +] + logger = logging.getLogger(__name__) @@ -123,15 +134,6 @@ def check_model_access_control(request, model, available_perms=None): return allowed, own -class Imported(models.Model): - imports = models.ManyToManyField( - 'Import', blank=True, - related_name="imported_%(app_label)s_%(class)s") - - class Meta: - abstract = True - - class ValueGetter(object): _prefix = "" GET_VALUES_EXTRA = [] @@ -798,7 +800,7 @@ class ItemKey(models.Model): object_id = models.PositiveIntegerField() content_object = GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey( - 'Import', null=True, blank=True, + Import, null=True, blank=True, help_text=_(u"Specific key to an import")) def __unicode__(self): @@ -913,6 +915,15 @@ class BulkUpdatedItem(object): return transaction_id, False +class Imported(models.Model): + imports = models.ManyToManyField( + Import, blank=True, + related_name="imported_%(app_label)s_%(class)s") + + class Meta: + abstract = True + + class BaseHistorizedItem(Imported): IS_BASKET = False history_modifier = models.ForeignKey( @@ -1880,873 +1891,6 @@ post_delete.connect(post_save_cache, sender=OrganizationType) organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, unicode) organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, unicode) -IMPORTER_CLASSES = {} - -IMPORTER_CLASSES.update({ - 'sra-pdl-files': - 'archaeological_files.data_importer.FileImporterSraPdL'}) - - -def get_model_fields(model): - """ - Return a dict of fields from model - To be replace in Django 1.8 with get_fields, get_field - """ - fields = {} - options = model._meta - for field in sorted(options.fields + options.many_to_many): - fields[field.name] = field - if hasattr(model, 'get_extra_fields'): - fields.update(model.get_extra_fields()) - return fields - - -def import_class(full_path_classname): - """ - Return the model class from the full path - TODO: add a white list for more security - """ - mods = full_path_classname.split('.') - if len(mods) == 1: - mods = ['ishtar_common', 'models', mods[0]] - elif 'models' not in mods and 'models_finds' not in mods \ - and 'models_treatments' not in mods: - raise SuspiciousOperation( - u"Try to import a non model from a string") - module = import_module('.'.join(mods[:-1])) - return getattr(module, mods[-1]) - - -class ImportModelManager(models.Manager): - def get_by_natural_key(self, klass): - return self.get(klass=klass) - - -class ImporterModel(models.Model): - name = models.CharField(_(u"Name"), max_length=200) - klass = models.CharField(_(u"Class name"), max_length=200, unique=True) - objects = ImportModelManager() - - class Meta: - verbose_name = _(u"Importer - Model") - verbose_name_plural = _(u"Importer - Models") - ordering = ('name',) - - def __unicode__(self): - return self.name - - def natural_key(self): - return (self.klass, ) - - -class ImporterTypeManager(models.Manager): - def get_by_natural_key(self, slug): - return self.get(slug=slug) - - -class ImporterType(models.Model): - """ - Description of a table to be mapped with ishtar database - """ - name = models.CharField(_(u"Name"), blank=True, null=True, - max_length=100) - slug = models.SlugField(_(u"Slug"), unique=True, max_length=100, - blank=True, null=True) - description = models.CharField(_(u"Description"), blank=True, null=True, - max_length=500) - users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"), - blank=True) - associated_models = models.ForeignKey( - ImporterModel, verbose_name=_(u"Associated model"), - related_name='+', blank=True, null=True) - created_models = models.ManyToManyField( - ImporterModel, verbose_name=_(u"Models that can accept new items"), - blank=True, help_text=_(u"Leave blank for no restrictions"), - related_name='+') - is_template = models.BooleanField(_(u"Is template"), default=False) - unicity_keys = models.CharField(_(u"Unicity keys (separator \";\")"), - blank=True, null=True, max_length=500) - objects = ImporterTypeManager() - - class Meta: - verbose_name = _(u"Importer - Type") - verbose_name_plural = _(u"Importer - Types") - ordering = ('name',) - - def natural_key(self): - return (self.slug, ) - - def __unicode__(self): - return self.name - - def get_importer_class(self, import_instance=None): - if self.slug and self.slug in IMPORTER_CLASSES: - cls = import_class(IMPORTER_CLASSES[self.slug]) - return cls - OBJECT_CLS = import_class(self.associated_models.klass) - DEFAULTS = dict([(default.keys, default.values) - for default in self.defaults.all()]) - LINE_FORMAT = [] - idx = 0 - for column in self.columns.order_by('col_number').all(): - idx += 1 - while column.col_number > idx: - LINE_FORMAT.append(None) - idx += 1 - targets = [] - formater_types = [] - nb = column.targets.count() - if not nb: - LINE_FORMAT.append(None) - continue - force_news = [] - concat_str = [] - for target in column.targets.all(): - ft = target.formater_type.get_formater_type( - target, import_instance=import_instance) - if not ft: - continue - formater_types.append(ft) - targets.append(target.target) - concat_str.append(target.concat_str) - force_news.append(target.force_new) - formater_kwargs = {} - if column.regexp_pre_filter: - formater_kwargs['regexp'] = re.compile( - column.regexp_pre_filter.regexp) - formater_kwargs['concat_str'] = concat_str - formater_kwargs['duplicate_fields'] = [ - (field.field_name, field.force_new, field.concat, - field.concat_str) - for field in column.duplicate_fields.all()] - formater_kwargs['label'] = column.label - formater_kwargs['required'] = column.required - formater_kwargs['force_new'] = force_news - if column.export_field_name: - formater_kwargs['export_field_name'] = [ - column.export_field_name] - formater = ImportFormater(targets, formater_types, - **formater_kwargs) - LINE_FORMAT.append(formater) - UNICITY_KEYS = [] - if self.unicity_keys: - UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] - MODEL_CREATION_LIMIT = [] - for modls in self.created_models.all(): - MODEL_CREATION_LIMIT.append(import_class(modls.klass)) - args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, - 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, - 'UNICITY_KEYS': UNICITY_KEYS, - 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} - name = str(''.join( - x for x in slugify(self.name).replace('-', ' ').title() - if not x.isspace())) - newclass = type(name, (Importer,), args) - return newclass - - def save(self, *args, **kwargs): - if not self.slug: - self.slug = create_slug(ImporterType, self.name) - return super(ImporterType, self).save(*args, **kwargs) - - -def get_associated_model(parent_model, keys): - model = None - if isinstance(parent_model, unicode) or \ - isinstance(parent_model, str): - OBJECT_CLS = import_class(parent_model) - else: - OBJECT_CLS = parent_model - for idx, item in enumerate(keys): - if not idx: - field = get_model_fields(OBJECT_CLS)[item] - if hasattr(field, 'rel') and hasattr(field.rel, 'to'): - model = field.rel.to - if type(field) == ModelBase: - model = field - else: - return get_associated_model(model, keys[1:]) - return model - - -class ImporterDefaultManager(models.Manager): - def get_by_natural_key(self, importer_type, target): - return self.get(importer_type__slug=importer_type, target=target) - - -class ImporterDefault(models.Model): - """ - Targets of default values in an import - """ - importer_type = models.ForeignKey(ImporterType, related_name='defaults') - target = models.CharField(u"Target", max_length=500) - - class Meta: - verbose_name = _(u"Importer - Default") - verbose_name_plural = _(u"Importer - Defaults") - unique_together = ('importer_type', 'target') - objects = ImporterDefaultManager() - - def __unicode__(self): - return u"{} - {}".format(self.importer_type, self.target) - - def natural_key(self): - return self.importer_type.slug, self.target - - @property - def keys(self): - return tuple(self.target.split('__')) - - @property - def associated_model(self): - return get_associated_model(self.importer_type.associated_models.klass, - self.keys) - - @property - def values(self): - values = {} - for default_value in self.default_values.all(): - values[default_value.target] = default_value.get_value() - return values - - -class ImporterDefaultValuesManager(models.Manager): - def get_by_natural_key(self, def_target_type, def_target, target): - return self.get(default_target__importer_type__slug=def_target_type, - default_target__target=def_target, - target=target) - - -class ImporterDefaultValues(models.Model): - """ - Default values in an import - """ - default_target = models.ForeignKey(ImporterDefault, - related_name='default_values') - target = models.CharField(u"Target", max_length=500) - value = models.CharField(u"Value", max_length=500) - objects = ImporterDefaultValuesManager() - - def __unicode__(self): - return u"{} - {}".format(self.default_target, self.target, self.value) - - class Meta: - verbose_name = _(u"Importer - Default value") - verbose_name_plural = _(u"Importer - Default values") - - def natural_key(self): - return (self.default_target.importer_type.slug, - self.default_target.target, - self.target) - - def get_value(self): - parent_model = self.default_target.associated_model - if not parent_model: - return self.value - fields = get_model_fields(parent_model) - target = self.target.strip() - if target not in fields: - return - field = fields[target] - if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'): - return - model = field.rel.to - # if value is an id - try: - return model.objects.get(pk=int(self.value)) - except (ValueError, model.DoesNotExist): - pass - # try with txt_idx - try: - return model.objects.get(txt_idx=self.value) - except (ValueError, model.DoesNotExist): - pass - return "" - - -class ImporterColumnManager(models.Manager): - def get_by_natural_key(self, importer_type, col_number): - return self.get(importer_type__slug=importer_type, - col_number=col_number) - - -class ImporterColumn(models.Model): - """ - Import file column description - """ - label = models.CharField(_(u"Label"), blank=True, null=True, - max_length=200) - importer_type = models.ForeignKey(ImporterType, related_name='columns') - col_number = models.IntegerField(_(u"Column number"), default=1) - description = models.TextField(_("Description"), blank=True, null=True) - regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True) - required = models.BooleanField(_(u"Required"), default=False) - export_field_name = models.CharField( - _(u"Export field name"), blank=True, null=True, max_length=200, - help_text=_(u"Fill this field if the field name is ambiguous for " - u"export. For instance: concatenated fields.") - ) - objects = ImporterColumnManager() - - class Meta: - verbose_name = _(u"Importer - Column") - verbose_name_plural = _(u"Importer - Columns") - ordering = ('importer_type', 'col_number') - unique_together = ('importer_type', 'col_number') - - def __unicode__(self): - return u"{} - {}".format(self.importer_type, self.col_number) - - def natural_key(self): - return self.importer_type.slug, self.col_number - - def targets_lbl(self): - return u', '.join([target.target for target in self.targets.all()]) - - def duplicate_fields_lbl(self): - return u', '.join([dp.field_name - for dp in self.duplicate_fields.all()]) - - -class ImporterDuplicateFieldManager(models.Manager): - def get_by_natural_key(self, importer_type, col_number, field_name): - return self.get(column__importer_type__slug=importer_type, - column__col_number=col_number, - field_name=field_name) - - -class ImporterDuplicateField(models.Model): - """ - Direct copy of result in other fields - """ - column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') - field_name = models.CharField(_(u"Field name"), blank=True, null=True, - max_length=200) - force_new = models.BooleanField(_(u"Force creation of new items"), - default=False) - concat = models.BooleanField(_(u"Concatenate with existing"), - default=False) - concat_str = models.CharField(_(u"Concatenate character"), max_length=5, - blank=True, null=True) - objects = ImporterDuplicateFieldManager() - - class Meta: - verbose_name = _(u"Importer - Duplicate field") - verbose_name_plural = _(u"Importer - Duplicate fields") - ordering = ('column', 'field_name') - - def natural_key(self): - return self.column.importer_type, self.column.col_number, \ - self.field_name - - -class NamedManager(models.Manager): - def get_by_natural_key(self, name): - return self.get(name=name) - - -class Regexp(models.Model): - name = models.CharField(_(u"Name"), max_length=100, unique=True) - description = models.CharField(_(u"Description"), blank=True, null=True, - max_length=500) - regexp = models.CharField(_(u"Regular expression"), max_length=500) - objects = NamedManager() - - class Meta: - verbose_name = _(u"Importer - Regular expression") - verbose_name_plural = _(u"Importer - Regular expressions") - - def __unicode__(self): - return self.name - - def natural_key(self): - return (self.name, ) - - -class ImportTargetManager(models.Manager): - def get_by_natural_key(self, importer_type, col_number, target): - return self.get(column__importer_type__slug=importer_type, - column__col_number=col_number, - target=target) - - -class ImportTarget(models.Model): - """ - Ishtar database target for a column - """ - column = models.ForeignKey(ImporterColumn, related_name='targets') - target = models.CharField(u"Target", max_length=500) - regexp_filter = models.ForeignKey("Regexp", blank=True, null=True) - formater_type = models.ForeignKey("FormaterType") - force_new = models.BooleanField(_(u"Force creation of new items"), - default=False) - concat = models.BooleanField(_(u"Concatenate with existing"), - default=False) - concat_str = models.CharField(_(u"Concatenate character"), max_length=5, - blank=True, null=True) - comment = models.TextField(_(u"Comment"), blank=True, null=True) - objects = ImportTargetManager() - - class Meta: - verbose_name = _(u"Importer - Target") - verbose_name_plural = _(u"Importer - Targets") - unique_together = ('column', 'target') - - def __unicode__(self): - return self.target[:50] if self.target else self.comment - - def natural_key(self): - return self.column.importer_type.slug, self.column.col_number, \ - self.target - - @property - def associated_model(self): - try: - return get_associated_model( - self.column.importer_type.associated_models.klass, - self.target.split('__')) - except KeyError: - return - - def get_choices(self): - if self.formater_type.formater_type == 'UnknowType' \ - and self.column.importer_type.slug: - cls = self.column.importer_type.get_importer_class() - formt = cls().line_format[self.column.col_number - 1] - if hasattr(formt.formater, 'choices'): - return [('', '--' * 8)] + list(formt.formater.choices) - return [('', '--' * 8)] - if self.formater_type.formater_type == 'StrToBoolean': - return [('', '--' * 8), - ('True', _(u"True")), - ('False', _(u"False"))] - if not self.associated_model or not hasattr(self.associated_model, - 'get_types'): - return [] - return self.associated_model.get_types() - - -class TargetKey(models.Model): - """ - User's link between import source and ishtar database. - Also temporary used for GeneralType to point missing link before adding - them in ItemKey table. - A targetkey connection can be create to be applied to on particular - import (associated_import), one particular user (associated_user) or to all - imports (associated_import and associated_user are empty). - """ - target = models.ForeignKey(ImportTarget, related_name='keys') - key = models.TextField(_(u"Key")) - value = models.TextField(_(u"Value"), blank=True, null=True) - is_set = models.BooleanField(_(u"Is set"), default=False) - associated_import = models.ForeignKey('Import', blank=True, null=True) - associated_user = models.ForeignKey('IshtarUser', blank=True, null=True) - - class Meta: - unique_together = ('target', 'key', 'associated_user', - 'associated_import') - verbose_name = _(u"Importer - Target key") - verbose_name_plural = _(u"Importer - Targets keys") - - def __unicode__(self): - return u" - ".join([unicode(self.target), self.key[:50]]) - - def column_nb(self): - # for the admin - return self.target.column.col_number - - def importer_type(self): - # for the admin - return self.target.column.importer_type.name - - def format(self): - if not self.is_set: - return None - if self.target.formater_type.formater_type == 'StrToBoolean': - if self.value in ('False', '0'): - return False - elif self.value: - return True - return - return self.value - - def save(self, *args, **kwargs): - obj = super(TargetKey, self).save(*args, **kwargs) - if not self.value: - return obj - associated_model = self.target.associated_model - if associated_model and hasattr(self.target.associated_model, - "add_key"): - v = None - # pk is given - try: - v = self.target.associated_model.objects.get( - pk=unicode(int(self.value))) - except (ValueError, self.target.associated_model.DoesNotExist): - # try with txt_idx - try: - v = self.target.associated_model.objects.get( - txt_idx=unicode(self.value)) - except self.target.associated_model.DoesNotExist: - pass - if v: - v.add_key(self.key, importer=self.associated_import) - return obj - -TARGET_MODELS = [ - ('OrganizationType', _(u"Organization type")), - ('TitleType', _(u"Title")), - ('SourceType', _(u"Source type")), - ('AuthorType', _(u"Author type")), - ('Format', _(u"Format")), - ('archaeological_operations.models.OperationType', _(u"Operation type")), - ('archaeological_operations.models.Period', _(u"Period")), - ('archaeological_operations.models.ReportState', _(u"Report state")), - ('archaeological_operations.models.RemainType', _(u"Remain type")), - ('archaeological_context_records.models.Unit', _(u"Unit")), - ('archaeological_context_records.models.ActivityType', - _(u"Activity type")), - ('archaeological_context_records.models.DocumentationType', - _(u"Documentation type")), - ('archaeological_finds.models.MaterialType', _(u"Material")), - ('archaeological_finds.models.ConservatoryState', - _(u"Conservatory state")), - ('archaeological_warehouse.models.ContainerType', _(u"Container type")), - ('archaeological_finds.models.PreservationType', _(u"Preservation type")), - ('archaeological_finds.models.ObjectType', _(u"Object type")), - ('archaeological_finds.models.IntegrityType', _(u"Integrity type")), - ('archaeological_finds.models.RemarkabilityType', - _(u"Remarkability type")), - ('archaeological_finds.models.BatchType', _(u"Batch type")), - ('archaeological_context_records.models.IdentificationType', - _("Identification type")), - ('archaeological_context_records.models.RelationType', - _(u"Context record relation type")), - ('SpatialReferenceSystem', _(u"Spatial reference system")), - ('SupportType', _(u"Support type")), - ('TitleType', _(u"Title type")), -] - -TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] - -IMPORTER_TYPES = ( - ('IntegerFormater', _(u"Integer")), - ('FloatFormater', _(u"Float")), - ('UnicodeFormater', _(u"String")), - ('DateFormater', _(u"Date")), - ('TypeFormater', _(u"Type")), - ('YearFormater', _(u"Year")), - ('StrToBoolean', _(u"String to boolean")), - ('FileFormater', pgettext_lazy("filesystem", u"File")), - ('UnknowType', _(u"Unknow type")) -) - -IMPORTER_TYPES_DCT = { - 'IntegerFormater': IntegerFormater, - 'FloatFormater': FloatFormater, - 'UnicodeFormater': UnicodeFormater, - 'DateFormater': DateFormater, - 'TypeFormater': TypeFormater, - 'YearFormater': YearFormater, - 'StrToBoolean': StrToBoolean, - 'FileFormater': FileFormater, - 'UnknowType': None, -} - -DATE_FORMATS = ( - ('%Y', _(u"4 digit year. e.g.: \"2015\"")), - ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")), - ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")), -) - -IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS, - 'DateFormater': DATE_FORMATS} - - -class FormaterTypeManager(models.Manager): - def get_by_natural_key(self, formater_type, options, many_split): - return self.get(formater_type=formater_type, - options=options, many_split=many_split) - - -class FormaterType(models.Model): - formater_type = models.CharField(u"Formater type", max_length=20, - choices=IMPORTER_TYPES) - options = models.CharField(_(u"Options"), max_length=500, blank=True, - null=True) - many_split = models.CharField(_(u"Split character(s)"), max_length=10, - blank=True, null=True) - objects = FormaterTypeManager() - - class Meta: - verbose_name = _(u"Importer - Formater type") - verbose_name_plural = _(u"Importer - Formater types") - unique_together = ('formater_type', 'options', 'many_split') - ordering = ('formater_type', 'options') - - def natural_key(self): - return self.formater_type, self.options, self.many_split - - def __unicode__(self): - return u" - ".join( - [unicode(dict(IMPORTER_TYPES)[self.formater_type]) - if self.formater_type in IMPORTER_TYPES_DCT else ''] + - [getattr(self, k) for k in ('options', 'many_split') - if getattr(self, k)]) - - def get_choices(self): - if self.format_type in IMPORTER_TYPES_CHOICES: - return IMPORTER_TYPES_CHOICES[self.format_type] - - def get_formater_type(self, target, import_instance=None): - if self.formater_type not in IMPORTER_TYPES_DCT.keys(): - return - kwargs = {'db_target': target, 'import_instance': import_instance} - if self.many_split: - kwargs['many_split'] = self.many_split - if self.formater_type == 'TypeFormater': - if self.options not in TARGET_MODELS_KEYS: - logger.warning( - "**WARN FormaterType.get_formater_type**: {} " - "is not in TARGET_MODELS_KEYS".format(self.options)) - return - model = None - if self.options in dir(): - model = dir()[self.options] - else: - model = import_class(self.options) - return TypeFormater(model, **kwargs) - elif self.formater_type == 'UnicodeFormater': - if self.options: - try: - return UnicodeFormater(int(self.options.strip()), **kwargs) - except ValueError: - pass - return UnicodeFormater(**kwargs) - elif self.formater_type == 'DateFormater': - date_formats = self.options - if self.many_split: - date_formats = self.options.split(kwargs.pop('many_split')) - return DateFormater(date_formats, **kwargs) - elif self.formater_type == 'StrToBoolean': - return StrToBoolean(**kwargs) - elif self.formater_type == 'UnknowType': - return - else: - return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) - -IMPORT_STATE = (("C", _(u"Created")), - ("AP", _(u"Analyse in progress")), - ("A", _(u"Analysed")), - ("P", _(u"Import pending")), - ("IP", _(u"Import in progress")), - ("FE", _(u"Finished with errors")), - ("F", _(u"Finished")), - ("AC", _(u"Archived")), - ) - -IMPORT_STATE_DCT = dict(IMPORT_STATE) -ENCODINGS = [(settings.ENCODING, settings.ENCODING), - (settings.ALT_ENCODING, settings.ALT_ENCODING), - ('utf-8', 'utf-8')] - - -class Import(models.Model): - user = models.ForeignKey('IshtarUser') - name = models.CharField(_(u"Name"), max_length=500, - blank=True, null=True) - importer_type = models.ForeignKey(ImporterType) - imported_file = models.FileField( - _(u"Imported file"), upload_to="upload/imports/", max_length=220) - imported_images = models.FileField( - _(u"Associated images (zip file)"), upload_to="upload/imports/", - blank=True, null=True, max_length=220) - encoding = models.CharField(_(u"Encoding"), choices=ENCODINGS, - default=u'utf-8', max_length=15) - skip_lines = models.IntegerField(_(u"Skip lines"), default=1) - error_file = models.FileField(_(u"Error file"), - upload_to="upload/imports/", - blank=True, null=True, max_length=255) - result_file = models.FileField(_(u"Result file"), - upload_to="upload/imports/", - blank=True, null=True, max_length=255) - match_file = models.FileField(_(u"Match file"), - upload_to="upload/imports/", - blank=True, null=True, max_length=255) - state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE, - default=u'C') - conservative_import = models.BooleanField( - _(u"Conservative import"), default=False, - help_text='If set to true, do not overload existing values') - creation_date = models.DateTimeField( - _(u"Creation date"), auto_now_add=True, blank=True, null=True) - end_date = models.DateTimeField(_(u"End date"), blank=True, - null=True, editable=False) - seconds_remaining = models.IntegerField( - _(u"Remaining seconds"), blank=True, null=True, editable=False) - - class Meta: - verbose_name = _(u"Import") - verbose_name_plural = _(u"Imports") - - def __unicode__(self): - return u"{} | {}".format(self.name or u"-", self.importer_type) - - def need_matching(self): - return bool(TargetKey.objects.filter(associated_import=self, - is_set=False).count()) - - @property - def errors(self): - if not self.error_file: - return [] - errors = [] - with open(self.error_file.path, 'rb') as csvfile: - reader = csv.DictReader(csvfile, fieldnames=['line', 'column', - 'error']) - reader.next() # pass the header - for row in reader: - errors.append(row) - return errors - - def get_actions(self): - """ - Get available action relevant with the current status - """ - actions = [] - if self.state == 'C': - actions.append(('A', _(u"Analyse"))) - if self.state == 'A': - actions.append(('A', _(u"Re-analyse"))) - actions.append(('I', _(u"Launch import"))) - if self.state in ('F', 'FE'): - actions.append(('A', _(u"Re-analyse"))) - actions.append(('I', _(u"Re-import"))) - actions.append(('AC', _(u"Archive"))) - if self.state == 'AC': - actions.append(('A', _(u"Unarchive"))) - actions.append(('D', _(u"Delete"))) - return actions - - @property - def imported_filename(self): - return self.imported_file.name.split(os.sep)[-1] - - @property - def status(self): - if self.state not in IMPORT_STATE_DCT: - return "" - return IMPORT_STATE_DCT[self.state] - - def get_importer_instance(self): - return self.importer_type.get_importer_class(import_instance=self)( - skip_lines=self.skip_lines, import_instance=self, - conservative_import=self.conservative_import) - - @property - def data_table(self): - imported_file = self.imported_file.path - tmpdir = None - if zipfile.is_zipfile(imported_file): - z = zipfile.ZipFile(imported_file) - filename = None - for name in z.namelist(): - # get first CSV file found - if name.endswith('.csv'): - filename = name - break - if not filename: - return [] - tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-') - imported_file = z.extract(filename, tmpdir) - - encodings = [self.encoding] - encodings += [coding for coding, c in ENCODINGS - if coding != self.encoding] - for encoding in encodings: - try: - with open(imported_file) as csv_file: - vals = [line - for line in unicodecsv.reader(csv_file, - encoding=encoding)] - if tmpdir: - shutil.rmtree(tmpdir) - return vals - except UnicodeDecodeError: - pass # try the next encoding - if tmpdir: - shutil.rmtree(tmpdir) - return [] - - def initialize(self): - self.state = 'AP' - self.save() - self.get_importer_instance().initialize(self.data_table, output='db') - self.state = 'A' - self.save() - - def importation(self): - self.state = 'IP' - self.save() - importer = self.get_importer_instance() - importer.importation(self.data_table) - # result file - filename = slugify(self.importer_type.name) - now = datetime.datetime.now().isoformat('-').replace(':', '') - result_file = filename + "_result_%s.csv" % now - result_file = os.sep.join([self.result_file.storage.location, - result_file]) - with open(result_file, 'w') as fle: - fle.write(importer.get_csv_result().encode('utf-8')) - self.result_file = File(open(fle.name)) - if importer.errors: - self.state = 'FE' - error_file = filename + "_errors_%s.csv" % now - error_file = os.sep.join([self.error_file.storage.location, - error_file]) - with open(error_file, 'w') as fle: - fle.write(importer.get_csv_errors().encode('utf-8')) - self.error_file = File(open(fle.name)) - else: - self.state = 'F' - self.error_file = None - if importer.match_table: - match_file = filename + "_match_%s.csv" % now - match_file = os.sep.join([self.match_file.storage.location, - match_file]) - with open(match_file, 'w') as fle: - fle.write(importer.get_csv_matches().encode('utf-8')) - self.match_file = File(open(fle.name)) - self.save() - - def archive(self): - self.state = 'AC' - self.save() - - def get_all_imported(self): - imported = [] - for related, zorg in \ - self._meta.get_all_related_m2m_objects_with_model(): - accessor = related.get_accessor_name() - imported += [(accessor, obj) - for obj in getattr(self, accessor).all()] - return imported - - -def pre_delete_import(sender, **kwargs): - # deleted imported items when an import is delete - instance = kwargs.get('instance') - if not instance: - return - to_delete = [] - for accessor, imported in instance.get_all_imported(): - to_delete.append(imported) - for item in to_delete: - item.delete() - - -pre_delete.connect(pre_delete_import, sender=Import) - class Organization(Address, Merge, OwnPerms, ValueGetter): TABLE_COLS = ('name', 'organization_type', 'town') |