diff options
| author | Étienne Loks <etienne.loks@iggdrasil.net> | 2026-02-24 15:16:56 +0100 |
|---|---|---|
| committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2026-02-25 10:15:33 +0100 |
| commit | 53b408380649f39880763b35a2a9d24eb7cc014a (patch) | |
| tree | 4a932ebc46229239d7bcb9e21f36ce84efa969c9 | |
| parent | 13e838988e0a32e0ef54c15d90c8df2f92dad853 (diff) | |
| download | Ishtar-53b408380649f39880763b35a2a9d24eb7cc014a.tar.bz2 Ishtar-53b408380649f39880763b35a2a9d24eb7cc014a.zip | |
♻️ imports: simplify, refactor and clean
| -rw-r--r-- | archaeological_operations/tests.py | 14 | ||||
| -rw-r--r-- | ishtar_common/data_importer.py | 320 | ||||
| -rw-r--r-- | ishtar_common/management/commands/process_initialize_item_keys.py | 18 | ||||
| -rw-r--r-- | ishtar_common/models_imports.py | 8 | ||||
| -rw-r--r-- | ishtar_common/tests.py | 26 |
5 files changed, 117 insertions, 269 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index 3621469ef..b239b789d 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -182,6 +182,17 @@ class ImportTest(BaseImportTest): tg.save() def init_ope_import(self, filename="MCC-operations-example.csv", sep=","): + add = ( + ("fouille-programmée", models.OperationType, "prog_excavation"), + # ("age-du-fer", models.Period, "iron-age"), + ) + add = [] + for key, model, txt_idx in add: + ItemKey.objects.get_or_create( + key=key, + content_type=ContentType.objects.get_for_model(model), + object_id=model.objects.get(txt_idx=txt_idx).pk + ) mcc_operation = ImporterType.objects.get(name="MCC - Opérations") with open( settings.LIB_BASE_PATH + "archaeological_operations/tests/" + filename, @@ -226,7 +237,8 @@ class ImportTest(BaseImportTest): target.save() # target for all users - tgs = list(TargetKey.objects.filter(key="gallo-romain").all()) + q = TargetKey.objects.filter(key="gallo-romain") + tgs = list(q.all()) for tg in tgs[1:]: tg.delete() target2 = tgs[0] diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py index 8e0671cd8..f3eb79a30 100644 --- a/ishtar_common/data_importer.py +++ b/ishtar_common/data_importer.py @@ -149,7 +149,7 @@ class ImportFormater: return def init( - self, vals, output=None, choose_default=False, import_instance=None, user=None + self, vals, import_instance=None, user=None ): try: lst = iter(self.formater) @@ -159,15 +159,13 @@ class ImportFormater: if formater: formater.check( vals, - output, self.comment, - choose_default=choose_default, import_instance=import_instance, user=user, ) def post_process(self, obj, context, value, owner=None): - raise NotImplemented() + raise NotImplementedError() class ImporterError(Exception): @@ -199,9 +197,7 @@ class Formater: def check( self, values, - output=None, comment="", - choose_default=False, import_instance=None, user=None, ): @@ -237,88 +233,42 @@ class Formater: return q -class ChoiceChecker(object): - def report_new(self, comment): - if not self.new_keys: - return - msg = 'For "%s" these new associations have been made:\n' % comment - sys.stderr.write(msg.encode("utf-8")) - for k in self.new_keys: - msg = '"%s";"%s"\n' % (k, self.new_keys[k]) - sys.stderr.write(msg.encode("utf-8")) - - class UnicodeFormater(Formater): def __init__( self, + import_instance, + db_target, max_length=None, - clean=False, - re_filter=None, - notnull=False, - prefix="", - db_target=None, - import_instance=None, - many_split=None, ): self.max_length = max_length self.db_target = db_target - self.clean = clean - self.re_filter = re_filter - self.notnull = notnull - self.prefix = prefix self.import_instance = import_instance - self.many_split = many_split def format(self, value): try: - if type(value) != str: - value = str(value.strip()) + if not isinstance(value, str): + value = str(value) vals = [] for v in value.split("\n"): v = v.strip() if v: vals.append(v) value = "\n".join(vals) - if self.re_filter: - m = self.re_filter.match(value) - if m: - value = "".join(m.groups()) - if self.clean: - if value.startswith(","): - value = value[1:] - if value.endswith(","): - value = value[:-1] - value = value.replace(", , ", ", ") except UnicodeDecodeError: return if self.max_length and len(value) > self.max_length: raise ValueError( - _( + str(_( '"%(value)s" is too long. The max length is %(length)d ' "characters." - ) - % {"value": value, "length": self.max_length} + )) % {"value": value, "length": self.max_length} ) - if self.notnull and not value: - return - if value: - value = self.prefix + value return value def __str__(self): return f"{self.__class__.__name__}|{self.max_length or 0}" -class BooleanFormater(Formater): - def format(self, value): - value = value.strip().upper() - if value in ("1", "OUI", "VRAI", "YES", "TRUE"): - return True - if value in ("", "0", "NON", "FAUX", "NO", "FALSE"): - return False - raise ValueError(_('"%(value)s" not equal to yes or no') % {"value": value}) - - class FloatFormater(Formater): def format(self, value): value = value.strip().replace(",", ".") @@ -425,53 +375,35 @@ class IntegerFormater(Formater): raise ValueError(_('"%(value)s" is not an integer') % {"value": value}) -class StrChoiceFormater(Formater, ChoiceChecker): +class TypeFormater(Formater): def __init__( self, - choices, - strict=False, - equiv_dict=None, - model=None, - cli=False, - many_split="", - db_target=None, - import_instance=None, + model, + import_instance, + db_target, + many_split=False, ): - if not equiv_dict: - equiv_dict = {} - self.choices = list(choices) - self.strict = strict - self.equiv_dict = copy.deepcopy(equiv_dict) - self.cli = cli + defaults = {} self.model = model + self.defaults = defaults + self.many_split = many_split self.db_target = db_target - self.create = False self.missings = set() - self.new_keys = {} self.match_table = {} - self.many_split = many_split - self.import_instance = None - for key, value in self.choices: - value = str(value) - if not self.strict: - value = slugify(value) - if value not in self.equiv_dict: - v = key - if model and v: - v = model.objects.get(pk=v) - self.equiv_dict[value] = v + self.equiv_dict, self.choices = {}, [] + self.import_instance = import_instance + if import_instance: + for item in model.objects.all(): + self.choices.append((item.pk, str(item))) + for key in item.get_keys(current_import=import_instance): + self.equiv_dict[key] = item self.init_db_target() def init_db_target(self, user=None): - if not self.db_target: - return - q = self.get_db_target_query(user) - for target_key in list(q.all()): key = target_key.key - if not self.strict: - key = slugify(key) + key = slugify(key, allow_unicode=True) if key in self.equiv_dict: continue v = target_key.value @@ -486,114 +418,60 @@ class StrChoiceFormater(Formater, ChoiceChecker): self.equiv_dict[key] = v def prepare(self, value): - return str(value).strip() + return slugify(str(value).strip(), allow_unicode=True) - def _get_choices(self, comment=""): - msgstr = comment + " - " - msgstr += str( - _('Choice for "%s" is not available. ' "Which one is relevant?\n") - ) - idx = -1 - for idx, choice in enumerate(self.choices): - msgstr += "%d. %s\n" % (idx + 1, choice[1]) - idx += 2 - if self.create: - msgstr += str(_("%d. None of the above - create new")) % idx + "\n" - idx += 1 - msgstr += str(_("%d. None of the above - skip")) % idx + "\n" - return msgstr, idx + def add_key(self, obj, value, ishtar_import=None): + obj.add_key(slugify(value), force=True, ishtar_import=ishtar_import) + + def format(self, value): + origin_value = value + value = self.prepare(value) + if value in self.equiv_dict: + self.match_table[origin_value] = self.equiv_dict[value] or "" + return self.equiv_dict[value] def check( self, values, - output=None, comment="", - choose_default=False, import_instance=None, user=None, ): self.init_db_target(user) - if (not output or output == "silent") and not choose_default: - return if self.many_split: new_values = [] r = re.compile(self.many_split) for value in values: new_values += r.split(value) values = new_values + TargetKey = apps.get_model("ishtar_common", "TargetKey") for value in set(values): value = self.prepare(value) if value in self.equiv_dict: continue self.missings.add(value) - if output == "db" and self.db_target: - for missing in self.missings: - q = {"target": self.db_target, "key": missing} - query = TargetKey.objects.filter(**q) - query_clean = query.filter( - associated_import__isnull=True, - associated_user__isnull=True, - associated_group__isnull=True, - is_set=False - ) - if query_clean.count(): # bad keys for this target - query_clean.delete() - query = query.filter(self._base_target_filter(user)) - if query.count(): - continue - with transaction.atomic(): - q["associated_import"] = import_instance - try: - TargetKey.objects.create(**q) - except IntegrityError: - pass - - def new(self, value): - return - - def add_key(self, obj, value, ishtar_import=None): - return - - def format(self, value): - origin_value = value - value = self.prepare(value) - if not self.strict: - value = slugify(value) - if value in self.equiv_dict: - self.match_table[origin_value] = self.equiv_dict[value] or "" - return self.equiv_dict[value] - - -class TypeFormater(StrChoiceFormater): - def __init__( - self, - model, - cli=False, - defaults=None, - many_split=False, - db_target=None, - import_instance=None, - ): - if not defaults: - defaults = {} - self.create = True - self.strict = False - self.model = model - self.defaults = defaults - self.many_split = many_split - self.db_target = db_target - self.missings = set() - self.equiv_dict, self.choices = {}, [] - self.match_table = {} - self.new_keys = {} - self.import_instance = import_instance - if self.import_instance: - for item in model.objects.all(): - self.choices.append((item.pk, str(item))) - for key in item.get_keys(current_import=import_instance): - self.equiv_dict[key] = item + for missing in self.missings: + q = {"target": self.db_target, "key": missing} + query = TargetKey.objects.filter(**q) + query_clean = query.filter( + associated_import__isnull=True, + associated_user__isnull=True, + associated_group__isnull=True, + is_set=False + ) + if query_clean.count(): # bad keys for this target + query_clean.delete() + query = query.filter(self._base_target_filter(user)) + if query.count(): + continue + with transaction.atomic(): + q["associated_import"] = import_instance + try: + TargetKey.objects.create(**q) + except IntegrityError: + pass @property def slug(self): @@ -605,24 +483,6 @@ class TypeFormater(StrChoiceFormater): slug += f"|{self.many_split}" return slug - def prepare(self, value): - return slugify(str(value).strip(), allow_unicode=True) - - def add_key(self, obj, value, ishtar_import=None): - obj.add_key(slugify(value), force=True, ishtar_import=ishtar_import) - - def new(self, value): - values = copy.copy(self.defaults) - values["label"] = value - values["txt_idx"] = slugify(value) - if "order" in get_all_field_names(self.model): - order = 1 - q = self.model.objects.values("order").order_by("-order") - if q.count(): - order = q.all()[0]["order"] or 1 - values["order"] = order - return self.model.objects.create(**values) - class DateFormater(Formater): def __init__(self, date_formats=None, db_target=None, import_instance=None): @@ -699,30 +559,21 @@ class FileFormater(Formater): ) -class StrToBoolean(Formater, ChoiceChecker): +class StrToBoolean(Formater): def __init__( self, + import_instance, + db_target, choices=None, - cli=False, - strict=False, - db_target=None, - import_instance=None, ): - if not choices: - choices = {} - self.dct = copy.copy(choices) - self.cli = cli - self.strict = strict + self.dct = {} self.db_target = db_target self.missings = set() self.match_table = {} - self.new_keys = {} self.import_instance = import_instance self.init_db_target() def init_db_target(self, user=None): - if not self.db_target: - return q = self.get_db_target_query(user) for target_key in q.all(): key = self.prepare(target_key.key) @@ -733,40 +584,34 @@ class StrToBoolean(Formater, ChoiceChecker): def prepare(self, value): value = str(value).strip() - if not self.strict: - value = slugify(value, allow_unicode=True) + value = slugify(value, allow_unicode=True) return value def check( self, values, - output=None, comment="", - choose_default=False, import_instance=None, user=None, ): - if (not output or output == "silent") and not choose_default: - return for value in values: value = self.prepare(value) if value in self.dct: continue self.missings.add(value) - if output == "db" and self.db_target: - TargetKey = apps.get_model("ishtar_common", "TargetKey") - for missing in self.missings: - try: - q = { - "target": self.db_target, - "key": missing, - "associated_import": import_instance, - } - if not TargetKey.objects.filter(**q).count(): - TargetKey.objects.create(**q) - except IntegrityError: - pass + TargetKey = apps.get_model("ishtar_common", "TargetKey") + for missing in self.missings: + try: + q = { + "target": self.db_target, + "key": missing, + "associated_import": import_instance, + } + if not TargetKey.objects.filter(**q).count(): + TargetKey.objects.create(**q) + except IntegrityError: + pass def format(self, value): origin_value = value @@ -788,7 +633,7 @@ def get_object_from_path(obj, path): return obj -class Importer(object): +class Importer: SLUG = "" NAME = "" DESC = "" @@ -848,7 +693,6 @@ class Importer(object): check_col_num=False, test=False, history_modifier=None, - output="silent", import_instance=None, conservative_import=False, ): @@ -894,7 +738,6 @@ class Importer(object): self._defaults = self.DEFAULTS.copy() self._pre_import_values = self.PRE_IMPORT_VALUES.copy() self.history_modifier = history_modifier - self.output = output self.debug = [] if not self.history_modifier: if self.import_instance and self.import_instance.user: @@ -1009,18 +852,10 @@ class Importer(object): except cls.DoesNotExist: pass - def initialize(self, table, output="silent", choose_default=False, user=None): + def initialize(self, table, user=None): """ copy vals in columns and initialize formaters - * output: - - silent: no associations - - cli: output by command line interface and stocked in the database - - db: output on the database with no interactive association - (further exploitation by web interface) - - user: associated user """ - if output not in ("silent", "cli", "db"): - raise ValueError("initialize called with a bad output option") vals = [] for idx_line, line in enumerate(table): if self.skip_lines > idx_line: @@ -1047,8 +882,6 @@ class Importer(object): formater.init( vals[idx], - output, - choose_default=choose_default, import_instance=self.import_instance, user=user, ) @@ -1060,16 +893,13 @@ class Importer(object): self, table, initialize=True, - choose_default=False, user=None, line_to_process=None, simulate=False, verbose=False ): if initialize: - self.initialize( - table, self.output, choose_default=choose_default, user=user - ) + self.initialize(table, user=user) self.simulate = simulate self.line_to_process = line_to_process return self._importation(table, verbose=verbose) diff --git a/ishtar_common/management/commands/process_initialize_item_keys.py b/ishtar_common/management/commands/process_initialize_item_keys.py index 4ccaadc0a..e493066f1 100644 --- a/ishtar_common/management/commands/process_initialize_item_keys.py +++ b/ishtar_common/management/commands/process_initialize_item_keys.py @@ -16,12 +16,13 @@ from ishtar_common.utils import get_log_time, get_percent, get_eta, BColors def write_output(base_lbl, idx, total, ref_time): lbl = f"\r{BColors.OKBLUE}[{get_percent(idx, total)}] {base_lbl} {idx + 1}/{total}" - lbl += f" ({get_eta(idx, total, ref_time, datetime.datetime.now())} left){BColors.ENDC}" + lbl += f" ({get_eta(idx, total, ref_time, datetime.datetime.now())} left)" + lbl += "{BColors.ENDC}" sys.stdout.write(lbl) sys.stdout.flush() -def migrate_item_key(clean_old=False): +def migrate_item_key(clean_old=False, quiet=False): """ clean_old=False: set to True to migrate from non unicode to unicode """ @@ -34,13 +35,14 @@ def migrate_item_key(clean_old=False): if any(1 for attr in ("available", "txt_idx", "comment", "available") if not hasattr(model, attr)): continue # not a general type - content_type = ContentType.objects.get(app_label=app, - model=model._meta.model_name) + content_type, __ = ContentType.objects.get_or_create( + app_label=app, model=model._meta.model_name) ref_time = datetime.datetime.now() q = model.objects nb = q.count() for idx, item in enumerate(q.all()): - write_output(model._meta.verbose_name, idx, nb, ref_time) + if not quiet: + write_output(model._meta.verbose_name, idx, nb, ref_time) if clean_old: ItemKey.objects.filter( key=slugify(item.label), @@ -52,7 +54,8 @@ def migrate_item_key(clean_old=False): importer_type=None, ishtar_import=None, user=None, group=None) lbl = f"\r{BColors.OKGREEN}* {model._meta.verbose_name} - OK{SPACE}{BColors.ENDC}\n" - sys.stdout.write(lbl) + if not quiet: + sys.stdout.write(lbl) class Command(BaseCommand): @@ -73,5 +76,4 @@ class Command(BaseCommand): if not quiet: sys.stdout.write(f"{BColors.OKGREEN}[{get_log_time()}] Processing{BColors.ENDC}") settings.USE_BACKGROUND_TASK = False - migrate_item_key(clean_old=options["clean_old"]) - sys.exit(1) + migrate_item_key(clean_old=options["clean_old"], quiet=quiet) diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 6db0e1d1d..e576d18de 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -1419,12 +1419,12 @@ class FormaterType(models.Model): "is not in valid.".format(self.options) ) return - return TypeFormater(model, **kwargs) elif self.formater_type == "UnicodeFormater": if self.options: try: - return UnicodeFormater(int(self.options.strip()), **kwargs) + kwargs["max_length"] = int(self.options.strip()) + return UnicodeFormater(**kwargs) except ValueError: pass return UnicodeFormater(**kwargs) @@ -2799,9 +2799,7 @@ class Import(BaseImport): self.user = user self.save() try: - self.get_importer_instance().initialize( - self.data_table, user=user, output="db" - ) + self.get_importer_instance().initialize(self.data_table, user=user) except ImporterError as e: if session_key: put_session_message(session_key, e.msg, "danger") diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 304353efc..022783f64 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -54,11 +54,11 @@ from django.db import transaction from django.db.models.fields import BooleanField from django.db.models.fields.related import ForeignKey from django.db.utils import IntegrityError -from django.template.defaultfilters import slugify from django.test import TestCase as BaseTestCase from django.test.client import Client from django.test.runner import DiscoverRunner from django.utils import timezone +from django.utils.text import slugify from django.utils.translation import gettext_lazy as _ from django.urls import reverse @@ -3358,7 +3358,8 @@ class ImportTest(BaseImportTest): ot = models.OrganizationType.objects.create(label=label) self.assertEqual( models.ItemKey.objects.filter( - object_id=ot.pk, key=slugify(label), content_type=content_type + object_id=ot.pk, key=slugify(label, allow_unicode=True), + content_type=content_type ).count(), 1, ) @@ -3366,24 +3367,26 @@ class ImportTest(BaseImportTest): ot_2 = models.OrganizationType.objects.create(label=label_2) self.assertEqual( models.ItemKey.objects.filter( - object_id=ot_2.pk, key=slugify(label_2), content_type=content_type + object_id=ot_2.pk, key=slugify(label_2, allow_unicode=True), + content_type=content_type ).count(), 1, ) # replace key - ot_2.add_key(slugify(label), force=True) + ot_2.add_key(slugify(label, allow_unicode=True), force=True) # one key point to only one item self.assertEqual( models.ItemKey.objects.filter( - key=slugify(label), content_type=content_type + key=slugify(label, allow_unicode=True), content_type=content_type ).count(), 1, ) # this key point to the right item self.assertEqual( models.ItemKey.objects.filter( - object_id=ot_2.pk, key=slugify(label), content_type=content_type + object_id=ot_2.pk, key=slugify(label, allow_unicode=True), + content_type=content_type ).count(), 1, ) @@ -3391,12 +3394,13 @@ class ImportTest(BaseImportTest): # modification label_3 = "Yop" ot_2.label = label_3 - ot_2.txt_idx = slugify(label_3) + ot_2.txt_idx = slugify(label_3, allow_unicode=True) ot_2.save() # old label not referenced anymore self.assertEqual( models.ItemKey.objects.filter( - object_id=ot_2.pk, key=slugify(label_2), content_type=content_type + object_id=ot_2.pk, key=slugify(label_2, allow_unicode=True), + content_type=content_type ).count(), 0, ) @@ -3404,13 +3408,15 @@ class ImportTest(BaseImportTest): # new key is here self.assertEqual( models.ItemKey.objects.filter( - object_id=ot_2.pk, key=slugify(label), content_type=content_type + object_id=ot_2.pk, key=slugify(label, allow_unicode=True), + content_type=content_type ).count(), 1, ) self.assertEqual( models.ItemKey.objects.filter( - object_id=ot_2.pk, key=slugify(label_3), content_type=content_type + object_id=ot_2.pk, key=slugify(label_3, allow_unicode=True), + content_type=content_type ).count(), 1, ) |
