path: root/ishtar_common/models_imports.py
author     Étienne Loks <etienne.loks@iggdrasil.net>    2021-09-01 17:58:56 +0200
committer  Étienne Loks <etienne.loks@iggdrasil.net>    2022-12-12 12:20:58 +0100
commit     eec69099f0c6dfdcef5acc15d57dadc7cd04813b (patch)
tree       aa16f6f20916eecc7a99d9eed661d8bd1ddf9522 /ishtar_common/models_imports.py
parent     12ee58b40c9ac73ee42ab08a18246399485adb9a (diff)
download   Ishtar-eec69099f0c6dfdcef5acc15d57dadc7cd04813b.tar.bz2
           Ishtar-eec69099f0c6dfdcef5acc15d57dadc7cd04813b.zip
Migration to Django 2.2 - missing on_delete - django.urls import reverse
Diffstat (limited to 'ishtar_common/models_imports.py')
-rw-r--r--  ishtar_common/models_imports.py  1024
1 file changed, 595 insertions, 429 deletions
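
The commit applies two mechanical changes required by the Django 2.2 migration: every ForeignKey now passes an explicit on_delete argument, and reverse is imported from django.urls because django.core.urlresolvers was removed in Django 2.0. A minimal sketch of the pattern is shown below; the model, field and URL names are illustrative placeholders, not taken from ishtar_common/models_imports.py.

# Minimal sketch (illustrative names only) of the two changes applied
# throughout this commit: mandatory on_delete and the new reverse() location.
from django.db import models
from django.urls import reverse  # replaces django.core.urlresolvers.reverse, removed in Django 2.0


class ExampleChild(models.Model):  # hypothetical model
    # Since Django 2.0, on_delete is a required argument; it previously defaulted to CASCADE.
    parent = models.ForeignKey(
        "ExampleParent",            # hypothetical related model
        related_name="children",
        on_delete=models.CASCADE,   # or models.SET_NULL with null=True, models.PROTECT, ...
    )

    def get_absolute_url(self):
        # URL name is a placeholder; reverse() itself is unchanged, only its module moved.
        return reverse("example-child-detail", args=[self.pk])
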
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
index 3779b656e..54409e2c9 100644
--- a/ishtar_common/models_imports.py
+++ b/ishtar_common/models_imports.py
@@ -48,13 +48,32 @@ except (AssertionError, ImportError):
UnoCalc = None
from ishtar_common.model_managers import SlugModelManager
-from ishtar_common.utils import create_slug, \
- get_all_related_m2m_objects_with_model, put_session_message, \
- put_session_var, get_session_var, num2col, max_size_help, import_class
-from ishtar_common.data_importer import Importer, ImportFormater, \
- IntegerFormater, FloatFormater, UnicodeFormater, DateFormater, \
- TypeFormater, YearFormater, StrToBoolean, FileFormater, InseeFormater, \
- ImporterError, UpperCaseFormater, LowerCaseFormater
+from ishtar_common.utils import (
+ create_slug,
+ get_all_related_m2m_objects_with_model,
+ put_session_message,
+ put_session_var,
+ get_session_var,
+ num2col,
+ max_size_help,
+ import_class,
+)
+from ishtar_common.data_importer import (
+ Importer,
+ ImportFormater,
+ IntegerFormater,
+ FloatFormater,
+ UnicodeFormater,
+ DateFormater,
+ TypeFormater,
+ YearFormater,
+ StrToBoolean,
+ FileFormater,
+ InseeFormater,
+ ImporterError,
+ UpperCaseFormater,
+ LowerCaseFormater,
+)
from ishtar_common.utils import task
@@ -86,13 +105,13 @@ class ImporterModel(models.Model):
class Meta:
verbose_name = _("Model")
verbose_name_plural = _("Models")
- ordering = ('name',)
+ ordering = ("name",)
def __str__(self):
return self.name
def natural_key(self):
- return (self.klass, )
+ return (self.klass,)
class ImporterTypeManager(models.Manager):
@@ -104,23 +123,32 @@ class ImporterType(models.Model):
"""
Description of a table to be mapped with ishtar database
"""
+
name = models.CharField(_("Name"), max_length=200)
slug = models.SlugField(_("Slug"), unique=True, max_length=100)
- description = models.CharField(_("Description"), blank=True, null=True,
- max_length=500)
- users = models.ManyToManyField('IshtarUser', verbose_name=_("Users"),
- blank=True)
+ description = models.CharField(
+ _("Description"), blank=True, null=True, max_length=500
+ )
+ users = models.ManyToManyField("IshtarUser", verbose_name=_("Users"), blank=True)
associated_models = models.ForeignKey(
- ImporterModel, verbose_name=_("Associated model"),
+ ImporterModel,
+ verbose_name=_("Associated model"),
on_delete=models.SET_NULL,
- related_name='importer_type_associated', blank=True, null=True)
+ related_name="importer_type_associated",
+ blank=True,
+ null=True,
+ )
created_models = models.ManyToManyField(
- ImporterModel, verbose_name=_("Models that can accept new items"),
- blank=True, help_text=_("Leave blank for no restrictions"),
- related_name='importer_type_created')
+ ImporterModel,
+ verbose_name=_("Models that can accept new items"),
+ blank=True,
+ help_text=_("Leave blank for no restrictions"),
+ related_name="importer_type_created",
+ )
is_template = models.BooleanField(_("Can be exported"), default=False)
- unicity_keys = models.CharField(_("Unicity keys (separator \";\")"),
- blank=True, null=True, max_length=500)
+ unicity_keys = models.CharField(
+ _('Unicity keys (separator ";")'), blank=True, null=True, max_length=500
+ )
available = models.BooleanField(_("Available"), default=True)
objects = ImporterTypeManager()
SERIALIZATION_EXCLUDE = ["users"]
@@ -128,10 +156,10 @@ class ImporterType(models.Model):
class Meta:
verbose_name = _("Importer - Type")
verbose_name_plural = _("Importer - Types")
- ordering = ('name',)
+ ordering = ("name",)
def natural_key(self):
- return (self.slug, )
+ return (self.slug,)
def __str__(self):
return self.name
@@ -150,7 +178,7 @@ class ImporterType(models.Model):
return
col_number = 1 # user number so we start with 1
lst_col_number = 0
- for column in self.columns.order_by('col_number').all():
+ for column in self.columns.order_by("col_number").all():
while column.col_number > col_number:
col_number += 1
# header
@@ -171,7 +199,7 @@ class ImporterType(models.Model):
if not ft:
continue
# first we only manage TypeFormater
- if ft.formater_type != 'TypeFormater':
+ if ft.formater_type != "TypeFormater":
continue
if not ft.options: # bad configuration
continue
@@ -181,11 +209,18 @@ class ImporterType(models.Model):
lst = []
for typ in model.get_types(instances=True):
lst.append(str(typ))
- end_row = uno.create_list(lst_sheet, lst_col_number, 0,
- str(model._meta.verbose_name), lst)
+ end_row = uno.create_list(
+ lst_sheet, lst_col_number, 0, str(model._meta.verbose_name), lst
+ )
uno.set_cell_validation_list(
- main_sheet, col_number, 2, ROW_NUMBER + 2,
- lst_sheet, lst_col_number, [1, end_row])
+ main_sheet,
+ col_number,
+ 2,
+ ROW_NUMBER + 2,
+ lst_sheet,
+ lst_col_number,
+ [1, end_row],
+ )
lst_col_number += 1
tmpdir = tempfile.mkdtemp(prefix="ishtar-templates-")
dest_filename = "{}{}{}.ods".format(tmpdir, os.sep, self.name)
@@ -194,12 +229,13 @@ class ImporterType(models.Model):
def get_importer_class(self, import_instance=None):
OBJECT_CLS = import_class(self.associated_models.klass)
- DEFAULTS = dict([(default.keys, default.values)
- for default in self.defaults.all()])
+ DEFAULTS = dict(
+ [(default.keys, default.values) for default in self.defaults.all()]
+ )
LINE_FORMAT = []
LINE_EXPORT_FORMAT = []
idx = 0
- for column in self.columns.order_by('col_number').all():
+ for column in self.columns.order_by("col_number").all():
idx += 1
while column.col_number > idx:
LINE_FORMAT.append(None)
@@ -212,8 +248,7 @@ class ImporterType(models.Model):
LINE_FORMAT.append(None)
if column.export_field_name:
LINE_EXPORT_FORMAT.append(
- ImportFormater(column.export_field_name,
- label=column.label)
+ ImportFormater(column.export_field_name, label=column.label)
)
continue
force_news = []
@@ -221,7 +256,8 @@ class ImporterType(models.Model):
concat = []
for target in column.targets.order_by("pk").all():
ft = target.formater_type.get_formater_type(
- target, import_instance=import_instance)
+ target, import_instance=import_instance
+ )
if not ft:
continue
formater_types.append(ft)
@@ -231,42 +267,46 @@ class ImporterType(models.Model):
concat.append(target.concat)
formater_kwargs = {}
if column.regexp_pre_filter:
- formater_kwargs['regexp'] = re.compile(
- column.regexp_pre_filter.regexp)
+ formater_kwargs["regexp"] = re.compile(column.regexp_pre_filter.regexp)
if column.value_format:
- formater_kwargs['value_format'] = \
- column.value_format.format_string
- formater_kwargs['concat'] = concat
- formater_kwargs['concat_str'] = concat_str
- formater_kwargs['duplicate_fields'] = [
- (field.field_name, field.force_new, field.concat,
- field.concat_str)
- for field in column.duplicate_fields.all()]
- formater_kwargs['label'] = column.label
- formater_kwargs['required'] = column.required
- formater_kwargs['force_new'] = force_news
- formater_kwargs['comment'] = column.description
+ formater_kwargs["value_format"] = column.value_format.format_string
+ formater_kwargs["concat"] = concat
+ formater_kwargs["concat_str"] = concat_str
+ formater_kwargs["duplicate_fields"] = [
+ (field.field_name, field.force_new, field.concat, field.concat_str)
+ for field in column.duplicate_fields.all()
+ ]
+ formater_kwargs["label"] = column.label
+ formater_kwargs["required"] = column.required
+ formater_kwargs["force_new"] = force_news
+ formater_kwargs["comment"] = column.description
if column.export_field_name:
- formater_kwargs['export_field_name'] = [
- column.export_field_name]
- formater = ImportFormater(targets, formater_types,
- **formater_kwargs)
+ formater_kwargs["export_field_name"] = [column.export_field_name]
+ formater = ImportFormater(targets, formater_types, **formater_kwargs)
LINE_FORMAT.append(formater)
LINE_EXPORT_FORMAT.append(formater)
UNICITY_KEYS = []
if self.unicity_keys:
- UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')]
+ UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(";")]
MODEL_CREATION_LIMIT = []
for modls in self.created_models.all():
MODEL_CREATION_LIMIT.append(import_class(modls.klass))
- args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description,
- 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT,
- 'UNICITY_KEYS': UNICITY_KEYS,
- 'LINE_EXPORT_FORMAT': LINE_EXPORT_FORMAT,
- 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT}
- name = str(''.join(
- x for x in slugify(self.name).replace('-', ' ').title()
- if not x.isspace()))
+ args = {
+ "OBJECT_CLS": OBJECT_CLS,
+ "DESC": self.description,
+ "DEFAULTS": DEFAULTS,
+ "LINE_FORMAT": LINE_FORMAT,
+ "UNICITY_KEYS": UNICITY_KEYS,
+ "LINE_EXPORT_FORMAT": LINE_EXPORT_FORMAT,
+ "MODEL_CREATION_LIMIT": MODEL_CREATION_LIMIT,
+ }
+ name = str(
+ "".join(
+ x
+ for x in slugify(self.name).replace("-", " ").title()
+ if not x.isspace()
+ )
+ )
newclass = type(name, (Importer,), args)
return newclass
@@ -289,25 +329,32 @@ def get_associated_model(parent_model, keys):
elif not idx:
if item not in fields:
raise ImporterError(
- str(_("Importer configuration error: "
- "\"{}\" is not available for \"{}\"."
- " Check your default and column "
- "configuration")).format(
- item, OBJECT_CLS.__name__))
+ str(
+ _(
+ "Importer configuration error: "
+ '"{}" is not available for "{}".'
+ " Check your default and column "
+ "configuration"
+ )
+ ).format(item, OBJECT_CLS.__name__)
+ )
field = fields[item]
- if hasattr(field, 'rel') and hasattr(field.rel, 'to'):
+ if hasattr(field, "rel") and hasattr(field.rel, "to"):
model = field.rel.to
if type(field) == ModelBase:
model = field
else:
if not model:
raise ImporterError(
- str(_("Importer configuration error: "
- "\"{}\" is not available for \"{}\"."
- " Check your default and column "
- "configuration")).format(
- "__".join(keys[1:]),
- OBJECT_CLS.__name__))
+ str(
+ _(
+ "Importer configuration error: "
+ '"{}" is not available for "{}".'
+ " Check your default and column "
+ "configuration"
+ )
+ ).format("__".join(keys[1:]), OBJECT_CLS.__name__)
+ )
return get_associated_model(model, keys[1:])
return model
@@ -321,13 +368,17 @@ class ImporterDefault(models.Model):
"""
Targets of default values in an import
"""
- importer_type = models.ForeignKey(ImporterType, related_name='defaults')
+
+ importer_type = models.ForeignKey(
+ ImporterType, related_name="defaults", on_delete=models.CASCADE
+ )
target = models.CharField("Target", max_length=500)
class Meta:
verbose_name = _("Importer - Default")
verbose_name_plural = _("Importer - Defaults")
- unique_together = ('importer_type', 'target')
+ unique_together = ("importer_type", "target")
+
objects = ImporterDefaultManager()
def __str__(self):
@@ -338,14 +389,15 @@ class ImporterDefault(models.Model):
@property
def keys(self):
- return tuple(t for t in self.target.split('__') if t not in ("-", ""))
+ return tuple(t for t in self.target.split("__") if t not in ("-", ""))
@property
def associated_model(self):
if not self.keys:
return import_class(self.importer_type.associated_models.klass)
- return get_associated_model(self.importer_type.associated_models.klass,
- self.keys)
+ return get_associated_model(
+ self.importer_type.associated_models.klass, self.keys
+ )
@property
def values(self):
@@ -360,17 +412,21 @@ class ImporterDefault(models.Model):
class ImporterDefaultValuesManager(models.Manager):
def get_by_natural_key(self, def_target_type, def_target, target):
- return self.get(default_target__importer_type__slug=def_target_type,
- default_target__target=def_target,
- target=target)
+ return self.get(
+ default_target__importer_type__slug=def_target_type,
+ default_target__target=def_target,
+ target=target,
+ )
class ImporterDefaultValues(models.Model):
"""
Default values in an import
"""
- default_target = models.ForeignKey(ImporterDefault,
- related_name='default_values')
+
+ default_target = models.ForeignKey(
+ ImporterDefault, related_name="default_values", on_delete=models.CASCADE
+ )
target = models.CharField("Target", max_length=500)
value = models.CharField("Value", max_length=500)
objects = ImporterDefaultValuesManager()
@@ -378,12 +434,14 @@ class ImporterDefaultValues(models.Model):
class Meta:
verbose_name = _("Importer - Default value")
verbose_name_plural = _("Importer - Default values")
- unique_together = ('default_target', 'target')
+ unique_together = ("default_target", "target")
def natural_key(self):
- return (self.default_target.importer_type.slug,
- self.default_target.target,
- self.target)
+ return (
+ self.default_target.importer_type.slug,
+ self.default_target.target,
+ self.target,
+ )
def __str__(self):
return "{} - {}".format(self.default_target, self.target, self.value)
@@ -397,7 +455,7 @@ class ImporterDefaultValues(models.Model):
if target not in fields:
return
field = fields[target]
- if not hasattr(field, 'rel') or not hasattr(field.rel, 'to'):
+ if not hasattr(field, "rel") or not hasattr(field.rel, "to"):
return self.value
model = field.rel.to
# if value is an id
@@ -415,40 +473,52 @@ class ImporterDefaultValues(models.Model):
class ImporterColumnManager(models.Manager):
def get_by_natural_key(self, importer_type, col_number):
- return self.get(importer_type__slug=importer_type,
- col_number=col_number)
+ return self.get(importer_type__slug=importer_type, col_number=col_number)
class ImporterColumn(models.Model):
"""
Import file column description
"""
- label = models.CharField(_("Label"), blank=True, null=True,
- max_length=200)
- importer_type = models.ForeignKey(ImporterType, related_name='columns')
+
+ label = models.CharField(_("Label"), blank=True, null=True, max_length=200)
+ importer_type = models.ForeignKey(
+ ImporterType, related_name="columns", on_delete=models.CASCADE
+ )
col_number = models.IntegerField(_("Column number"), default=1)
description = models.TextField(_("Description"), blank=True, null=True)
regexp_pre_filter = models.ForeignKey(
- "Regexp", blank=True, null=True, on_delete=models.SET_NULL,
+ "Regexp",
+ blank=True,
+ null=True,
+ on_delete=models.SET_NULL,
related_name="columns",
)
value_format = models.ForeignKey(
- "ValueFormater", blank=True, null=True, on_delete=models.SET_NULL,
- related_name="columns"
+ "ValueFormater",
+ blank=True,
+ null=True,
+ on_delete=models.SET_NULL,
+ related_name="columns",
)
required = models.BooleanField(_("Required"), default=False)
export_field_name = models.CharField(
- _("Export field name"), blank=True, null=True, max_length=200,
- help_text=_("Fill this field if the field name is ambiguous for "
- "export. For instance: concatenated fields.")
+ _("Export field name"),
+ blank=True,
+ null=True,
+ max_length=200,
+ help_text=_(
+ "Fill this field if the field name is ambiguous for "
+ "export. For instance: concatenated fields."
+ ),
)
objects = ImporterColumnManager()
class Meta:
verbose_name = _("Importer - Column")
verbose_name_plural = _("Importer - Columns")
- ordering = ('importer_type', 'col_number')
- unique_together = ('importer_type', 'col_number')
+ ordering = ("importer_type", "col_number")
+ unique_together = ("importer_type", "col_number")
def __str__(self):
return "{} - {}".format(self.importer_type, self.col_number)
@@ -461,11 +531,10 @@ class ImporterColumn(models.Model):
return self.importer_type.slug, self.col_number
def targets_lbl(self):
- return ', '.join([target.target for target in self.targets.all()])
+ return ", ".join([target.target for target in self.targets.all()])
def duplicate_fields_lbl(self):
- return ', '.join([dp.field_name or ""
- for dp in self.duplicate_fields.all()])
+ return ", ".join([dp.field_name or "" for dp in self.duplicate_fields.all()])
def formater_type_lbl(self):
return ', '.join([str(target.formater_type) for target in self.targets.all()])
@@ -473,35 +542,39 @@ class ImporterColumn(models.Model):
class ImporterDuplicateFieldManager(models.Manager):
def get_by_natural_key(self, importer_type, col_number, field_name):
- return self.get(column__importer_type__slug=importer_type,
- column__col_number=col_number,
- field_name=field_name)
+ return self.get(
+ column__importer_type__slug=importer_type,
+ column__col_number=col_number,
+ field_name=field_name,
+ )
class ImporterDuplicateField(models.Model):
"""
Direct copy of result in other fields
"""
- column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields')
- field_name = models.CharField(_("Field name"), blank=True, null=True,
- max_length=200)
- force_new = models.BooleanField(_("Force creation of new items"),
- default=False)
- concat = models.BooleanField(_("Concatenate with existing"),
- default=False)
- concat_str = models.CharField(_("Concatenate character"), max_length=5,
- blank=True, null=True)
+
+ column = models.ForeignKey(
+ ImporterColumn, related_name="duplicate_fields", on_delete=models.CASCADE
+ )
+ field_name = models.CharField(
+ _("Field name"), blank=True, null=True, max_length=200
+ )
+ force_new = models.BooleanField(_("Force creation of new items"), default=False)
+ concat = models.BooleanField(_("Concatenate with existing"), default=False)
+ concat_str = models.CharField(
+ _("Concatenate character"), max_length=5, blank=True, null=True
+ )
objects = ImporterDuplicateFieldManager()
class Meta:
verbose_name = _("Importer - Duplicate field")
verbose_name_plural = _("Importer - Duplicate fields")
- ordering = ('column', 'field_name')
- unique_together = ('column', 'field_name')
+ ordering = ("column", "field_name")
+ unique_together = ("column", "field_name")
def natural_key(self):
- return self.column.importer_type.slug, self.column.col_number, \
- self.field_name
+ return self.column.importer_type.slug, self.column.col_number, self.field_name
class NamedManager(models.Manager):
@@ -523,7 +596,7 @@ class Regexp(models.Model):
return self.name
def natural_key(self):
- return (self.name, )
+ return (self.name,)
class ValueFormater(models.Model):
@@ -531,11 +604,14 @@ class ValueFormater(models.Model):
slug = models.SlugField(_("Slug"), unique=True, max_length=100)
description = models.TextField(_("Description"), blank=True, null=True)
format_string = models.CharField(
- _("Format string"), max_length=100,
- help_text=_("A string used to format a value using the Python "
- "\"format()\" method. The site https://pyformat.info/ "
- "provide good examples of usage. Only one \"{}\" entry "
- "is managed. The input is assumed to be a string.")
+ _("Format string"),
+ max_length=100,
+ help_text=_(
+ "A string used to format a value using the Python "
+ '"format()" method. The site https://pyformat.info/ '
+ 'provide good examples of usage. Only one "{}" entry '
+ "is managed. The input is assumed to be a string."
+ ),
)
objects = SlugModelManager()
@@ -551,41 +627,50 @@ class ValueFormater(models.Model):
self.format_string.format("sample value")
except ValueError:
raise ValidationError(
- {'format_string': _("The string provided generate an error. "
- "Fix it.")}
+ {
+ "format_string": _(
+ "The string provided generate an error. " "Fix it."
+ )
+ }
)
def natural_key(self):
- return (self.slug, )
+ return (self.slug,)
class ImportTargetManager(models.Manager):
def get_by_natural_key(self, importer_type, col_number, target):
- return self.get(column__importer_type__slug=importer_type,
- column__col_number=col_number,
- target=target)
+ return self.get(
+ column__importer_type__slug=importer_type,
+ column__col_number=col_number,
+ target=target,
+ )
class ImportTarget(models.Model):
"""
Ishtar database target for a column
"""
- column = models.ForeignKey(ImporterColumn, related_name='targets')
+
+ column = models.ForeignKey(
+ ImporterColumn, related_name="targets", on_delete=models.CASCADE
+ )
target = models.CharField("Target", max_length=500)
- formater_type = models.ForeignKey("FormaterType", related_name='targets')
- force_new = models.BooleanField(_("Force creation of new items"),
- default=False)
- concat = models.BooleanField(_("Concatenate with existing"),
- default=False)
- concat_str = models.CharField(_("Concatenate character"), max_length=5,
- blank=True, null=True)
+ formater_type = models.ForeignKey(
+ "FormaterType", related_name="targets", on_delete=models.CASCADE
+ )
+ force_new = models.BooleanField(_("Force creation of new items"), default=False)
+ concat = models.BooleanField(_("Concatenate with existing"), default=False)
+ concat_str = models.CharField(
+ _("Concatenate character"), max_length=5, blank=True, null=True
+ )
comment = models.TextField(_("Comment"), blank=True, null=True)
objects = ImportTargetManager()
class Meta:
verbose_name = _("Importer - Target")
verbose_name_plural = _("Importer - Targets")
- unique_together = ('column', 'target')
+ unique_together = ("column", "target")
def __str__(self):
return self.target[:50] if self.target else self.comment
@@ -599,8 +684,7 @@ class ImportTarget(models.Model):
return "{} - {}".format(self.target[:50], desc)
def natural_key(self):
- return self.column.importer_type.slug, self.column.col_number, \
- self.target
+ return self.column.importer_type.slug, self.column.col_number, self.target
@property
def associated_model(self):
@@ -609,24 +693,24 @@ class ImportTarget(models.Model):
try:
return get_associated_model(
self.column.importer_type.associated_models.klass,
- self.target.split('__'))
+ self.target.split("__"),
+ )
except KeyError:
return
def get_choices(self):
- if self.formater_type.formater_type == 'UnknowType' \
- and self.column.importer_type.slug:
+ if (
+ self.formater_type.formater_type == "UnknowType"
+ and self.column.importer_type.slug
+ ):
cls = self.column.importer_type.get_importer_class()
formt = cls().line_format[self.column.col_number - 1]
- if hasattr(formt.formater, 'choices'):
- return [('', '--' * 8)] + list(formt.formater.choices)
- return [('', '--' * 8)]
- if self.formater_type.formater_type == 'StrToBoolean':
- return [('', '--' * 8),
- ('True', _("True")),
- ('False', _("False"))]
- if not self.associated_model or not hasattr(self.associated_model,
- 'get_types'):
+ if hasattr(formt.formater, "choices"):
+ return [("", "--" * 8)] + list(formt.formater.choices)
+ return [("", "--" * 8)]
+ if self.formater_type.formater_type == "StrToBoolean":
+ return [("", "--" * 8), ("True", _("True")), ("False", _("False"))]
+ if not self.associated_model or not hasattr(self.associated_model, "get_types"):
return []
return self.associated_model.get_types()
@@ -635,11 +719,12 @@ class TargetKeyGroup(models.Model):
"""
Group of target keys for imports.
"""
+
name = models.TextField(_("Name"), unique=True)
- all_user_can_use = models.BooleanField(_("All users can use it"),
- default=False)
- all_user_can_modify = models.BooleanField(_("All users can modify it"),
- default=False)
+ all_user_can_use = models.BooleanField(_("All users can use it"), default=False)
+ all_user_can_modify = models.BooleanField(
+ _("All users can modify it"), default=False
+ )
available = models.BooleanField(_("Available"), default=True)
class Meta:
@@ -660,20 +745,33 @@ class TargetKey(models.Model):
one particular group (associated_group) or to all imports
(associated_import, associated_user and associated_group are empty).
"""
- target = models.ForeignKey(ImportTarget, related_name='keys')
+
+ target = models.ForeignKey(
+ ImportTarget, related_name="keys", on_delete=models.CASCADE
+ )
key = models.TextField(_("Key"))
value = models.TextField(_("Value"), blank=True, null=True)
is_set = models.BooleanField(_("Is set"), default=False)
- associated_import = models.ForeignKey('Import', blank=True, null=True)
- associated_user = models.ForeignKey('IshtarUser', blank=True, null=True)
- associated_group = models.ForeignKey(TargetKeyGroup, blank=True, null=True)
+ associated_import = models.ForeignKey(
+ "Import", blank=True, null=True, on_delete=models.SET_NULL
+ )
+ associated_user = models.ForeignKey(
+ "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL
+ )
+ associated_group = models.ForeignKey(
+ TargetKeyGroup, blank=True, null=True, on_delete=models.SET_NULL
+ )
class Meta:
- unique_together = ('target', 'key', 'associated_user',
- 'associated_import',)
+ unique_together = (
+ "target",
+ "key",
+ "associated_user",
+ "associated_import",
+ )
verbose_name = _("Importer - Target key")
verbose_name_plural = _("Importer - Targets keys")
- ordering = ('target', 'key')
+ ordering = ("target", "key")
def __str__(self):
return " - ".join([str(self.target), self.key[:50]])
@@ -689,8 +787,8 @@ class TargetKey(models.Model):
def format(self):
if not self.is_set:
return None
- if self.target.formater_type.formater_type == 'StrToBoolean':
- if self.value in ('False', '0'):
+ if self.target.formater_type.formater_type == "StrToBoolean":
+ if self.value in ("False", "0"):
return False
elif self.value:
return True
@@ -703,162 +801,165 @@ class TargetKey(models.Model):
return obj
v = None
associated_model = self.target.associated_model
- if associated_model and hasattr(self.target.associated_model,
- "add_key"):
+ if associated_model and hasattr(self.target.associated_model, "add_key"):
# pk is given
try:
- v = self.target.associated_model.objects.get(
- pk=str(int(self.value)))
+ v = self.target.associated_model.objects.get(pk=str(int(self.value)))
except (ValueError, self.target.associated_model.DoesNotExist):
# try with txt_idx
try:
v = self.target.associated_model.objects.get(
- txt_idx=str(self.value))
+ txt_idx=str(self.value)
+ )
except self.target.associated_model.DoesNotExist:
pass
if v:
keys = {}
if self.associated_group:
- keys['group'] = self.associated_group
+ keys["group"] = self.associated_group
if self.associated_user:
- keys['user'] = self.associated_user
+ keys["user"] = self.associated_user
else:
- keys['importer'] = self.associated_import
+ keys["importer"] = self.associated_import
v.add_key(self.key, **keys)
return obj
TARGET_MODELS = [
- ('OrganizationType', _("Organization type")),
- ('ishtar_common.models.OrganizationType', _("Organization type")),
- ('ishtar_common.models.PersonType', _("Person type")),
- ('TitleType', _("Title")),
- ('SourceType', _("Source type")),
- ('ishtar_common.models.SourceType', _("Source type")),
- ('AuthorType', _("Author type")),
- ('Format', _("Format")),
- ('ishtar_common.models.Format', _("Format")),
- ('ishtar_common.models.LicenseType', _("License type")),
- ('ishtar_common.models.DocumentTag', _("Document tag")),
- ('ishtar_common.models.Language', _("Language")),
- ('ishtar_common.models.SupportType', _("Support type")),
- ('archaeological_operations.models.OperationType', _("Operation type")),
- ('archaeological_operations.models.Period', _("Period")),
- ('archaeological_operations.models.ReportState', _("Report state")),
- ('archaeological_operations.models.RemainType', _("Remain type")),
- ('archaeological_operations.models.RelationType',
- _("Operation relation type")),
+ ("OrganizationType", _("Organization type")),
+ ("ishtar_common.models.OrganizationType", _("Organization type")),
+ ("ishtar_common.models.PersonType", _("Person type")),
+ ("TitleType", _("Title")),
+ ("SourceType", _("Source type")),
+ ("ishtar_common.models.SourceType", _("Source type")),
+ ("AuthorType", _("Author type")),
+ ("Format", _("Format")),
+ ("ishtar_common.models.Format", _("Format")),
+ ("ishtar_common.models.LicenseType", _("License type")),
+ ("ishtar_common.models.DocumentTag", _("Document tag")),
+ ("ishtar_common.models.Language", _("Language")),
+ ("ishtar_common.models.SupportType", _("Support type")),
+ ("archaeological_operations.models.OperationType", _("Operation type")),
+ ("archaeological_operations.models.Period", _("Period")),
+ ("archaeological_operations.models.ReportState", _("Report state")),
+ ("archaeological_operations.models.RemainType", _("Remain type")),
+ ("archaeological_operations.models.RelationType", _("Operation relation type")),
("archaeological_operations.models.ActType", _("Act type")),
- ('archaeological_context_records.models.Unit', _("Unit")),
- ('archaeological_context_records.models.ActivityType',
- _("Activity type")),
- ('archaeological_context_records.models.DocumentationType',
- _("Documentation type")),
- ("archaeological_context_records.models.DatingQuality",
- _("Dating quality")),
- ('archaeological_finds.models.MaterialType', _("Material")),
- ('archaeological_finds.models.ConservatoryState',
- _("Conservatory state")),
- ('archaeological_warehouse.models.ContainerType', _("Container type")),
- ('archaeological_warehouse.models.WarehouseDivision',
- _("Warehouse division")),
- ('archaeological_warehouse.models.WarehouseType', _("Warehouse type")),
- ('archaeological_finds.models.TreatmentType', _("Treatment type")),
- ('archaeological_finds.models.TreatmentEmergencyType',
- _("Treatment emergency type")),
- ('archaeological_finds.models.ObjectType', _("Object type")),
- ('archaeological_finds.models.IntegrityType', _("Integrity type")),
- ('archaeological_finds.models.RemarkabilityType',
- _("Remarkability type")),
- ('archaeological_finds.models.AlterationType', _("Alteration type")),
- ('archaeological_finds.models.AlterationCauseType',
- _("Alteration cause type")),
- ('archaeological_finds.models.BatchType', _("Batch type")),
- ('archaeological_finds.models.CheckedType', _("Checked type")),
- ('archaeological_finds.models.MaterialTypeQualityType',
- _("Material type quality")),
+ ("archaeological_context_records.models.Unit", _("Unit")),
+ ("archaeological_context_records.models.ActivityType", _("Activity type")),
+ (
+ "archaeological_context_records.models.DocumentationType",
+ _("Documentation type"),
+ ),
+ ("archaeological_context_records.models.DatingQuality", _("Dating quality")),
+ ("archaeological_finds.models.MaterialType", _("Material")),
+ ("archaeological_finds.models.ConservatoryState", _("Conservatory state")),
+ ("archaeological_warehouse.models.ContainerType", _("Container type")),
+ ("archaeological_warehouse.models.WarehouseDivision", _("Warehouse division")),
+ ("archaeological_warehouse.models.WarehouseType", _("Warehouse type")),
+ ("archaeological_finds.models.TreatmentType", _("Treatment type")),
+ (
+ "archaeological_finds.models.TreatmentEmergencyType",
+ _("Treatment emergency type"),
+ ),
+ ("archaeological_finds.models.ObjectType", _("Object type")),
+ ("archaeological_finds.models.IntegrityType", _("Integrity type")),
+ ("archaeological_finds.models.RemarkabilityType", _("Remarkability type")),
+ ("archaeological_finds.models.AlterationType", _("Alteration type")),
+ ("archaeological_finds.models.AlterationCauseType", _("Alteration cause type")),
+ ("archaeological_finds.models.BatchType", _("Batch type")),
+ ("archaeological_finds.models.CheckedType", _("Checked type")),
+ ("archaeological_finds.models.MaterialTypeQualityType", _("Material type quality")),
("archaeological_finds.models.FunctionalArea", _("Functional area")),
- ('archaeological_context_records.models.IdentificationType',
- _("Identification type")),
- ('archaeological_context_records.models.RelationType',
- _("Context record relation type")),
- ('SpatialReferenceSystem', _("Spatial reference system")),
- ('SupportType', _("Support type")),
- ('TitleType', _("Title type")),
+ (
+ "archaeological_context_records.models.IdentificationType",
+ _("Identification type"),
+ ),
+ (
+ "archaeological_context_records.models.RelationType",
+ _("Context record relation type"),
+ ),
+ ("SpatialReferenceSystem", _("Spatial reference system")),
+ ("SupportType", _("Support type")),
+ ("TitleType", _("Title type")),
]
TARGET_MODELS_KEYS = [tm[0] for tm in TARGET_MODELS]
IMPORTER_TYPES = (
- ('IntegerFormater', _("Integer")),
- ('FloatFormater', _("Float")),
- ('UnicodeFormater', _("String")),
- ('DateFormater', _("Date")),
- ('TypeFormater', _("Type")),
- ('YearFormater', _("Year")),
- ('InseeFormater', _("INSEE code")),
- ('UpperFormater', _("Upper case")),
- ('LowerFormater', _("Lower case")),
- ('StrToBoolean', _("String to boolean")),
- ('FileFormater', pgettext_lazy("filesystem", "File")),
- ('UnknowType', _("Unknow type"))
+ ("IntegerFormater", _("Integer")),
+ ("FloatFormater", _("Float")),
+ ("UnicodeFormater", _("String")),
+ ("DateFormater", _("Date")),
+ ("TypeFormater", _("Type")),
+ ("YearFormater", _("Year")),
+ ("InseeFormater", _("INSEE code")),
+ ("UpperFormater", _("Upper case")),
+ ("LowerFormater", _("Lower case")),
+ ("StrToBoolean", _("String to boolean")),
+ ("FileFormater", pgettext_lazy("filesystem", "File")),
+ ("UnknowType", _("Unknow type")),
)
IMPORTER_TYPES_DCT = {
- 'IntegerFormater': IntegerFormater,
- 'FloatFormater': FloatFormater,
- 'UnicodeFormater': UnicodeFormater,
- 'DateFormater': DateFormater,
- 'TypeFormater': TypeFormater,
- 'YearFormater': YearFormater,
- 'StrToBoolean': StrToBoolean,
- 'FileFormater': FileFormater,
- 'InseeFormater': InseeFormater,
- 'UpperFormater': UpperCaseFormater,
- 'LowerFormater': LowerCaseFormater,
- 'UnknowType': None,
+ "IntegerFormater": IntegerFormater,
+ "FloatFormater": FloatFormater,
+ "UnicodeFormater": UnicodeFormater,
+ "DateFormater": DateFormater,
+ "TypeFormater": TypeFormater,
+ "YearFormater": YearFormater,
+ "StrToBoolean": StrToBoolean,
+ "FileFormater": FileFormater,
+ "InseeFormater": InseeFormater,
+ "UpperFormater": UpperCaseFormater,
+ "LowerFormater": LowerCaseFormater,
+ "UnknowType": None,
}
DATE_FORMATS = (
- ('%Y', _("4 digit year. e.g.: \"2015\"")),
- ('%Y/%m/%d', _("4 digit year/month/day. e.g.: \"2015/02/04\"")),
- ('%d/%m/%Y', _("Day/month/4 digit year. e.g.: \"04/02/2015\"")),
+ ("%Y", _('4 digit year. e.g.: "2015"')),
+ ("%Y/%m/%d", _('4 digit year/month/day. e.g.: "2015/02/04"')),
+ ("%d/%m/%Y", _('Day/month/4 digit year. e.g.: "04/02/2015"')),
)
-IMPORTER_TYPES_CHOICES = {'TypeFormater': TARGET_MODELS,
- 'DateFormater': DATE_FORMATS}
+IMPORTER_TYPES_CHOICES = {"TypeFormater": TARGET_MODELS, "DateFormater": DATE_FORMATS}
class FormaterTypeManager(models.Manager):
def get_by_natural_key(self, formater_type, options, many_split):
- return self.get(formater_type=formater_type,
- options=options, many_split=many_split)
+ return self.get(
+ formater_type=formater_type, options=options, many_split=many_split
+ )
class FormaterType(models.Model):
- formater_type = models.CharField("Formater type", max_length=20,
- choices=IMPORTER_TYPES)
- options = models.CharField(_("Options"), max_length=500, blank=True,
- null=True)
- many_split = models.CharField(_("Split character(s)"), max_length=10,
- blank=True, null=True)
+ formater_type = models.CharField(
+ "Formater type", max_length=20, choices=IMPORTER_TYPES
+ )
+ options = models.CharField(_("Options"), max_length=500, blank=True, null=True)
+ many_split = models.CharField(
+ _("Split character(s)"), max_length=10, blank=True, null=True
+ )
objects = FormaterTypeManager()
class Meta:
verbose_name = _("Importer - Formater type")
verbose_name_plural = _("Importer - Formater types")
- unique_together = ('formater_type', 'options', 'many_split')
- ordering = ('formater_type', 'options')
+ unique_together = ("formater_type", "options", "many_split")
+ ordering = ("formater_type", "options")
def natural_key(self):
return self.formater_type, self.options, self.many_split
def __str__(self):
return " - ".join(
- [str(dict(IMPORTER_TYPES)[self.formater_type])
- if self.formater_type in IMPORTER_TYPES_DCT else ''] +
- [getattr(self, k) for k in ('options', 'many_split')
- if getattr(self, k)])
+ [
+ str(dict(IMPORTER_TYPES)[self.formater_type])
+ if self.formater_type in IMPORTER_TYPES_DCT
+ else ""
+ ]
+ + [getattr(self, k) for k in ("options", "many_split") if getattr(self, k)]
+ )
def get_choices(self):
if self.format_type in IMPORTER_TYPES_CHOICES:
@@ -867,35 +968,36 @@ class FormaterType(models.Model):
def get_formater_type(self, target, import_instance=None):
if self.formater_type not in IMPORTER_TYPES_DCT.keys():
return
- kwargs = {'db_target': target, 'import_instance': import_instance}
+ kwargs = {"db_target": target, "import_instance": import_instance}
if self.many_split:
- kwargs['many_split'] = self.many_split
- if self.formater_type == 'TypeFormater':
+ kwargs["many_split"] = self.many_split
+ if self.formater_type == "TypeFormater":
if self.options not in TARGET_MODELS_KEYS:
logger.warning(
"**WARN FormaterType.get_formater_type**: {} "
- "is not in TARGET_MODELS_KEYS".format(self.options))
+ "is not in TARGET_MODELS_KEYS".format(self.options)
+ )
return
if self.options in dir():
model = dir()[self.options]
else:
model = import_class(self.options)
return TypeFormater(model, **kwargs)
- elif self.formater_type == 'UnicodeFormater':
+ elif self.formater_type == "UnicodeFormater":
if self.options:
try:
return UnicodeFormater(int(self.options.strip()), **kwargs)
except ValueError:
pass
return UnicodeFormater(**kwargs)
- elif self.formater_type == 'DateFormater':
+ elif self.formater_type == "DateFormater":
date_formats = self.options
if self.many_split:
- date_formats = self.options.split(kwargs.pop('many_split'))
+ date_formats = self.options.split(kwargs.pop("many_split"))
return DateFormater(date_formats, **kwargs)
- elif self.formater_type == 'StrToBoolean':
+ elif self.formater_type == "StrToBoolean":
return StrToBoolean(**kwargs)
- elif self.formater_type == 'UnknowType':
+ elif self.formater_type == "UnknowType":
return
else:
return IMPORTER_TYPES_DCT[self.formater_type](**kwargs)
@@ -916,13 +1018,13 @@ IMPORT_STATE = (
)
IMPORT_STATE_DCT = dict(IMPORT_STATE)
-ENCODINGS = [(settings.ENCODING, settings.ENCODING),
- (settings.ALT_ENCODING, settings.ALT_ENCODING),
- ('utf-8', 'utf-8')]
+ENCODINGS = [
+ (settings.ENCODING, settings.ENCODING),
+ (settings.ALT_ENCODING, settings.ALT_ENCODING),
+ ("utf-8", "utf-8"),
+]
-CSV_SEPS = ((",", ","),
- (";", ";"),
- ("|", "|"))
+CSV_SEPS = ((",", ","), (";", ";"), ("|", "|"))
@task()
@@ -944,68 +1046,117 @@ def delayed_check(import_pk):
class Import(models.Model):
- user = models.ForeignKey('IshtarUser', blank=True, null=True,
- on_delete=models.SET_NULL)
+ user = models.ForeignKey(
+ "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL
+ )
name = models.CharField(_("Name"), max_length=500, null=True)
- importer_type = models.ForeignKey(ImporterType)
+ importer_type = models.ForeignKey(ImporterType, on_delete=models.CASCADE)
imported_file = models.FileField(
- _("Imported file"), upload_to="upload/imports/%Y/%m/", max_length=220,
- help_text=max_size_help(), blank=True, null=True)
+ _("Imported file"),
+ upload_to="upload/imports/%Y/%m/",
+ max_length=220,
+ help_text=max_size_help(),
+ blank=True,
+ null=True,
+ )
imported_images = models.FileField(
- _("Associated images (zip file)"), upload_to="upload/imports/%Y/%m/",
- blank=True, null=True, max_length=220, help_text=max_size_help())
+ _("Associated images (zip file)"),
+ upload_to="upload/imports/%Y/%m/",
+ blank=True,
+ null=True,
+ max_length=220,
+ help_text=max_size_help(),
+ )
associated_group = models.ForeignKey(
- TargetKeyGroup, blank=True, null=True,
+ TargetKeyGroup,
+ blank=True,
+ null=True,
on_delete=models.SET_NULL,
- help_text=_("If a group is selected, target key saved in this group "
- "will be used.")
+ help_text=_(
+ "If a group is selected, target key saved in this group " "will be used."
+ ),
+ )
+ encoding = models.CharField(
+ _("Encoding"), choices=ENCODINGS, default="utf-8", max_length=15
)
- encoding = models.CharField(_("Encoding"), choices=ENCODINGS,
- default='utf-8', max_length=15)
csv_sep = models.CharField(
- _("CSV separator"), choices=CSV_SEPS, default=',', max_length=1,
- help_text=_("Separator for CSV file. Standard is comma but Microsoft "
- "Excel do not follow this standard and use semi-colon.")
+ _("CSV separator"),
+ choices=CSV_SEPS,
+ default=",",
+ max_length=1,
+ help_text=_(
+ "Separator for CSV file. Standard is comma but Microsoft "
+ "Excel do not follow this standard and use semi-colon."
+ ),
)
skip_lines = models.IntegerField(
- _("Skip lines"), default=1,
- help_text=_("Number of header lines in your file (can be 0)."))
+ _("Skip lines"),
+ default=1,
+ help_text=_("Number of header lines in your file (can be 0)."),
+ )
error_file = models.FileField(
- _("Error file"), upload_to="upload/imports/%Y/%m/",
- blank=True, null=True, max_length=255, help_text=max_size_help())
+ _("Error file"),
+ upload_to="upload/imports/%Y/%m/",
+ blank=True,
+ null=True,
+ max_length=255,
+ help_text=max_size_help(),
+ )
result_file = models.FileField(
- _("Result file"), upload_to="upload/imports/%Y/%m/",
- blank=True, null=True, max_length=255, help_text=max_size_help())
+ _("Result file"),
+ upload_to="upload/imports/%Y/%m/",
+ blank=True,
+ null=True,
+ max_length=255,
+ help_text=max_size_help(),
+ )
match_file = models.FileField(
- _("Match file"), upload_to="upload/imports/%Y/%m/", blank=True,
- null=True, max_length=255, help_text=max_size_help())
+ _("Match file"),
+ upload_to="upload/imports/%Y/%m/",
+ blank=True,
+ null=True,
+ max_length=255,
+ help_text=max_size_help(),
+ )
archive_file = models.FileField(
- _("Archive file"), upload_to="upload/imports/%Y/%m/", blank=True,
- null=True, max_length=255, help_text=max_size_help())
- state = models.CharField(_("State"), max_length=2, choices=IMPORT_STATE,
- default='C')
+ _("Archive file"),
+ upload_to="upload/imports/%Y/%m/",
+ blank=True,
+ null=True,
+ max_length=255,
+ help_text=max_size_help(),
+ )
+ state = models.CharField(
+ _("State"), max_length=2, choices=IMPORT_STATE, default="C"
+ )
conservative_import = models.BooleanField(
- _("Conservative import"), default=False,
- help_text=_('If set to true, do not overload existing values.'))
+ _("Conservative import"),
+ default=False,
+ help_text=_("If set to true, do not overload existing values."),
+ )
creation_date = models.DateTimeField(
- _("Creation date"), auto_now_add=True, blank=True, null=True)
- end_date = models.DateTimeField(_("End date"), auto_now_add=True,
- blank=True, null=True, editable=False)
+ _("Creation date"), auto_now_add=True, blank=True, null=True
+ )
+ end_date = models.DateTimeField(
+ _("End date"), auto_now_add=True, blank=True, null=True, editable=False
+ )
seconds_remaining = models.IntegerField(
- _("Remaining seconds"), blank=True, null=True, editable=False)
- current_line = models.IntegerField(_("Current line"), blank=True,
- null=True)
- number_of_line = models.IntegerField(_("Number of line"), blank=True,
- null=True)
+ _("Remaining seconds"), blank=True, null=True, editable=False
+ )
+ current_line = models.IntegerField(_("Current line"), blank=True, null=True)
+ number_of_line = models.IntegerField(_("Number of line"), blank=True, null=True)
imported_line_numbers = models.TextField(
- _("Imported line numbers"), blank=True, null=True,
- validators=[validate_comma_separated_integer_list]
+ _("Imported line numbers"),
+ blank=True,
+ null=True,
+ validators=[validate_comma_separated_integer_list],
)
- changed_checked = models.BooleanField(_("Changed have been checked"),
- default=False)
+ changed_checked = models.BooleanField(_("Changed have been checked"), default=False)
changed_line_numbers = models.TextField(
- _("Changed line numbers"), blank=True, null=True,
- validators=[validate_comma_separated_integer_list]
+ _("Changed line numbers"),
+ blank=True,
+ null=True,
+ validators=[validate_comma_separated_integer_list],
)
class Meta:
@@ -1016,17 +1167,17 @@ class Import(models.Model):
return "{} | {}".format(self.name or "-", self.importer_type)
def need_matching(self):
- return bool(TargetKey.objects.filter(associated_import=self,
- is_set=False).count())
+ return bool(
+ TargetKey.objects.filter(associated_import=self, is_set=False).count()
+ )
@property
def errors(self):
if not self.error_file:
return []
errors = []
- with open(self.error_file.path, 'rt') as csvfile:
- reader = csv.DictReader(
- csvfile, fieldnames=['line', 'column', 'error'])
+ with open(self.error_file.path, "rt") as csvfile:
+ reader = csv.DictReader(csvfile, fieldnames=["line", "column", "error"])
for idx, row in enumerate(reader):
if not idx: # pass the header
continue
@@ -1040,11 +1191,10 @@ class Import(models.Model):
return
filename = self.imported_file.path
encodings = [self.encoding]
- encodings += [coding for coding, c in ENCODINGS
- if coding != self.encoding]
+ encodings += [coding for coding, c in ENCODINGS if coding != self.encoding]
for encoding in encodings:
try:
- with open(filename, 'r', encoding=encoding) as f:
+ with open(filename, "r", encoding=encoding) as f:
reader = csv.reader(f, delimiter=self.csv_sep)
nb = sum(1 for __ in reader) - self.skip_lines
except UnicodeDecodeError:
@@ -1059,14 +1209,14 @@ class Import(models.Model):
def progress_percent(self):
if not self.current_line or not self.number_of_line:
return 0
- return int((float(self.current_line) / float(self.number_of_line))
- * 100)
+ return int((float(self.current_line) / float(self.number_of_line)) * 100)
def add_imported_line(self, idx_line):
if not self.number_of_line:
self.get_number_of_lines()
- if self.imported_line_numbers and \
- str(idx_line) in self.imported_line_numbers.split(','):
+ if self.imported_line_numbers and str(
+ idx_line
+ ) in self.imported_line_numbers.split(","):
return
if self.imported_line_numbers:
self.imported_line_numbers += ","
@@ -1077,8 +1227,9 @@ class Import(models.Model):
self.save()
def add_changed_line(self, idx_line):
- if self.changed_line_numbers and \
- str(idx_line) in self.changed_line_numbers.split(','):
+ if self.changed_line_numbers and str(
+ idx_line
+ ) in self.changed_line_numbers.split(","):
return
if self.changed_line_numbers:
self.changed_line_numbers += ","
@@ -1090,7 +1241,7 @@ class Import(models.Model):
def remove_changed_line(self, idx_line):
if not self.changed_line_numbers:
return
- line_numbers = self.changed_line_numbers.split(',')
+ line_numbers = self.changed_line_numbers.split(",")
if str(idx_line) not in line_numbers:
return
line_numbers.pop(line_numbers.index(str(idx_line)))
@@ -1102,12 +1253,13 @@ class Import(models.Model):
return True
if not self.changed_line_numbers:
return
- line_numbers = self.changed_line_numbers.split(',')
+ line_numbers = self.changed_line_numbers.split(",")
return str(idx_line) in line_numbers
def line_is_imported(self, idx_line):
- return self.imported_line_numbers and \
- str(idx_line) in self.imported_line_numbers.split(',')
+ return self.imported_line_numbers and str(
+ idx_line
+ ) in self.imported_line_numbers.split(",")
def get_actions(self):
"""
@@ -1125,24 +1277,24 @@ class Import(models.Model):
actions.append(('I', _("Launch import")))
if profile.experimental_feature:
if self.changed_checked:
- actions.append(('IS', _("Step by step import")))
- actions.append(('CH', _("Re-check for changes")))
+ actions.append(("IS", _("Step by step import")))
+ actions.append(("CH", _("Re-check for changes")))
else:
- actions.append(('CH', _("Check for changes")))
- if self.state in ('F', 'FE'):
- actions.append(('A', _("Re-analyse")))
- actions.append(('I', _("Re-import")))
+ actions.append(("CH", _("Check for changes")))
+ if self.state in ("F", "FE"):
+ actions.append(("A", _("Re-analyse")))
+ actions.append(("I", _("Re-import")))
if profile.experimental_feature:
if self.changed_checked:
- actions.append(('IS', _("Step by step re-import")))
- actions.append(('CH', _("Re-check for changes")))
+ actions.append(("IS", _("Step by step re-import")))
+ actions.append(("CH", _("Re-check for changes")))
else:
- actions.append(('CH', _("Check for changes")))
- actions.append(('AC', _("Archive")))
- if self.state == 'AC':
+ actions.append(("CH", _("Check for changes")))
+ actions.append(("AC", _("Archive")))
+ if self.state == "AC":
state = "FE" if self.error_file else "F"
actions.append((state, _("Unarchive")))
- actions.append(('D', _("Delete")))
+ actions.append(("D", _("Delete")))
return actions
@property
@@ -1157,8 +1309,10 @@ class Import(models.Model):
def get_importer_instance(self):
return self.importer_type.get_importer_class(import_instance=self)(
- skip_lines=self.skip_lines, import_instance=self,
- conservative_import=self.conservative_import)
+ skip_lines=self.skip_lines,
+ import_instance=self,
+ conservative_import=self.conservative_import,
+ )
@property
def data_table(self):
@@ -1169,23 +1323,21 @@ class Import(models.Model):
filename = None
for name in z.namelist():
# get first CSV file found
- if name.endswith('.csv'):
+ if name.endswith(".csv"):
filename = name
break
if not filename:
return []
- tmpdir = tempfile.mkdtemp(prefix='tmp-ishtar-')
+ tmpdir = tempfile.mkdtemp(prefix="tmp-ishtar-")
imported_file = z.extract(filename, tmpdir)
encodings = [self.encoding]
- encodings += [coding for coding, c in ENCODINGS
- if coding != self.encoding]
+ encodings += [coding for coding, c in ENCODINGS if coding != self.encoding]
for encoding in encodings:
try:
with open(imported_file, encoding=encoding) as csv_file:
vals = [
- line for line in csv.reader(csv_file,
- delimiter=self.csv_sep)
+ line for line in csv.reader(csv_file, delimiter=self.csv_sep)
]
if tmpdir:
shutil.rmtree(tmpdir)
@@ -1199,19 +1351,20 @@ class Import(models.Model):
return []
def initialize(self, user=None, session_key=None):
- self.state = 'AP'
+ self.state = "AP"
self.end_date = datetime.datetime.now()
self.save()
try:
self.get_importer_instance().initialize(
- self.data_table, user=user, output='db')
+ self.data_table, user=user, output="db"
+ )
except ImporterError as e:
if session_key:
put_session_message(session_key, e.msg, "danger")
- self.state = 'C'
+ self.state = "C"
self.save()
return
- self.state = 'A'
+ self.state = "A"
self.end_date = datetime.datetime.now()
self.save()
@@ -1220,17 +1373,16 @@ class Import(models.Model):
return self.check_modified(session_key=session_key)
put_session_message(
session_key,
- str(
- _("Modification check {} added to the queue")
- ).format(self.name),
- "info")
- self.state = 'HQ'
+ str(_("Modification check {} added to the queue")).format(self.name),
+ "info",
+ )
+ self.state = "HQ"
self.end_date = datetime.datetime.now()
self.save()
return delayed_check.delay(self.pk)
def check_modified(self, session_key=None):
- self.state = 'HP'
+ self.state = "HP"
self.end_date = datetime.datetime.now()
self.changed_line_numbers = ""
self.changed_checked = False
@@ -1239,9 +1391,7 @@ class Import(models.Model):
for idx in range(self.skip_lines, self.get_number_of_lines() + 1):
try:
imprt, data = self.importation(
- simulate=True,
- line_to_process=idx,
- return_importer_and_data=True
+ simulate=True, line_to_process=idx, return_importer_and_data=True
)
except IOError as e:
# error is identified as a change
@@ -1268,7 +1418,7 @@ class Import(models.Model):
break
current_value = getattr(obj, k)
updated_value = updated_values[k]
- if hasattr(current_value, 'all'):
+ if hasattr(current_value, "all"):
current_value = list(current_value.all())
changed = False
for v in updated_value:
@@ -1292,85 +1442,95 @@ class Import(models.Model):
put_session_message(
session_key,
str(_("Import {} added to the queue")).format(self.name),
- "info")
- self.state = 'IQ'
+ "info",
+ )
+ self.state = "IQ"
self.end_date = datetime.datetime.now()
self.save()
return delayed_import.delay(self.pk)
- def importation(self, session_key=None, line_to_process=None,
- simulate=False, return_importer_and_data=False,
- request=None):
- self.state = 'IP'
+ def importation(
+ self,
+ session_key=None,
+ line_to_process=None,
+ simulate=False,
+ return_importer_and_data=False,
+ request=None,
+ ):
+ self.state = "IP"
self.end_date = datetime.datetime.now()
if not line_to_process: # full import
- self.imported_line_numbers = ''
+ self.imported_line_numbers = ""
self.current_line = 0
self.save()
importer = self.get_importer_instance()
try:
data = importer.importation(
- self.data_table, user=self.user,
- line_to_process=line_to_process, simulate=simulate)
+ self.data_table,
+ user=self.user,
+ line_to_process=line_to_process,
+ simulate=simulate,
+ )
except IOError:
error_message = str(_("Error on imported file: {}")).format(
- self.imported_file)
+ self.imported_file
+ )
importer.errors = [error_message]
if session_key:
put_session_message(session_key, error_message, "warning")
- ids = get_session_var(session_key, 'current_import_id')
+ ids = get_session_var(session_key, "current_import_id")
if not ids:
ids = []
ids.append(self.pk)
- put_session_var(session_key, 'current_import_id', ids)
+ put_session_var(session_key, "current_import_id", ids)
if line_to_process:
- self.state = 'PI'
+ self.state = "PI"
else:
- self.state = 'FE'
+ self.state = "FE"
self.save()
if not return_importer_and_data:
return
return importer, None
# result file
filename = slugify(self.importer_type.name)
- now = datetime.datetime.now().isoformat('-').replace(':', '')
+ now = datetime.datetime.now().isoformat("-").replace(":", "")
result_file = filename + "_result_%s.csv" % now
self.result_file.save(
- result_file, ContentFile(importer.get_csv_result().encode('utf-8')))
+ result_file, ContentFile(importer.get_csv_result().encode("utf-8"))
+ )
if importer.errors:
if line_to_process:
- self.state = 'PI'
+ self.state = "PI"
else:
- self.state = 'FE'
+ self.state = "FE"
error_file = filename + "_errors_%s.csv" % now
self.error_file.save(
- error_file,
- ContentFile(importer.get_csv_errors().encode('utf-8'))
+ error_file, ContentFile(importer.get_csv_errors().encode("utf-8"))
)
- msg = str(_("Import {} finished with errors")).format(
- self.name)
+ msg = str(_("Import {} finished with errors")).format(self.name)
msg_cls = "warning"
else:
if line_to_process:
- self.state = 'PI'
+ self.state = "PI"
else:
- self.state = 'F'
+ self.state = "F"
self.error_file = None
- msg = str(_("Import {} finished with no errors")).format(
- self.name)
+ msg = str(_("Import {} finished with no errors")).format(self.name)
msg_cls = "primary"
if session_key and request:
put_session_message(session_key, msg, msg_cls)
- ids = request.session['current_import_id'] \
- if 'current_import_id' in request.session else []
+ ids = (
+ request.session["current_import_id"]
+ if "current_import_id" in request.session
+ else []
+ )
ids.append(self.pk)
- put_session_var(session_key, 'current_import_id', ids)
+ put_session_var(session_key, "current_import_id", ids)
if importer.match_table:
match_file = filename + "_match_%s.csv" % now
self.match_file.save(
- match_file,
- ContentFile(importer.get_csv_matches().encode('utf-8'))
+ match_file, ContentFile(importer.get_csv_matches().encode("utf-8"))
)
self.end_date = datetime.datetime.now()
self.save()
@@ -1382,7 +1542,7 @@ class Import(models.Model):
return
with tempfile.TemporaryDirectory() as tmp_dir_name:
# extract the current archive
- current_zip = zipfile.ZipFile(self.archive_file.path, 'r')
+ current_zip = zipfile.ZipFile(self.archive_file.path, "r")
name_list = current_zip.namelist()
if "content.json" not in name_list:
return
@@ -1402,19 +1562,19 @@ class Import(models.Model):
with open(full_filename, "rb") as raw_file:
getattr(self, attr).save(
"upload/imports/{}/{:02d}/{}".format(
- today.year, today.month, filename),
- File(raw_file)
+ today.year, today.month, filename
+ ),
+ File(raw_file),
)
os.remove(self.archive_file.path)
- setattr(self, 'archive_file', None)
+ setattr(self, "archive_file", None)
self.state = "FE" if self.error_file else "F"
self.save()
return True
def _archive(self):
- file_attr = ["imported_file", "error_file", "result_file",
- "match_file"]
+ file_attr = ["imported_file", "error_file", "result_file", "match_file"]
files = [
(k, getattr(self, k).path, getattr(self, k).name.split(os.sep)[-1])
for k in file_attr
@@ -1438,11 +1598,15 @@ class Import(models.Model):
current_zip.write(content_name, arcname="content.json")
today = datetime.date.today()
- with open(archive_name, "rb", ) as raw_file:
+ with open(
+ archive_name,
+ "rb",
+ ) as raw_file:
self.archive_file.save(
"upload/imports/{}/{:02d}/{}".format(
- today.year, today.month, base_name),
- File(raw_file)
+ today.year, today.month, base_name
+ ),
+ File(raw_file),
)
IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")
profile = IshtarSiteProfile.get_current_profile()
@@ -1460,7 +1624,7 @@ class Import(models.Model):
self._archive_pending = False
def archive(self):
- self.state = 'AC'
+ self.state = "AC"
self.end_date = datetime.datetime.now()
self._archive()
@@ -1473,20 +1637,22 @@ class Import(models.Model):
imported = []
for related, zorg in get_all_related_m2m_objects_with_model(self):
accessor = related.get_accessor_name()
- imported += [(accessor, obj)
- for obj in getattr(self, accessor).all()]
+ imported += [(accessor, obj) for obj in getattr(self, accessor).all()]
return imported
def save(self, *args, **kwargs):
super(Import, self).save(*args, **kwargs)
- if self.state == "AC" and not getattr(
- self, "_archive_pending", False) and not self.archive_file:
+ if (
+ self.state == "AC"
+ and not getattr(self, "_archive_pending", False)
+ and not self.archive_file
+ ):
self._archive()
def pre_delete_import(sender, **kwargs):
# deleted imported items when an import is delete
- instance = kwargs.get('instance')
+ instance = kwargs.get("instance")
if not instance:
return
to_delete = []