diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-09-01 17:58:56 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2022-12-12 12:20:58 +0100 |
commit | 3b671732319aa14f194821a8f547ae280e1e0648 (patch) | |
tree | aa16f6f20916eecc7a99d9eed661d8bd1ddf9522 /ishtar_common/models_imports.py | |
parent | e416f8d0702e2f3f981b50aebc08c67f7034d6a1 (diff) | |
download | Ishtar-3b671732319aa14f194821a8f547ae280e1e0648.tar.bz2 Ishtar-3b671732319aa14f194821a8f547ae280e1e0648.zip |
Migration to Django 2.2 - missing on_delete - django.urls import reverse
Diffstat (limited to 'ishtar_common/models_imports.py')
-rw-r--r-- | ishtar_common/models_imports.py | 1024 |
1 files changed, 595 insertions, 429 deletions
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 3779b656e..54409e2c9 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -48,13 +48,32 @@ except (AssertionError, ImportError): UnoCalc = None from ishtar_common.model_managers import SlugModelManager -from ishtar_common.utils import create_slug, \ - get_all_related_m2m_objects_with_model, put_session_message, \ - put_session_var, get_session_var, num2col, max_size_help, import_class -from ishtar_common.data_importer import Importer, ImportFormater, \ - IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \ - TypeFormater, YearFormater, StrToBoolean, FileFormater, InseeFormater, \ - ImporterError, UpperCaseFormater, LowerCaseFormater +from ishtar_common.utils import ( + create_slug, + get_all_related_m2m_objects_with_model, + put_session_message, + put_session_var, + get_session_var, + num2col, + max_size_help, + import_class, +) +from ishtar_common.data_importer import ( + Importer, + ImportFormater, + IntegerFormater, + FloatFormater, + UnicodeFormater, + DateFormater, + TypeFormater, + YearFormater, + StrToBoolean, + FileFormater, + InseeFormater, + ImporterError, + UpperCaseFormater, + LowerCaseFormater, +) from ishtar_common.utils import task @@ -86,13 +105,13 @@ class ImporterModel(models.Model): class Meta: verbose_name = _("Model") verbose_name_plural = _("Models") - ordering = ('name',) + ordering = ("name",) def __str__(self): return self.name def natural_key(self): - return (self.klass, ) + return (self.klass,) class ImporterTypeManager(models.Manager): @@ -104,23 +123,32 @@ class ImporterType(models.Model): """ Description of a table to be mapped with ishtar database """ + name = models.CharField(_("Name"), max_length=200) slug = models.SlugField(_("Slug"), unique=True, max_length=100) - description = models.CharField(_("Description"), blank=True, null=True, - max_length=500) - users = models.ManyToManyField('IshtarUser', verbose_name=_("Users"), - blank=True) + description = models.CharField( + _("Description"), blank=True, null=True, max_length=500 + ) + users = models.ManyToManyField("IshtarUser", verbose_name=_("Users"), blank=True) associated_models = models.ForeignKey( - ImporterModel, verbose_name=_("Associated model"), + ImporterModel, + verbose_name=_("Associated model"), on_delete=models.SET_NULL, - related_name='importer_type_associated', blank=True, null=True) + related_name="importer_type_associated", + blank=True, + null=True, + ) created_models = models.ManyToManyField( - ImporterModel, verbose_name=_("Models that can accept new items"), - blank=True, help_text=_("Leave blank for no restrictions"), - related_name='importer_type_created') + ImporterModel, + verbose_name=_("Models that can accept new items"), + blank=True, + help_text=_("Leave blank for no restrictions"), + related_name="importer_type_created", + ) is_template = models.BooleanField(_("Can be exported"), default=False) - unicity_keys = models.CharField(_("Unicity keys (separator \";\")"), - blank=True, null=True, max_length=500) + unicity_keys = models.CharField( + _('Unicity keys (separator ";")'), blank=True, null=True, max_length=500 + ) available = models.BooleanField(_("Available"), default=True) objects = ImporterTypeManager() SERIALIZATION_EXCLUDE = ["users"] @@ -128,10 +156,10 @@ class ImporterType(models.Model): class Meta: verbose_name = _("Importer - Type") verbose_name_plural = _("Importer - Types") - ordering = ('name',) + ordering = ("name",) def natural_key(self): - return (self.slug, ) + return (self.slug,) def __str__(self): return self.name @@ -150,7 +178,7 @@ class ImporterType(models.Model): return col_number = 1 # user number so we start with 1 lst_col_number = 0 - for column in self.columns.order_by('col_number').all(): + for column in self.columns.order_by("col_number").all(): while column.col_number > col_number: col_number += 1 # header @@ -171,7 +199,7 @@ class ImporterType(models.Model): if not ft: continue # first we only manage TypeFormater - if ft.formater_type != 'TypeFormater': + if ft.formater_type != "TypeFormater": continue if not ft.options: # bad configuration continue @@ -181,11 +209,18 @@ class ImporterType(models.Model): lst = [] for typ in model.get_types(instances=True): lst.append(str(typ)) - end_row = uno.create_list(lst_sheet, lst_col_number, 0, - str(model._meta.verbose_name), lst) + end_row = uno.create_list( + lst_sheet, lst_col_number, 0, str(model._meta.verbose_name), lst + ) uno.set_cell_validation_list( - main_sheet, col_number, 2, ROW_NUMBER + 2, - lst_sheet, lst_col_number, [1, end_row]) + main_sheet, + col_number, + 2, + ROW_NUMBER + 2, + lst_sheet, + lst_col_number, + [1, end_row], + ) lst_col_number += 1 tmpdir = tempfile.mkdtemp(prefix="ishtar-templates-") dest_filename = "{}{}{}.ods".format(tmpdir, os.sep, self.name) @@ -194,12 +229,13 @@ class ImporterType(models.Model): def get_importer_class(self, import_instance=None): OBJECT_CLS = import_class(self.associated_models.klass) - DEFAULTS = dict([(default.keys, default.values) - for default in self.defaults.all()]) + DEFAULTS = dict( + [(default.keys, default.values) for default in self.defaults.all()] + ) LINE_FORMAT = [] LINE_EXPORT_FORMAT = [] idx = 0 - for column in self.columns.order_by('col_number').all(): + for column in self.columns.order_by("col_number").all(): idx += 1 while column.col_number > idx: LINE_FORMAT.append(None) @@ -212,8 +248,7 @@ class ImporterType(models.Model): LINE_FORMAT.append(None) if column.export_field_name: LINE_EXPORT_FORMAT.append( - ImportFormater(column.export_field_name, - label=column.label) + ImportFormater(column.export_field_name, label=column.label) ) continue force_news = [] @@ -221,7 +256,8 @@ class ImporterType(models.Model): concat = [] for target in column.targets.order_by("pk").all(): ft = target.formater_type.get_formater_type( - target, import_instance=import_instance) + target, import_instance=import_instance + ) if not ft: continue formater_types.append(ft) @@ -231,42 +267,46 @@ class ImporterType(models.Model): concat.append(target.concat) formater_kwargs = {} if column.regexp_pre_filter: - formater_kwargs['regexp'] = re.compile( - column.regexp_pre_filter.regexp) + formater_kwargs["regexp"] = re.compile(column.regexp_pre_filter.regexp) if column.value_format: - formater_kwargs['value_format'] = \ - column.value_format.format_string - formater_kwargs['concat'] = concat - formater_kwargs['concat_str'] = concat_str - formater_kwargs['duplicate_fields'] = [ - (field.field_name, field.force_new, field.concat, - field.concat_str) - for field in column.duplicate_fields.all()] - formater_kwargs['label'] = column.label - formater_kwargs['required'] = column.required - formater_kwargs['force_new'] = force_news - formater_kwargs['comment'] = column.description + formater_kwargs["value_format"] = column.value_format.format_string + formater_kwargs["concat"] = concat + formater_kwargs["concat_str"] = concat_str + formater_kwargs["duplicate_fields"] = [ + (field.field_name, field.force_new, field.concat, field.concat_str) + for field in column.duplicate_fields.all() + ] + formater_kwargs["label"] = column.label + formater_kwargs["required"] = column.required + formater_kwargs["force_new"] = force_news + formater_kwargs["comment"] = column.description if column.export_field_name: - formater_kwargs['export_field_name'] = [ - column.export_field_name] - formater = ImportFormater(targets, formater_types, - **formater_kwargs) + formater_kwargs["export_field_name"] = [column.export_field_name] + formater = ImportFormater(targets, formater_types, **formater_kwargs) LINE_FORMAT.append(formater) LINE_EXPORT_FORMAT.append(formater) UNICITY_KEYS = [] if self.unicity_keys: - UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] + UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(";")] MODEL_CREATION_LIMIT = [] for modls in self.created_models.all(): MODEL_CREATION_LIMIT.append(import_class(modls.klass)) - args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, - 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, - 'UNICITY_KEYS': UNICITY_KEYS, - 'LINE_EXPORT_FORMAT': LINE_EXPORT_FORMAT, - 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} - name = str(''.join( - x for x in slugify(self.name).replace('-', ' ').title() - if not x.isspace())) + args = { + "OBJECT_CLS": OBJECT_CLS, + "DESC": self.description, + "DEFAULTS": DEFAULTS, + "LINE_FORMAT": LINE_FORMAT, + "UNICITY_KEYS": UNICITY_KEYS, + "LINE_EXPORT_FORMAT": LINE_EXPORT_FORMAT, + "MODEL_CREATION_LIMIT": MODEL_CREATION_LIMIT, + } + name = str( + "".join( + x + for x in slugify(self.name).replace("-", " ").title() + if not x.isspace() + ) + ) newclass = type(name, (Importer,), args) return newclass @@ -289,25 +329,32 @@ def get_associated_model(parent_model, keys): elif not idx: if item not in fields: raise ImporterError( - str(_("Importer configuration error: " - "\"{}\" is not available for \"{}\"." - " Check your default and column " - "configuration")).format( - item, OBJECT_CLS.__name__)) + str( + _( + "Importer configuration error: " + '"{}" is not available for "{}".' + " Check your default and column " + "configuration" + ) + ).format(item, OBJECT_CLS.__name__) + ) field = fields[item] - if hasattr(field, 'rel') and hasattr(field.rel, 'to'): + if hasattr(field, "rel") and hasattr(field.rel, "to"): model = field.rel.to if type(field) == ModelBase: model = field else: if not model: raise ImporterError( - str(_("Importer configuration error: " - "\"{}\" is not available for \"{}\"." - " Check your default and column " - "configuration")).format( - "__".join(keys[1:]), - OBJECT_CLS.__name__)) + str( + _( + "Importer configuration error: " + '"{}" is not available for "{}".' + " Check your default and column " + "configuration" + ) + ).format("__".join(keys[1:]), OBJECT_CLS.__name__) + ) return get_associated_model(model, keys[1:]) return model @@ -321,13 +368,17 @@ class ImporterDefault(models.Model): """ Targets of default values in an import """ - importer_type = models.ForeignKey(ImporterType, related_name='defaults') + + importer_type = models.ForeignKey( + ImporterType, related_name="defaults", on_delete=models.CASCADE + ) target = models.CharField("Target", max_length=500) class Meta: verbose_name = _("Importer - Default") verbose_name_plural = _("Importer - Defaults") - unique_together = ('importer_type', 'target') + unique_together = ("importer_type", "target") + objects = ImporterDefaultManager() def __str__(self): @@ -338,14 +389,15 @@ class ImporterDefault(models.Model): @property def keys(self): - return tuple(t for t in self.target.split('__') if t not in ("-", "")) + return tuple(t for t in self.target.split("__") if t not in ("-", "")) @property def associated_model(self): if not self.keys: return import_class(self.importer_type.associated_models.klass) - return get_associated_model(self.importer_type.associated_models.klass, - self.keys) + return get_associated_model( + self.importer_type.associated_models.klass, self.keys + ) @property def values(self): @@ -360,17 +412,21 @@ class ImporterDefault(models.Model): class ImporterDefaultValuesManager(models.Manager): def get_by_natural_key(self, def_target_type, def_target, target): - return self.get(default_target__importer_type__slug=def_target_type, - default_target__target=def_target, - target=target) + return self.get( + default_target__importer_type__slug=def_target_type, + default_target__target=def_target, + target=target, + ) class ImporterDefaultValues(models.Model): """ Default values in an import """ - default_target = models.ForeignKey(ImporterDefault, - related_name='default_values') + + default_target = models.ForeignKey( + ImporterDefault, related_name="default_values", on_delete=models.CASCADE + ) target = models.CharField("Target", max_length=500) value = models.CharField("Value", max_length=500) objects = ImporterDefaultValuesManager() @@ -378,12 +434,14 @@ class ImporterDefaultValues(models.Model): class Meta: verbose_name = _("Importer - Default value") verbose_name_plural = _("Importer - Default values") - unique_together = ('default_target', 'target') + unique_together = ("default_target", "target") def natural_key(self): - return (self.default_target.importer_type.slug, - self.default_target.target, - self.target) + return ( + self.default_target.importer_type.slug, + self.default_target.target, + self.target, + ) def __str__(self): return "{} - {}".format(self.default_target, self.target, self.value) @@ -397,7 +455,7 @@ class ImporterDefaultValues(models.Model): if target not in fields: return field = fields[target] - if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'): + if not hasattr(field, "rel") or not hasattr(field.rel, "to"): return self.value model = field.rel.to # if value is an id @@ -415,40 +473,52 @@ class ImporterDefaultValues(models.Model): class ImporterColumnManager(models.Manager): def get_by_natural_key(self, importer_type, col_number): - return self.get(importer_type__slug=importer_type, - col_number=col_number) + return self.get(importer_type__slug=importer_type, col_number=col_number) class ImporterColumn(models.Model): """ Import file column description """ - label = models.CharField(_("Label"), blank=True, null=True, - max_length=200) - importer_type = models.ForeignKey(ImporterType, related_name='columns') + + label = models.CharField(_("Label"), blank=True, null=True, max_length=200) + importer_type = models.ForeignKey( + ImporterType, related_name="columns", on_delete=models.CASCADE + ) col_number = models.IntegerField(_("Column number"), default=1) description = models.TextField(_("Description"), blank=True, null=True) regexp_pre_filter = models.ForeignKey( - "Regexp", blank=True, null=True, on_delete=models.SET_NULL, + "Regexp", + blank=True, + null=True, + on_delete=models.SET_NULL, related_name="columns", ) value_format = models.ForeignKey( - "ValueFormater", blank=True, null=True, on_delete=models.SET_NULL, - related_name="columns" + "ValueFormater", + blank=True, + null=True, + on_delete=models.SET_NULL, + related_name="columns", ) required = models.BooleanField(_("Required"), default=False) export_field_name = models.CharField( - _("Export field name"), blank=True, null=True, max_length=200, - help_text=_("Fill this field if the field name is ambiguous for " - "export. For instance: concatenated fields.") + _("Export field name"), + blank=True, + null=True, + max_length=200, + help_text=_( + "Fill this field if the field name is ambiguous for " + "export. For instance: concatenated fields." + ), ) objects = ImporterColumnManager() class Meta: verbose_name = _("Importer - Column") verbose_name_plural = _("Importer - Columns") - ordering = ('importer_type', 'col_number') - unique_together = ('importer_type', 'col_number') + ordering = ("importer_type", "col_number") + unique_together = ("importer_type", "col_number") def __str__(self): return "{} - {}".format(self.importer_type, self.col_number) @@ -461,11 +531,10 @@ class ImporterColumn(models.Model): return self.importer_type.slug, self.col_number def targets_lbl(self): - return ', '.join([target.target for target in self.targets.all()]) + return ", ".join([target.target for target in self.targets.all()]) def duplicate_fields_lbl(self): - return ', '.join([dp.field_name or "" - for dp in self.duplicate_fields.all()]) + return ", ".join([dp.field_name or "" for dp in self.duplicate_fields.all()]) def formater_type_lbl(self): return ', '.join([str(target.formater_type) for target in self.targets.all()]) @@ -473,35 +542,39 @@ class ImporterColumn(models.Model): class ImporterDuplicateFieldManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, field_name): - return self.get(column__importer_type__slug=importer_type, - column__col_number=col_number, - field_name=field_name) + return self.get( + column__importer_type__slug=importer_type, + column__col_number=col_number, + field_name=field_name, + ) class ImporterDuplicateField(models.Model): """ Direct copy of result in other fields """ - column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') - field_name = models.CharField(_("Field name"), blank=True, null=True, - max_length=200) - force_new = models.BooleanField(_("Force creation of new items"), - default=False) - concat = models.BooleanField(_("Concatenate with existing"), - default=False) - concat_str = models.CharField(_("Concatenate character"), max_length=5, - blank=True, null=True) + + column = models.ForeignKey( + ImporterColumn, related_name="duplicate_fields", on_delete=models.CASCADE + ) + field_name = models.CharField( + _("Field name"), blank=True, null=True, max_length=200 + ) + force_new = models.BooleanField(_("Force creation of new items"), default=False) + concat = models.BooleanField(_("Concatenate with existing"), default=False) + concat_str = models.CharField( + _("Concatenate character"), max_length=5, blank=True, null=True + ) objects = ImporterDuplicateFieldManager() class Meta: verbose_name = _("Importer - Duplicate field") verbose_name_plural = _("Importer - Duplicate fields") - ordering = ('column', 'field_name') - unique_together = ('column', 'field_name') + ordering = ("column", "field_name") + unique_together = ("column", "field_name") def natural_key(self): - return self.column.importer_type.slug, self.column.col_number, \ - self.field_name + return self.column.importer_type.slug, self.column.col_number, self.field_name class NamedManager(models.Manager): @@ -523,7 +596,7 @@ class Regexp(models.Model): return self.name def natural_key(self): - return (self.name, ) + return (self.name,) class ValueFormater(models.Model): @@ -531,11 +604,14 @@ class ValueFormater(models.Model): slug = models.SlugField(_("Slug"), unique=True, max_length=100) description = models.TextField(_("Description"), blank=True, null=True) format_string = models.CharField( - _("Format string"), max_length=100, - help_text=_("A string used to format a value using the Python " - "\"format()\" method. The site https://pyformat.info/ " - "provide good examples of usage. Only one \"{}\" entry " - "is managed. The input is assumed to be a string.") + _("Format string"), + max_length=100, + help_text=_( + "A string used to format a value using the Python " + '"format()" method. The site https://pyformat.info/ ' + 'provide good examples of usage. Only one "{}" entry ' + "is managed. The input is assumed to be a string." + ), ) objects = SlugModelManager() @@ -551,41 +627,50 @@ class ValueFormater(models.Model): self.format_string.format("sample value") except ValueError: raise ValidationError( - {'format_string': _("The string provided generate an error. " - "Fix it.")} + { + "format_string": _( + "The string provided generate an error. " "Fix it." + ) + } ) def natural_key(self): - return (self.slug, ) + return (self.slug,) class ImportTargetManager(models.Manager): def get_by_natural_key(self, importer_type, col_number, target): - return self.get(column__importer_type__slug=importer_type, - column__col_number=col_number, - target=target) + return self.get( + column__importer_type__slug=importer_type, + column__col_number=col_number, + target=target, + ) class ImportTarget(models.Model): """ Ishtar database target for a column """ - column = models.ForeignKey(ImporterColumn, related_name='targets') + + column = models.ForeignKey( + ImporterColumn, related_name="targets", on_delete=models.CASCADE + ) target = models.CharField("Target", max_length=500) - formater_type = models.ForeignKey("FormaterType", related_name='targets') - force_new = models.BooleanField(_("Force creation of new items"), - default=False) - concat = models.BooleanField(_("Concatenate with existing"), - default=False) - concat_str = models.CharField(_("Concatenate character"), max_length=5, - blank=True, null=True) + formater_type = models.ForeignKey( + "FormaterType", related_name="targets", on_delete=models.CASCADE + ) + force_new = models.BooleanField(_("Force creation of new items"), default=False) + concat = models.BooleanField(_("Concatenate with existing"), default=False) + concat_str = models.CharField( + _("Concatenate character"), max_length=5, blank=True, null=True + ) comment = models.TextField(_("Comment"), blank=True, null=True) objects = ImportTargetManager() class Meta: verbose_name = _("Importer - Target") verbose_name_plural = _("Importer - Targets") - unique_together = ('column', 'target') + unique_together = ("column", "target") def __str__(self): return self.target[:50] if self.target else self.comment @@ -599,8 +684,7 @@ class ImportTarget(models.Model): return "{} - {}".format(self.target[:50], desc) def natural_key(self): - return self.column.importer_type.slug, self.column.col_number, \ - self.target + return self.column.importer_type.slug, self.column.col_number, self.target @property def associated_model(self): @@ -609,24 +693,24 @@ class ImportTarget(models.Model): try: return get_associated_model( self.column.importer_type.associated_models.klass, - self.target.split('__')) + self.target.split("__"), + ) except KeyError: return def get_choices(self): - if self.formater_type.formater_type == 'UnknowType' \ - and self.column.importer_type.slug: + if ( + self.formater_type.formater_type == "UnknowType" + and self.column.importer_type.slug + ): cls = self.column.importer_type.get_importer_class() formt = cls().line_format[self.column.col_number - 1] - if hasattr(formt.formater, 'choices'): - return [('', '--' * 8)] + list(formt.formater.choices) - return [('', '--' * 8)] - if self.formater_type.formater_type == 'StrToBoolean': - return [('', '--' * 8), - ('True', _("True")), - ('False', _("False"))] - if not self.associated_model or not hasattr(self.associated_model, - 'get_types'): + if hasattr(formt.formater, "choices"): + return [("", "--" * 8)] + list(formt.formater.choices) + return [("", "--" * 8)] + if self.formater_type.formater_type == "StrToBoolean": + return [("", "--" * 8), ("True", _("True")), ("False", _("False"))] + if not self.associated_model or not hasattr(self.associated_model, "get_types"): return [] return self.associated_model.get_types() @@ -635,11 +719,12 @@ class TargetKeyGroup(models.Model): """ Group of target keys for imports. """ + name = models.TextField(_("Name"), unique=True) - all_user_can_use = models.BooleanField(_("All users can use it"), - default=False) - all_user_can_modify = models.BooleanField(_("All users can modify it"), - default=False) + all_user_can_use = models.BooleanField(_("All users can use it"), default=False) + all_user_can_modify = models.BooleanField( + _("All users can modify it"), default=False + ) available = models.BooleanField(_("Available"), default=True) class Meta: @@ -660,20 +745,33 @@ class TargetKey(models.Model): one particular group (associated_group) or to all imports (associated_import, associated_user and associated_group are empty). """ - target = models.ForeignKey(ImportTarget, related_name='keys') + + target = models.ForeignKey( + ImportTarget, related_name="keys", on_delete=models.CASCADE + ) key = models.TextField(_("Key")) value = models.TextField(_("Value"), blank=True, null=True) is_set = models.BooleanField(_("Is set"), default=False) - associated_import = models.ForeignKey('Import', blank=True, null=True) - associated_user = models.ForeignKey('IshtarUser', blank=True, null=True) - associated_group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) + associated_import = models.ForeignKey( + "Import", blank=True, null=True, on_delete=models.SET_NULL + ) + associated_user = models.ForeignKey( + "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL + ) + associated_group = models.ForeignKey( + TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL + ) class Meta: - unique_together = ('target', 'key', 'associated_user', - 'associated_import',) + unique_together = ( + "target", + "key", + "associated_user", + "associated_import", + ) verbose_name = _("Importer - Target key") verbose_name_plural = _("Importer - Targets keys") - ordering = ('target', 'key') + ordering = ("target", "key") def __str__(self): return " - ".join([str(self.target), self.key[:50]]) @@ -689,8 +787,8 @@ class TargetKey(models.Model): def format(self): if not self.is_set: return None - if self.target.formater_type.formater_type == 'StrToBoolean': - if self.value in ('False', '0'): + if self.target.formater_type.formater_type == "StrToBoolean": + if self.value in ("False", "0"): return False elif self.value: return True @@ -703,162 +801,165 @@ class TargetKey(models.Model): return obj v = None associated_model = self.target.associated_model - if associated_model and hasattr(self.target.associated_model, - "add_key"): + if associated_model and hasattr(self.target.associated_model, "add_key"): # pk is given try: - v = self.target.associated_model.objects.get( - pk=str(int(self.value))) + v = self.target.associated_model.objects.get(pk=str(int(self.value))) except (ValueError, self.target.associated_model.DoesNotExist): # try with txt_idx try: v = self.target.associated_model.objects.get( - txt_idx=str(self.value)) + txt_idx=str(self.value) + ) except self.target.associated_model.DoesNotExist: pass if v: keys = {} if self.associated_group: - keys['group'] = self.associated_group + keys["group"] = self.associated_group if self.associated_user: - keys['user'] = self.associated_user + keys["user"] = self.associated_user else: - keys['importer'] = self.associated_import + keys["importer"] = self.associated_import v.add_key(self.key, **keys) return obj TARGET_MODELS = [ - ('OrganizationType', _("Organization type")), - ('ishtar_common.models.OrganizationType', _("Organization type")), - ('ishtar_common.models.PersonType', _("Person type")), - ('TitleType', _("Title")), - ('SourceType', _("Source type")), - ('ishtar_common.models.SourceType', _("Source type")), - ('AuthorType', _("Author type")), - ('Format', _("Format")), - ('ishtar_common.models.Format', _("Format")), - ('ishtar_common.models.LicenseType', _("License type")), - ('ishtar_common.models.DocumentTag', _("Document tag")), - ('ishtar_common.models.Language', _("Language")), - ('ishtar_common.models.SupportType', _("Support type")), - ('archaeological_operations.models.OperationType', _("Operation type")), - ('archaeological_operations.models.Period', _("Period")), - ('archaeological_operations.models.ReportState', _("Report state")), - ('archaeological_operations.models.RemainType', _("Remain type")), - ('archaeological_operations.models.RelationType', - _("Operation relation type")), + ("OrganizationType", _("Organization type")), + ("ishtar_common.models.OrganizationType", _("Organization type")), + ("ishtar_common.models.PersonType", _("Person type")), + ("TitleType", _("Title")), + ("SourceType", _("Source type")), + ("ishtar_common.models.SourceType", _("Source type")), + ("AuthorType", _("Author type")), + ("Format", _("Format")), + ("ishtar_common.models.Format", _("Format")), + ("ishtar_common.models.LicenseType", _("License type")), + ("ishtar_common.models.DocumentTag", _("Document tag")), + ("ishtar_common.models.Language", _("Language")), + ("ishtar_common.models.SupportType", _("Support type")), + ("archaeological_operations.models.OperationType", _("Operation type")), + ("archaeological_operations.models.Period", _("Period")), + ("archaeological_operations.models.ReportState", _("Report state")), + ("archaeological_operations.models.RemainType", _("Remain type")), + ("archaeological_operations.models.RelationType", _("Operation relation type")), ("archaeological_operations.models.ActType", _("Act type")), - ('archaeological_context_records.models.Unit', _("Unit")), - ('archaeological_context_records.models.ActivityType', - _("Activity type")), - ('archaeological_context_records.models.DocumentationType', - _("Documentation type")), - ("archaeological_context_records.models.DatingQuality", - _("Dating quality")), - ('archaeological_finds.models.MaterialType', _("Material")), - ('archaeological_finds.models.ConservatoryState', - _("Conservatory state")), - ('archaeological_warehouse.models.ContainerType', _("Container type")), - ('archaeological_warehouse.models.WarehouseDivision', - _("Warehouse division")), - ('archaeological_warehouse.models.WarehouseType', _("Warehouse type")), - ('archaeological_finds.models.TreatmentType', _("Treatment type")), - ('archaeological_finds.models.TreatmentEmergencyType', - _("Treatment emergency type")), - ('archaeological_finds.models.ObjectType', _("Object type")), - ('archaeological_finds.models.IntegrityType', _("Integrity type")), - ('archaeological_finds.models.RemarkabilityType', - _("Remarkability type")), - ('archaeological_finds.models.AlterationType', _("Alteration type")), - ('archaeological_finds.models.AlterationCauseType', - _("Alteration cause type")), - ('archaeological_finds.models.BatchType', _("Batch type")), - ('archaeological_finds.models.CheckedType', _("Checked type")), - ('archaeological_finds.models.MaterialTypeQualityType', - _("Material type quality")), + ("archaeological_context_records.models.Unit", _("Unit")), + ("archaeological_context_records.models.ActivityType", _("Activity type")), + ( + "archaeological_context_records.models.DocumentationType", + _("Documentation type"), + ), + ("archaeological_context_records.models.DatingQuality", _("Dating quality")), + ("archaeological_finds.models.MaterialType", _("Material")), + ("archaeological_finds.models.ConservatoryState", _("Conservatory state")), + ("archaeological_warehouse.models.ContainerType", _("Container type")), + ("archaeological_warehouse.models.WarehouseDivision", _("Warehouse division")), + ("archaeological_warehouse.models.WarehouseType", _("Warehouse type")), + ("archaeological_finds.models.TreatmentType", _("Treatment type")), + ( + "archaeological_finds.models.TreatmentEmergencyType", + _("Treatment emergency type"), + ), + ("archaeological_finds.models.ObjectType", _("Object type")), + ("archaeological_finds.models.IntegrityType", _("Integrity type")), + ("archaeological_finds.models.RemarkabilityType", _("Remarkability type")), + ("archaeological_finds.models.AlterationType", _("Alteration type")), + ("archaeological_finds.models.AlterationCauseType", _("Alteration cause type")), + ("archaeological_finds.models.BatchType", _("Batch type")), + ("archaeological_finds.models.CheckedType", _("Checked type")), + ("archaeological_finds.models.MaterialTypeQualityType", _("Material type quality")), ("archaeological_finds.models.FunctionalArea", _("Functional area")), - ('archaeological_context_records.models.IdentificationType', - _("Identification type")), - ('archaeological_context_records.models.RelationType', - _("Context record relation type")), - ('SpatialReferenceSystem', _("Spatial reference system")), - ('SupportType', _("Support type")), - ('TitleType', _("Title type")), + ( + "archaeological_context_records.models.IdentificationType", + _("Identification type"), + ), + ( + "archaeological_context_records.models.RelationType", + _("Context record relation type"), + ), + ("SpatialReferenceSystem", _("Spatial reference system")), + ("SupportType", _("Support type")), + ("TitleType", _("Title type")), ] TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS] IMPORTER_TYPES = ( - ('IntegerFormater', _("Integer")), - ('FloatFormater', _("Float")), - ('UnicodeFormater', _("String")), - ('DateFormater', _("Date")), - ('TypeFormater', _("Type")), - ('YearFormater', _("Year")), - ('InseeFormater', _("INSEE code")), - ('UpperFormater', _("Upper case")), - ('LowerFormater', _("Lower case")), - ('StrToBoolean', _("String to boolean")), - ('FileFormater', pgettext_lazy("filesystem", "File")), - ('UnknowType', _("Unknow type")) + ("IntegerFormater", _("Integer")), + ("FloatFormater", _("Float")), + ("UnicodeFormater", _("String")), + ("DateFormater", _("Date")), + ("TypeFormater", _("Type")), + ("YearFormater", _("Year")), + ("InseeFormater", _("INSEE code")), + ("UpperFormater", _("Upper case")), + ("LowerFormater", _("Lower case")), + ("StrToBoolean", _("String to boolean")), + ("FileFormater", pgettext_lazy("filesystem", "File")), + ("UnknowType", _("Unknow type")), ) IMPORTER_TYPES_DCT = { - 'IntegerFormater': IntegerFormater, - 'FloatFormater': FloatFormater, - 'UnicodeFormater': UnicodeFormater, - 'DateFormater': DateFormater, - 'TypeFormater': TypeFormater, - 'YearFormater': YearFormater, - 'StrToBoolean': StrToBoolean, - 'FileFormater': FileFormater, - 'InseeFormater': InseeFormater, - 'UpperFormater': UpperCaseFormater, - 'LowerFormater': LowerCaseFormater, - 'UnknowType': None, + "IntegerFormater": IntegerFormater, + "FloatFormater": FloatFormater, + "UnicodeFormater": UnicodeFormater, + "DateFormater": DateFormater, + "TypeFormater": TypeFormater, + "YearFormater": YearFormater, + "StrToBoolean": StrToBoolean, + "FileFormater": FileFormater, + "InseeFormater": InseeFormater, + "UpperFormater": UpperCaseFormater, + "LowerFormater": LowerCaseFormater, + "UnknowType": None, } DATE_FORMATS = ( - ('%Y', _("4 digit year. e.g.: \"2015\"")), - ('%Y/%m/%d', _("4 digit year/month/day. e.g.: \"2015/02/04\"")), - ('%d/%m/%Y', _("Day/month/4 digit year. e.g.: \"04/02/2015\"")), + ("%Y", _('4 digit year. e.g.: "2015"')), + ("%Y/%m/%d", _('4 digit year/month/day. e.g.: "2015/02/04"')), + ("%d/%m/%Y", _('Day/month/4 digit year. e.g.: "04/02/2015"')), ) -IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS, - 'DateFormater': DATE_FORMATS} +IMPORTER_TYPES_CHOICES = {"TypeFormater": TARGET_MODELS, "DateFormater": DATE_FORMATS} class FormaterTypeManager(models.Manager): def get_by_natural_key(self, formater_type, options, many_split): - return self.get(formater_type=formater_type, - options=options, many_split=many_split) + return self.get( + formater_type=formater_type, options=options, many_split=many_split + ) class FormaterType(models.Model): - formater_type = models.CharField("Formater type", max_length=20, - choices=IMPORTER_TYPES) - options = models.CharField(_("Options"), max_length=500, blank=True, - null=True) - many_split = models.CharField(_("Split character(s)"), max_length=10, - blank=True, null=True) + formater_type = models.CharField( + "Formater type", max_length=20, choices=IMPORTER_TYPES + ) + options = models.CharField(_("Options"), max_length=500, blank=True, null=True) + many_split = models.CharField( + _("Split character(s)"), max_length=10, blank=True, null=True + ) objects = FormaterTypeManager() class Meta: verbose_name = _("Importer - Formater type") verbose_name_plural = _("Importer - Formater types") - unique_together = ('formater_type', 'options', 'many_split') - ordering = ('formater_type', 'options') + unique_together = ("formater_type", "options", "many_split") + ordering = ("formater_type", "options") def natural_key(self): return self.formater_type, self.options, self.many_split def __str__(self): return " - ".join( - [str(dict(IMPORTER_TYPES)[self.formater_type]) - if self.formater_type in IMPORTER_TYPES_DCT else ''] + - [getattr(self, k) for k in ('options', 'many_split') - if getattr(self, k)]) + [ + str(dict(IMPORTER_TYPES)[self.formater_type]) + if self.formater_type in IMPORTER_TYPES_DCT + else "" + ] + + [getattr(self, k) for k in ("options", "many_split") if getattr(self, k)] + ) def get_choices(self): if self.format_type in IMPORTER_TYPES_CHOICES: @@ -867,35 +968,36 @@ class FormaterType(models.Model): def get_formater_type(self, target, import_instance=None): if self.formater_type not in IMPORTER_TYPES_DCT.keys(): return - kwargs = {'db_target': target, 'import_instance': import_instance} + kwargs = {"db_target": target, "import_instance": import_instance} if self.many_split: - kwargs['many_split'] = self.many_split - if self.formater_type == 'TypeFormater': + kwargs["many_split"] = self.many_split + if self.formater_type == "TypeFormater": if self.options not in TARGET_MODELS_KEYS: logger.warning( "**WARN FormaterType.get_formater_type**: {} " - "is not in TARGET_MODELS_KEYS".format(self.options)) + "is not in TARGET_MODELS_KEYS".format(self.options) + ) return if self.options in dir(): model = dir()[self.options] else: model = import_class(self.options) return TypeFormater(model, **kwargs) - elif self.formater_type == 'UnicodeFormater': + elif self.formater_type == "UnicodeFormater": if self.options: try: return UnicodeFormater(int(self.options.strip()), **kwargs) except ValueError: pass return UnicodeFormater(**kwargs) - elif self.formater_type == 'DateFormater': + elif self.formater_type == "DateFormater": date_formats = self.options if self.many_split: - date_formats = self.options.split(kwargs.pop('many_split')) + date_formats = self.options.split(kwargs.pop("many_split")) return DateFormater(date_formats, **kwargs) - elif self.formater_type == 'StrToBoolean': + elif self.formater_type == "StrToBoolean": return StrToBoolean(**kwargs) - elif self.formater_type == 'UnknowType': + elif self.formater_type == "UnknowType": return else: return IMPORTER_TYPES_DCT[self.formater_type](**kwargs) @@ -916,13 +1018,13 @@ IMPORT_STATE = ( ) IMPORT_STATE_DCT = dict(IMPORT_STATE) -ENCODINGS = [(settings.ENCODING, settings.ENCODING), - (settings.ALT_ENCODING, settings.ALT_ENCODING), - ('utf-8', 'utf-8')] +ENCODINGS = [ + (settings.ENCODING, settings.ENCODING), + (settings.ALT_ENCODING, settings.ALT_ENCODING), + ("utf-8", "utf-8"), +] -CSV_SEPS = ((",", ","), - (";", ";"), - ("|", "|")) +CSV_SEPS = ((",", ","), (";", ";"), ("|", "|")) @task() @@ -944,68 +1046,117 @@ def delayed_check(import_pk): class Import(models.Model): - user = models.ForeignKey('IshtarUser', blank=True, null=True, - on_delete=models.SET_NULL) + user = models.ForeignKey( + "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL + ) name = models.CharField(_("Name"), max_length=500, null=True) - importer_type = models.ForeignKey(ImporterType) + importer_type = models.ForeignKey(ImporterType, on_delete=models.CASCADE) imported_file = models.FileField( - _("Imported file"), upload_to="upload/imports/%Y/%m/", max_length=220, - help_text=max_size_help(), blank=True, null=True) + _("Imported file"), + upload_to="upload/imports/%Y/%m/", + max_length=220, + help_text=max_size_help(), + blank=True, + null=True, + ) imported_images = models.FileField( - _("Associated images (zip file)"), upload_to="upload/imports/%Y/%m/", - blank=True, null=True, max_length=220, help_text=max_size_help()) + _("Associated images (zip file)"), + upload_to="upload/imports/%Y/%m/", + blank=True, + null=True, + max_length=220, + help_text=max_size_help(), + ) associated_group = models.ForeignKey( - TargetKeyGroup, blank=True, null=True, + TargetKeyGroup, + blank=True, + null=True, on_delete=models.SET_NULL, - help_text=_("If a group is selected, target key saved in this group " - "will be used.") + help_text=_( + "If a group is selected, target key saved in this group " "will be used." + ), + ) + encoding = models.CharField( + _("Encoding"), choices=ENCODINGS, default="utf-8", max_length=15 ) - encoding = models.CharField(_("Encoding"), choices=ENCODINGS, - default='utf-8', max_length=15) csv_sep = models.CharField( - _("CSV separator"), choices=CSV_SEPS, default=',', max_length=1, - help_text=_("Separator for CSV file. Standard is comma but Microsoft " - "Excel do not follow this standard and use semi-colon.") + _("CSV separator"), + choices=CSV_SEPS, + default=",", + max_length=1, + help_text=_( + "Separator for CSV file. Standard is comma but Microsoft " + "Excel do not follow this standard and use semi-colon." + ), ) skip_lines = models.IntegerField( - _("Skip lines"), default=1, - help_text=_("Number of header lines in your file (can be 0).")) + _("Skip lines"), + default=1, + help_text=_("Number of header lines in your file (can be 0)."), + ) error_file = models.FileField( - _("Error file"), upload_to="upload/imports/%Y/%m/", - blank=True, null=True, max_length=255, help_text=max_size_help()) + _("Error file"), + upload_to="upload/imports/%Y/%m/", + blank=True, + null=True, + max_length=255, + help_text=max_size_help(), + ) result_file = models.FileField( - _("Result file"), upload_to="upload/imports/%Y/%m/", - blank=True, null=True, max_length=255, help_text=max_size_help()) + _("Result file"), + upload_to="upload/imports/%Y/%m/", + blank=True, + null=True, + max_length=255, + help_text=max_size_help(), + ) match_file = models.FileField( - _("Match file"), upload_to="upload/imports/%Y/%m/", blank=True, - null=True, max_length=255, help_text=max_size_help()) + _("Match file"), + upload_to="upload/imports/%Y/%m/", + blank=True, + null=True, + max_length=255, + help_text=max_size_help(), + ) archive_file = models.FileField( - _("Archive file"), upload_to="upload/imports/%Y/%m/", blank=True, - null=True, max_length=255, help_text=max_size_help()) - state = models.CharField(_("State"), max_length=2, choices=IMPORT_STATE, - default='C') + _("Archive file"), + upload_to="upload/imports/%Y/%m/", + blank=True, + null=True, + max_length=255, + help_text=max_size_help(), + ) + state = models.CharField( + _("State"), max_length=2, choices=IMPORT_STATE, default="C" + ) conservative_import = models.BooleanField( - _("Conservative import"), default=False, - help_text=_('If set to true, do not overload existing values.')) + _("Conservative import"), + default=False, + help_text=_("If set to true, do not overload existing values."), + ) creation_date = models.DateTimeField( - _("Creation date"), auto_now_add=True, blank=True, null=True) - end_date = models.DateTimeField(_("End date"), auto_now_add=True, - blank=True, null=True, editable=False) + _("Creation date"), auto_now_add=True, blank=True, null=True + ) + end_date = models.DateTimeField( + _("End date"), auto_now_add=True, blank=True, null=True, editable=False + ) seconds_remaining = models.IntegerField( - _("Remaining seconds"), blank=True, null=True, editable=False) - current_line = models.IntegerField(_("Current line"), blank=True, - null=True) - number_of_line = models.IntegerField(_("Number of line"), blank=True, - null=True) + _("Remaining seconds"), blank=True, null=True, editable=False + ) + current_line = models.IntegerField(_("Current line"), blank=True, null=True) + number_of_line = models.IntegerField(_("Number of line"), blank=True, null=True) imported_line_numbers = models.TextField( - _("Imported line numbers"), blank=True, null=True, - validators=[validate_comma_separated_integer_list] + _("Imported line numbers"), + blank=True, + null=True, + validators=[validate_comma_separated_integer_list], ) - changed_checked = models.BooleanField(_("Changed have been checked"), - default=False) + changed_checked = models.BooleanField(_("Changed have been checked"), default=False) changed_line_numbers = models.TextField( - _("Changed line numbers"), blank=True, null=True, - validators=[validate_comma_separated_integer_list] + _("Changed line numbers"), + blank=True, + null=True, + validators=[validate_comma_separated_integer_list], ) class Meta: @@ -1016,17 +1167,17 @@ class Import(models.Model): return "{} | {}".format(self.name or "-", self.importer_type) def need_matching(self): - return bool(TargetKey.objects.filter(associated_import=self, - is_set=False).count()) + return bool( + TargetKey.objects.filter(associated_import=self, is_set=False).count() + ) @property def errors(self): if not self.error_file: return [] errors = [] - with open(self.error_file.path, 'rt') as csvfile: - reader = csv.DictReader( - csvfile, fieldnames=['line', 'column', 'error']) + with open(self.error_file.path, "rt") as csvfile: + reader = csv.DictReader(csvfile, fieldnames=["line", "column", "error"]) for idx, row in enumerate(reader): if not idx: # pass the header continue @@ -1040,11 +1191,10 @@ class Import(models.Model): return filename = self.imported_file.path encodings = [self.encoding] - encodings += [coding for coding, c in ENCODINGS - if coding != self.encoding] + encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: - with open(filename, 'r', encoding=encoding) as f: + with open(filename, "r", encoding=encoding) as f: reader = csv.reader(f, delimiter=self.csv_sep) nb = sum(1 for __ in reader) - self.skip_lines except UnicodeDecodeError: @@ -1059,14 +1209,14 @@ class Import(models.Model): def progress_percent(self): if not self.current_line or not self.number_of_line: return 0 - return int((float(self.current_line) / float(self.number_of_line)) - * 100) + return int((float(self.current_line) / float(self.number_of_line)) * 100) def add_imported_line(self, idx_line): if not self.number_of_line: self.get_number_of_lines() - if self.imported_line_numbers and \ - str(idx_line) in self.imported_line_numbers.split(','): + if self.imported_line_numbers and str( + idx_line + ) in self.imported_line_numbers.split(","): return if self.imported_line_numbers: self.imported_line_numbers += "," @@ -1077,8 +1227,9 @@ class Import(models.Model): self.save() def add_changed_line(self, idx_line): - if self.changed_line_numbers and \ - str(idx_line) in self.changed_line_numbers.split(','): + if self.changed_line_numbers and str( + idx_line + ) in self.changed_line_numbers.split(","): return if self.changed_line_numbers: self.changed_line_numbers += "," @@ -1090,7 +1241,7 @@ class Import(models.Model): def remove_changed_line(self, idx_line): if not self.changed_line_numbers: return - line_numbers = self.changed_line_numbers.split(',') + line_numbers = self.changed_line_numbers.split(",") if str(idx_line) not in line_numbers: return line_numbers.pop(line_numbers.index(str(idx_line))) @@ -1102,12 +1253,13 @@ class Import(models.Model): return True if not self.changed_line_numbers: return - line_numbers = self.changed_line_numbers.split(',') + line_numbers = self.changed_line_numbers.split(",") return str(idx_line) in line_numbers def line_is_imported(self, idx_line): - return self.imported_line_numbers and \ - str(idx_line) in self.imported_line_numbers.split(',') + return self.imported_line_numbers and str( + idx_line + ) in self.imported_line_numbers.split(",") def get_actions(self): """ @@ -1125,24 +1277,24 @@ class Import(models.Model): actions.append(('I', _("Launch import"))) if profile.experimental_feature: if self.changed_checked: - actions.append(('IS', _("Step by step import"))) - actions.append(('CH', _("Re-check for changes"))) + actions.append(("IS", _("Step by step import"))) + actions.append(("CH", _("Re-check for changes"))) else: - actions.append(('CH', _("Check for changes"))) - if self.state in ('F', 'FE'): - actions.append(('A', _("Re-analyse"))) - actions.append(('I', _("Re-import"))) + actions.append(("CH", _("Check for changes"))) + if self.state in ("F", "FE"): + actions.append(("A", _("Re-analyse"))) + actions.append(("I", _("Re-import"))) if profile.experimental_feature: if self.changed_checked: - actions.append(('IS', _("Step by step re-import"))) - actions.append(('CH', _("Re-check for changes"))) + actions.append(("IS", _("Step by step re-import"))) + actions.append(("CH", _("Re-check for changes"))) else: - actions.append(('CH', _("Check for changes"))) - actions.append(('AC', _("Archive"))) - if self.state == 'AC': + actions.append(("CH", _("Check for changes"))) + actions.append(("AC", _("Archive"))) + if self.state == "AC": state = "FE" if self.error_file else "F" actions.append((state, _("Unarchive"))) - actions.append(('D', _("Delete"))) + actions.append(("D", _("Delete"))) return actions @property @@ -1157,8 +1309,10 @@ class Import(models.Model): def get_importer_instance(self): return self.importer_type.get_importer_class(import_instance=self)( - skip_lines=self.skip_lines, import_instance=self, - conservative_import=self.conservative_import) + skip_lines=self.skip_lines, + import_instance=self, + conservative_import=self.conservative_import, + ) @property def data_table(self): @@ -1169,23 +1323,21 @@ class Import(models.Model): filename = None for name in z.namelist(): # get first CSV file found - if name.endswith('.csv'): + if name.endswith(".csv"): filename = name break if not filename: return [] - tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-') + tmpdir = tempfile.mkdtemp(prefix="tmp-ishtar-") imported_file = z.extract(filename, tmpdir) encodings = [self.encoding] - encodings += [coding for coding, c in ENCODINGS - if coding != self.encoding] + encodings += [coding for coding, c in ENCODINGS if coding != self.encoding] for encoding in encodings: try: with open(imported_file, encoding=encoding) as csv_file: vals = [ - line for line in csv.reader(csv_file, - delimiter=self.csv_sep) + line for line in csv.reader(csv_file, delimiter=self.csv_sep) ] if tmpdir: shutil.rmtree(tmpdir) @@ -1199,19 +1351,20 @@ class Import(models.Model): return [] def initialize(self, user=None, session_key=None): - self.state = 'AP' + self.state = "AP" self.end_date = datetime.datetime.now() self.save() try: self.get_importer_instance().initialize( - self.data_table, user=user, output='db') + self.data_table, user=user, output="db" + ) except ImporterError as e: if session_key: put_session_message(session_key, e.msg, "danger") - self.state = 'C' + self.state = "C" self.save() return - self.state = 'A' + self.state = "A" self.end_date = datetime.datetime.now() self.save() @@ -1220,17 +1373,16 @@ class Import(models.Model): return self.check_modified(session_key=session_key) put_session_message( session_key, - str( - _("Modification check {} added to the queue") - ).format(self.name), - "info") - self.state = 'HQ' + str(_("Modification check {} added to the queue")).format(self.name), + "info", + ) + self.state = "HQ" self.end_date = datetime.datetime.now() self.save() return delayed_check.delay(self.pk) def check_modified(self, session_key=None): - self.state = 'HP' + self.state = "HP" self.end_date = datetime.datetime.now() self.changed_line_numbers = "" self.changed_checked = False @@ -1239,9 +1391,7 @@ class Import(models.Model): for idx in range(self.skip_lines, self.get_number_of_lines() + 1): try: imprt, data = self.importation( - simulate=True, - line_to_process=idx, - return_importer_and_data=True + simulate=True, line_to_process=idx, return_importer_and_data=True ) except IOError as e: # error is identified as a change @@ -1268,7 +1418,7 @@ class Import(models.Model): break current_value = getattr(obj, k) updated_value = updated_values[k] - if hasattr(current_value, 'all'): + if hasattr(current_value, "all"): current_value = list(current_value.all()) changed = False for v in updated_value: @@ -1292,85 +1442,95 @@ class Import(models.Model): put_session_message( session_key, str(_("Import {} added to the queue")).format(self.name), - "info") - self.state = 'IQ' + "info", + ) + self.state = "IQ" self.end_date = datetime.datetime.now() self.save() return delayed_import.delay(self.pk) - def importation(self, session_key=None, line_to_process=None, - simulate=False, return_importer_and_data=False, - request=None): - self.state = 'IP' + def importation( + self, + session_key=None, + line_to_process=None, + simulate=False, + return_importer_and_data=False, + request=None, + ): + self.state = "IP" self.end_date = datetime.datetime.now() if not line_to_process: # full import - self.imported_line_numbers = '' + self.imported_line_numbers = "" self.current_line = 0 self.save() importer = self.get_importer_instance() try: data = importer.importation( - self.data_table, user=self.user, - line_to_process=line_to_process, simulate=simulate) + self.data_table, + user=self.user, + line_to_process=line_to_process, + simulate=simulate, + ) except IOError: error_message = str(_("Error on imported file: {}")).format( - self.imported_file) + self.imported_file + ) importer.errors = [error_message] if session_key: put_session_message(session_key, error_message, "warning") - ids = get_session_var(session_key, 'current_import_id') + ids = get_session_var(session_key, "current_import_id") if not ids: ids = [] ids.append(self.pk) - put_session_var(session_key, 'current_import_id', ids) + put_session_var(session_key, "current_import_id", ids) if line_to_process: - self.state = 'PI' + self.state = "PI" else: - self.state = 'FE' + self.state = "FE" self.save() if not return_importer_and_data: return return importer, None # result file filename = slugify(self.importer_type.name) - now = datetime.datetime.now().isoformat('-').replace(':', '') + now = datetime.datetime.now().isoformat("-").replace(":", "") result_file = filename + "_result_%s.csv" % now self.result_file.save( - result_file, ContentFile(importer.get_csv_result().encode('utf-8'))) + result_file, ContentFile(importer.get_csv_result().encode("utf-8")) + ) if importer.errors: if line_to_process: - self.state = 'PI' + self.state = "PI" else: - self.state = 'FE' + self.state = "FE" error_file = filename + "_errors_%s.csv" % now self.error_file.save( - error_file, - ContentFile(importer.get_csv_errors().encode('utf-8')) + error_file, ContentFile(importer.get_csv_errors().encode("utf-8")) ) - msg = str(_("Import {} finished with errors")).format( - self.name) + msg = str(_("Import {} finished with errors")).format(self.name) msg_cls = "warning" else: if line_to_process: - self.state = 'PI' + self.state = "PI" else: - self.state = 'F' + self.state = "F" self.error_file = None - msg = str(_("Import {} finished with no errors")).format( - self.name) + msg = str(_("Import {} finished with no errors")).format(self.name) msg_cls = "primary" if session_key and request: put_session_message(session_key, msg, msg_cls) - ids = request.session['current_import_id'] \ - if 'current_import_id' in request.session else [] + ids = ( + request.session["current_import_id"] + if "current_import_id" in request.session + else [] + ) ids.append(self.pk) - put_session_var(session_key, 'current_import_id', ids) + put_session_var(session_key, "current_import_id", ids) if importer.match_table: match_file = filename + "_match_%s.csv" % now self.match_file.save( - match_file, - ContentFile(importer.get_csv_matches().encode('utf-8')) + match_file, ContentFile(importer.get_csv_matches().encode("utf-8")) ) self.end_date = datetime.datetime.now() self.save() @@ -1382,7 +1542,7 @@ class Import(models.Model): return with tempfile.TemporaryDirectory() as tmp_dir_name: # extract the current archive - current_zip = zipfile.ZipFile(self.archive_file.path, 'r') + current_zip = zipfile.ZipFile(self.archive_file.path, "r") name_list = current_zip.namelist() if "content.json" not in name_list: return @@ -1402,19 +1562,19 @@ class Import(models.Model): with open(full_filename, "rb") as raw_file: getattr(self, attr).save( "upload/imports/{}/{:02d}/{}".format( - today.year, today.month, filename), - File(raw_file) + today.year, today.month, filename + ), + File(raw_file), ) os.remove(self.archive_file.path) - setattr(self, 'archive_file', None) + setattr(self, "archive_file", None) self.state = "FE" if self.error_file else "F" self.save() return True def _archive(self): - file_attr = ["imported_file", "error_file", "result_file", - "match_file"] + file_attr = ["imported_file", "error_file", "result_file", "match_file"] files = [ (k, getattr(self, k).path, getattr(self, k).name.split(os.sep)[-1]) for k in file_attr @@ -1438,11 +1598,15 @@ class Import(models.Model): current_zip.write(content_name, arcname="content.json") today = datetime.date.today() - with open(archive_name, "rb", ) as raw_file: + with open( + archive_name, + "rb", + ) as raw_file: self.archive_file.save( "upload/imports/{}/{:02d}/{}".format( - today.year, today.month, base_name), - File(raw_file) + today.year, today.month, base_name + ), + File(raw_file), ) IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() @@ -1460,7 +1624,7 @@ class Import(models.Model): self._archive_pending = False def archive(self): - self.state = 'AC' + self.state = "AC" self.end_date = datetime.datetime.now() self._archive() @@ -1473,20 +1637,22 @@ class Import(models.Model): imported = [] for related, zorg in get_all_related_m2m_objects_with_model(self): accessor = related.get_accessor_name() - imported += [(accessor, obj) - for obj in getattr(self, accessor).all()] + imported += [(accessor, obj) for obj in getattr(self, accessor).all()] return imported def save(self, *args, **kwargs): super(Import, self).save(*args, **kwargs) - if self.state == "AC" and not getattr( - self, "_archive_pending", False) and not self.archive_file: + if ( + self.state == "AC" + and not getattr(self, "_archive_pending", False) + and not self.archive_file + ): self._archive() def pre_delete_import(sender, **kwargs): # deleted imported items when an import is delete - instance = kwargs.get('instance') + instance = kwargs.get("instance") if not instance: return to_delete = [] |