summaryrefslogtreecommitdiff
path: root/ishtar_common/data_importer.py
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2026-02-24 15:16:56 +0100
committerÉtienne Loks <etienne.loks@iggdrasil.net>2026-02-25 10:15:33 +0100
commit53b408380649f39880763b35a2a9d24eb7cc014a (patch)
tree4a932ebc46229239d7bcb9e21f36ce84efa969c9 /ishtar_common/data_importer.py
parent13e838988e0a32e0ef54c15d90c8df2f92dad853 (diff)
downloadIshtar-53b408380649f39880763b35a2a9d24eb7cc014a.tar.bz2
Ishtar-53b408380649f39880763b35a2a9d24eb7cc014a.zip
♻️ imports: simplify, refactor and clean
Diffstat (limited to 'ishtar_common/data_importer.py')
-rw-r--r--ishtar_common/data_importer.py320
1 files changed, 75 insertions, 245 deletions
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py
index 8e0671cd8..f3eb79a30 100644
--- a/ishtar_common/data_importer.py
+++ b/ishtar_common/data_importer.py
@@ -149,7 +149,7 @@ class ImportFormater:
return
def init(
- self, vals, output=None, choose_default=False, import_instance=None, user=None
+ self, vals, import_instance=None, user=None
):
try:
lst = iter(self.formater)
@@ -159,15 +159,13 @@ class ImportFormater:
if formater:
formater.check(
vals,
- output,
self.comment,
- choose_default=choose_default,
import_instance=import_instance,
user=user,
)
def post_process(self, obj, context, value, owner=None):
- raise NotImplemented()
+ raise NotImplementedError()
class ImporterError(Exception):
@@ -199,9 +197,7 @@ class Formater:
def check(
self,
values,
- output=None,
comment="",
- choose_default=False,
import_instance=None,
user=None,
):
@@ -237,88 +233,42 @@ class Formater:
return q
-class ChoiceChecker(object):
- def report_new(self, comment):
- if not self.new_keys:
- return
- msg = 'For "%s" these new associations have been made:\n' % comment
- sys.stderr.write(msg.encode("utf-8"))
- for k in self.new_keys:
- msg = '"%s";"%s"\n' % (k, self.new_keys[k])
- sys.stderr.write(msg.encode("utf-8"))
-
-
class UnicodeFormater(Formater):
def __init__(
self,
+ import_instance,
+ db_target,
max_length=None,
- clean=False,
- re_filter=None,
- notnull=False,
- prefix="",
- db_target=None,
- import_instance=None,
- many_split=None,
):
self.max_length = max_length
self.db_target = db_target
- self.clean = clean
- self.re_filter = re_filter
- self.notnull = notnull
- self.prefix = prefix
self.import_instance = import_instance
- self.many_split = many_split
def format(self, value):
try:
- if type(value) != str:
- value = str(value.strip())
+ if not isinstance(value, str):
+ value = str(value)
vals = []
for v in value.split("\n"):
v = v.strip()
if v:
vals.append(v)
value = "\n".join(vals)
- if self.re_filter:
- m = self.re_filter.match(value)
- if m:
- value = "".join(m.groups())
- if self.clean:
- if value.startswith(","):
- value = value[1:]
- if value.endswith(","):
- value = value[:-1]
- value = value.replace(", , ", ", ")
except UnicodeDecodeError:
return
if self.max_length and len(value) > self.max_length:
raise ValueError(
- _(
+ str(_(
'"%(value)s" is too long. The max length is %(length)d '
"characters."
- )
- % {"value": value, "length": self.max_length}
+ )) % {"value": value, "length": self.max_length}
)
- if self.notnull and not value:
- return
- if value:
- value = self.prefix + value
return value
def __str__(self):
return f"{self.__class__.__name__}|{self.max_length or 0}"
-class BooleanFormater(Formater):
- def format(self, value):
- value = value.strip().upper()
- if value in ("1", "OUI", "VRAI", "YES", "TRUE"):
- return True
- if value in ("", "0", "NON", "FAUX", "NO", "FALSE"):
- return False
- raise ValueError(_('"%(value)s" not equal to yes or no') % {"value": value})
-
-
class FloatFormater(Formater):
def format(self, value):
value = value.strip().replace(",", ".")
@@ -425,53 +375,35 @@ class IntegerFormater(Formater):
raise ValueError(_('"%(value)s" is not an integer') % {"value": value})
-class StrChoiceFormater(Formater, ChoiceChecker):
+class TypeFormater(Formater):
def __init__(
self,
- choices,
- strict=False,
- equiv_dict=None,
- model=None,
- cli=False,
- many_split="",
- db_target=None,
- import_instance=None,
+ model,
+ import_instance,
+ db_target,
+ many_split=False,
):
- if not equiv_dict:
- equiv_dict = {}
- self.choices = list(choices)
- self.strict = strict
- self.equiv_dict = copy.deepcopy(equiv_dict)
- self.cli = cli
+ defaults = {}
self.model = model
+ self.defaults = defaults
+ self.many_split = many_split
self.db_target = db_target
- self.create = False
self.missings = set()
- self.new_keys = {}
self.match_table = {}
- self.many_split = many_split
- self.import_instance = None
- for key, value in self.choices:
- value = str(value)
- if not self.strict:
- value = slugify(value)
- if value not in self.equiv_dict:
- v = key
- if model and v:
- v = model.objects.get(pk=v)
- self.equiv_dict[value] = v
+ self.equiv_dict, self.choices = {}, []
+ self.import_instance = import_instance
+ if import_instance:
+ for item in model.objects.all():
+ self.choices.append((item.pk, str(item)))
+ for key in item.get_keys(current_import=import_instance):
+ self.equiv_dict[key] = item
self.init_db_target()
def init_db_target(self, user=None):
- if not self.db_target:
- return
-
q = self.get_db_target_query(user)
-
for target_key in list(q.all()):
key = target_key.key
- if not self.strict:
- key = slugify(key)
+ key = slugify(key, allow_unicode=True)
if key in self.equiv_dict:
continue
v = target_key.value
@@ -486,114 +418,60 @@ class StrChoiceFormater(Formater, ChoiceChecker):
self.equiv_dict[key] = v
def prepare(self, value):
- return str(value).strip()
+ return slugify(str(value).strip(), allow_unicode=True)
- def _get_choices(self, comment=""):
- msgstr = comment + " - "
- msgstr += str(
- _('Choice for "%s" is not available. ' "Which one is relevant?\n")
- )
- idx = -1
- for idx, choice in enumerate(self.choices):
- msgstr += "%d. %s\n" % (idx + 1, choice[1])
- idx += 2
- if self.create:
- msgstr += str(_("%d. None of the above - create new")) % idx + "\n"
- idx += 1
- msgstr += str(_("%d. None of the above - skip")) % idx + "\n"
- return msgstr, idx
+ def add_key(self, obj, value, ishtar_import=None):
+ obj.add_key(slugify(value), force=True, ishtar_import=ishtar_import)
+
+ def format(self, value):
+ origin_value = value
+ value = self.prepare(value)
+ if value in self.equiv_dict:
+ self.match_table[origin_value] = self.equiv_dict[value] or ""
+ return self.equiv_dict[value]
def check(
self,
values,
- output=None,
comment="",
- choose_default=False,
import_instance=None,
user=None,
):
self.init_db_target(user)
- if (not output or output == "silent") and not choose_default:
- return
if self.many_split:
new_values = []
r = re.compile(self.many_split)
for value in values:
new_values += r.split(value)
values = new_values
+
TargetKey = apps.get_model("ishtar_common", "TargetKey")
for value in set(values):
value = self.prepare(value)
if value in self.equiv_dict:
continue
self.missings.add(value)
- if output == "db" and self.db_target:
- for missing in self.missings:
- q = {"target": self.db_target, "key": missing}
- query = TargetKey.objects.filter(**q)
- query_clean = query.filter(
- associated_import__isnull=True,
- associated_user__isnull=True,
- associated_group__isnull=True,
- is_set=False
- )
- if query_clean.count(): # bad keys for this target
- query_clean.delete()
- query = query.filter(self._base_target_filter(user))
- if query.count():
- continue
- with transaction.atomic():
- q["associated_import"] = import_instance
- try:
- TargetKey.objects.create(**q)
- except IntegrityError:
- pass
-
- def new(self, value):
- return
-
- def add_key(self, obj, value, ishtar_import=None):
- return
-
- def format(self, value):
- origin_value = value
- value = self.prepare(value)
- if not self.strict:
- value = slugify(value)
- if value in self.equiv_dict:
- self.match_table[origin_value] = self.equiv_dict[value] or ""
- return self.equiv_dict[value]
-
-
-class TypeFormater(StrChoiceFormater):
- def __init__(
- self,
- model,
- cli=False,
- defaults=None,
- many_split=False,
- db_target=None,
- import_instance=None,
- ):
- if not defaults:
- defaults = {}
- self.create = True
- self.strict = False
- self.model = model
- self.defaults = defaults
- self.many_split = many_split
- self.db_target = db_target
- self.missings = set()
- self.equiv_dict, self.choices = {}, []
- self.match_table = {}
- self.new_keys = {}
- self.import_instance = import_instance
- if self.import_instance:
- for item in model.objects.all():
- self.choices.append((item.pk, str(item)))
- for key in item.get_keys(current_import=import_instance):
- self.equiv_dict[key] = item
+ for missing in self.missings:
+ q = {"target": self.db_target, "key": missing}
+ query = TargetKey.objects.filter(**q)
+ query_clean = query.filter(
+ associated_import__isnull=True,
+ associated_user__isnull=True,
+ associated_group__isnull=True,
+ is_set=False
+ )
+ if query_clean.count(): # bad keys for this target
+ query_clean.delete()
+ query = query.filter(self._base_target_filter(user))
+ if query.count():
+ continue
+ with transaction.atomic():
+ q["associated_import"] = import_instance
+ try:
+ TargetKey.objects.create(**q)
+ except IntegrityError:
+ pass
@property
def slug(self):
@@ -605,24 +483,6 @@ class TypeFormater(StrChoiceFormater):
slug += f"|{self.many_split}"
return slug
- def prepare(self, value):
- return slugify(str(value).strip(), allow_unicode=True)
-
- def add_key(self, obj, value, ishtar_import=None):
- obj.add_key(slugify(value), force=True, ishtar_import=ishtar_import)
-
- def new(self, value):
- values = copy.copy(self.defaults)
- values["label"] = value
- values["txt_idx"] = slugify(value)
- if "order" in get_all_field_names(self.model):
- order = 1
- q = self.model.objects.values("order").order_by("-order")
- if q.count():
- order = q.all()[0]["order"] or 1
- values["order"] = order
- return self.model.objects.create(**values)
-
class DateFormater(Formater):
def __init__(self, date_formats=None, db_target=None, import_instance=None):
@@ -699,30 +559,21 @@ class FileFormater(Formater):
)
-class StrToBoolean(Formater, ChoiceChecker):
+class StrToBoolean(Formater):
def __init__(
self,
+ import_instance,
+ db_target,
choices=None,
- cli=False,
- strict=False,
- db_target=None,
- import_instance=None,
):
- if not choices:
- choices = {}
- self.dct = copy.copy(choices)
- self.cli = cli
- self.strict = strict
+ self.dct = {}
self.db_target = db_target
self.missings = set()
self.match_table = {}
- self.new_keys = {}
self.import_instance = import_instance
self.init_db_target()
def init_db_target(self, user=None):
- if not self.db_target:
- return
q = self.get_db_target_query(user)
for target_key in q.all():
key = self.prepare(target_key.key)
@@ -733,40 +584,34 @@ class StrToBoolean(Formater, ChoiceChecker):
def prepare(self, value):
value = str(value).strip()
- if not self.strict:
- value = slugify(value, allow_unicode=True)
+ value = slugify(value, allow_unicode=True)
return value
def check(
self,
values,
- output=None,
comment="",
- choose_default=False,
import_instance=None,
user=None,
):
- if (not output or output == "silent") and not choose_default:
- return
for value in values:
value = self.prepare(value)
if value in self.dct:
continue
self.missings.add(value)
- if output == "db" and self.db_target:
- TargetKey = apps.get_model("ishtar_common", "TargetKey")
- for missing in self.missings:
- try:
- q = {
- "target": self.db_target,
- "key": missing,
- "associated_import": import_instance,
- }
- if not TargetKey.objects.filter(**q).count():
- TargetKey.objects.create(**q)
- except IntegrityError:
- pass
+ TargetKey = apps.get_model("ishtar_common", "TargetKey")
+ for missing in self.missings:
+ try:
+ q = {
+ "target": self.db_target,
+ "key": missing,
+ "associated_import": import_instance,
+ }
+ if not TargetKey.objects.filter(**q).count():
+ TargetKey.objects.create(**q)
+ except IntegrityError:
+ pass
def format(self, value):
origin_value = value
@@ -788,7 +633,7 @@ def get_object_from_path(obj, path):
return obj
-class Importer(object):
+class Importer:
SLUG = ""
NAME = ""
DESC = ""
@@ -848,7 +693,6 @@ class Importer(object):
check_col_num=False,
test=False,
history_modifier=None,
- output="silent",
import_instance=None,
conservative_import=False,
):
@@ -894,7 +738,6 @@ class Importer(object):
self._defaults = self.DEFAULTS.copy()
self._pre_import_values = self.PRE_IMPORT_VALUES.copy()
self.history_modifier = history_modifier
- self.output = output
self.debug = []
if not self.history_modifier:
if self.import_instance and self.import_instance.user:
@@ -1009,18 +852,10 @@ class Importer(object):
except cls.DoesNotExist:
pass
- def initialize(self, table, output="silent", choose_default=False, user=None):
+ def initialize(self, table, user=None):
"""
copy vals in columns and initialize formaters
- * output:
- - silent: no associations
- - cli: output by command line interface and stocked in the database
- - db: output on the database with no interactive association
- (further exploitation by web interface)
- - user: associated user
"""
- if output not in ("silent", "cli", "db"):
- raise ValueError("initialize called with a bad output option")
vals = []
for idx_line, line in enumerate(table):
if self.skip_lines > idx_line:
@@ -1047,8 +882,6 @@ class Importer(object):
formater.init(
vals[idx],
- output,
- choose_default=choose_default,
import_instance=self.import_instance,
user=user,
)
@@ -1060,16 +893,13 @@ class Importer(object):
self,
table,
initialize=True,
- choose_default=False,
user=None,
line_to_process=None,
simulate=False,
verbose=False
):
if initialize:
- self.initialize(
- table, self.output, choose_default=choose_default, user=user
- )
+ self.initialize(table, user=user)
self.simulate = simulate
self.line_to_process = line_to_process
return self._importation(table, verbose=verbose)