#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (C) 2013-2025 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

from collections.abc import Iterable
import copy
import datetime
import os
import logging
import re
import sys
import zipfile

from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.geos.error import GEOSException
from django.core.exceptions import (
    FieldDoesNotExist,
    FieldError,
    MultipleObjectsReturned,
)
from django.core.files import File
from django.db import IntegrityError, DatabaseError, transaction
from django.db.models import Q
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _

from ishtar_common.utils import (
    BColors,
    get_all_field_names,
    get_current_profile,
    get_file_from_link,
    IMPORT_GEOMETRY,
    update_data,
)

NEW_LINE_BREAK = "#####@@@#####"


def post_importer_action(func):
    def wrapper(self, context, value):
        return func(self, context, value)

    wrapper.importer_trigger = "post"
    return wrapper


def pre_importer_action(func):
    def wrapper(self, context, value):
        return func(self, context, value)

    wrapper.importer_trigger = "pre"
    return wrapper
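

# Illustrative sketch (not part of this module): the decorators above only
# tag a method so that the importer runs it before or after saving an item.
# ``MyItem`` and ``relabel`` are hypothetical names.
#
#     class MyItem:
#         @post_importer_action
#         def relabel(self, context, value):
#             context["label"] = value
#
#     MyItem.relabel.importer_trigger  # -> "post"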


class ImportFormater:
    def __init__(
        self,
        field_name,
        formater=None,
        required=True,
        through=None,
        through_key=None,
        through_dict=None,
        through_unicity_keys=None,
        duplicate_fields=None,
        regexp=None,
        regexp_formater_args=None,
        force_value=None,
        post_processing=False,
        concat=False,
        concat_str=False,
        comment="",
        force_new=None,
        export_field_name=None,
        value_format=None,
        label="",
    ):
        self.field_name = field_name
        if export_field_name:
            self.export_field_name = export_field_name
        else:
            self.export_field_name = field_name
        self.formater = formater
        self.required = required
        self.through = through
        self.through_key = through_key
        self.through_dict = through_dict
        self.through_unicity_keys = through_unicity_keys
        self.duplicate_fields = duplicate_fields or []
        self.regexp = regexp
        self.value_format = value_format
        self.regexp_formater_args = regexp_formater_args or []
        # write this value even if a value exists
        self.force_value = force_value
        # post process after import
        self.post_processing = post_processing
        # concatenate with existing value
        self.concat = concat
        self.concat_str = concat_str
        self.comment = comment
        self.force_new = force_new
        self.label = label

    def reinit_db_target(self, db_target, nb=0, user=None):
        if not self.formater:
            return
        if type(db_target) in (list, tuple):
            db_target = db_target[nb]
        if type(self.formater) not in (list, tuple):
            self.formater.db_target = db_target
            self.formater.init_db_target(user=user)
        else:
            for idx, formater in enumerate(self.formater):
                formater.db_target = db_target
                formater.init_db_target(user=user)

    def init_db_target(self, user=None):
        pass

    def __str__(self):
        return self.field_name

    def report_succes(self, *args):
        return

    def report_error(self, *args):
        return

    def init(
        self, vals, output=None, choose_default=False, import_instance=None, user=None
    ):
        try:
            lst = iter(self.formater)
        except TypeError:
            lst = [self.formater]
        for formater in lst:
            if formater:
                formater.check(
                    vals,
                    output,
                    self.comment,
                    choose_default=choose_default,
                    import_instance=import_instance,
                    user=user,
                )

    def post_process(self, obj, context, value, owner=None):
        raise NotImplementedError()


class ImporterError(Exception):
    STANDARD = "S"
    HEADER = "H"

    def __init__(self, message, type="S"):
        self.msg = message
        self.type = type

    def __str__(self):
        return str(self.msg)


class Formater:
    def __init__(self, *args, **kwargs):
        self.db_target = kwargs.get("db_target", None)

    def format(self, value):
        return value

    def __str__(self):
        return self.__class__.__name__

    @property
    def slug(self):
        return str(self)

    def check(
        self,
        values,
        output=None,
        comment="",
        choose_default=False,
        import_instance=None,
        user=None,
    ):
        return

    def init_db_target(self, user=None):
        pass

    def _base_target_filter(self, user=None):
        # set for all users
        q_or = (
            Q(associated_import__isnull=True)
            & Q(associated_user__isnull=True)
            & Q(associated_group__isnull=True)
        )
        if hasattr(self, "import_instance") and self.import_instance:
            # set for current import
            q_or = q_or | Q(associated_import=self.import_instance)
            if self.import_instance.associated_group:
                # set for associated group
                q_or = q_or | Q(associated_group=self.import_instance.associated_group)
        if user:
            # set for current user
            q_or = q_or | Q(associated_user=user)
        return q_or

    def get_db_target_query(self, user=None):
        if not self.db_target:
            return
        q = self.db_target.keys.filter(is_set=True)
        q_or = self._base_target_filter(user)
        q = q.filter(q_or)
        return q


class ChoiceChecker(object):
    def report_new(self, comment):
        if not self.new_keys:
            return
        msg = 'For "%s" these new associations have been made:\n' % comment
        sys.stderr.write(msg)
        for k in self.new_keys:
            msg = '"%s";"%s"\n' % (k, self.new_keys[k])
            sys.stderr.write(msg)
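

# Illustrative sketch: a custom formater only has to override ``format`` and
# either return the cleaned value or raise ``ValueError`` on bad input.
# ``PercentFormater`` is a hypothetical example, not used by any importer.
#
#     class PercentFormater(Formater):
#         def format(self, value):
#             value = value.strip().rstrip("%").replace(",", ".")
#             if not value:
#                 return
#             try:
#                 return float(value) / 100
#             except ValueError:
#                 raise ValueError('"%s" is not a percentage' % value)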


class UnicodeFormater(Formater):
    def __init__(
        self,
        max_length=None,
        clean=False,
        re_filter=None,
        notnull=False,
        prefix="",
        db_target=None,
        import_instance=None,
        many_split=None,
    ):
        self.max_length = max_length
        self.db_target = db_target
        self.clean = clean
        self.re_filter = re_filter
        self.notnull = notnull
        self.prefix = prefix
        self.import_instance = import_instance
        self.many_split = many_split

    def format(self, value):
        try:
            if type(value) != str:
                value = str(value.strip())
            vals = []
            for v in value.split("\n"):
                v = v.strip()
                if v:
                    vals.append(v)
            value = "\n".join(vals)
            if self.re_filter:
                m = self.re_filter.match(value)
                if m:
                    value = "".join(m.groups())
            if self.clean:
                if value.startswith(","):
                    value = value[1:]
                if value.endswith(","):
                    value = value[:-1]
                value = value.replace(", , ", ", ")
        except UnicodeDecodeError:
            return
        if self.max_length and len(value) > self.max_length:
            raise ValueError(
                _(
                    '"%(value)s" is too long. The max length is %(length)d '
                    "characters."
                )
                % {"value": value, "length": self.max_length}
            )
        if self.notnull and not value:
            return
        if value:
            value = self.prefix + value
        return value

    def __str__(self):
        return f"{self.__class__.__name__}|{self.max_length or 0}"


class BooleanFormater(Formater):
    def format(self, value):
        value = value.strip().upper()
        if value in ("1", "OUI", "VRAI", "YES", "TRUE"):
            return True
        if value in ("", "0", "NON", "FAUX", "NO", "FALSE"):
            return False
        raise ValueError(_('"%(value)s" not equal to yes or no') % {"value": value})


class FloatFormater(Formater):
    def format(self, value):
        value = value.strip().replace(",", ".")
        value = value.replace(" ", "")
        if not value:
            return
        try:
            return float(value)
        except ValueError:
            raise ValueError(_('"%(value)s" is not a float') % {"value": value})


class InseeFormater(Formater):
    """
    Formater for "code INSEE" (French statistics institute code).
    The syntax "CodeINSEE-Year" is accepted (Ishtar trick) in order to manage
    old INSEE codes (the year is the date of creation).
    """

    ERROR = _('"{value}" is not an appropriate INSEE code')

    def format(self, value):
        value = value.strip()
        exp = value.split("-")
        code = exp[0]
        try:
            int(code)
        except ValueError:
            raise ValueError(str(self.ERROR).format(value=value))
        while len(code) < 5:
            code = "0" + code
        if len(exp) > 2:
            raise ValueError(str(self.ERROR).format(value=value))
        elif len(exp) == 1:
            return code
        try:
            datetime.datetime.strptime(exp[1], "%Y")
        except ValueError:
            raise ValueError(str(self.ERROR).format(value=value))
        return code + "-" + exp[1]


class UpperCaseFormater(UnicodeFormater):
    """
    To upper case
    """

    def format(self, value):
        value = super(UpperCaseFormater, self).format(value)
        if not value:
            return value
        return value.upper()


class LowerCaseFormater(UnicodeFormater):
    """
    To lower case
    """

    def format(self, value):
        value = super(LowerCaseFormater, self).format(value)
        if not value:
            return value
        return value.lower()


class YearFormater(Formater):
    def format(self, value):
        value = value.strip()
        if not value:
            return
        try:
            value = int(value)
            if value <= 0 or value > (datetime.date.today().year + 30):
                raise ValueError()
        except ValueError:
            raise ValueError(_('"%(value)s" is not a valid date') % {"value": value})
        return value


class YearNoFuturFormater(Formater):
    def format(self, value):
        value = value.strip()
        if not value:
            return
        try:
            value = int(value)
            if value <= 0 or value > datetime.date.today().year:
                raise ValueError()
        except ValueError:
            raise ValueError(_('"%(value)s" is not a valid date') % {"value": value})
        return value


class IntegerFormater(Formater):
    def format(self, value):
        value = value.strip()
        value = value.replace(" ", "")
        if not value:
            return
        try:
            return int(value)
        except ValueError:
            raise ValueError(_('"%(value)s" is not an integer') % {"value": value})
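

# Usage sketch (made-up values):
#
#     FloatFormater().format("3,14")      # -> 3.14
#     BooleanFormater().format("OUI")     # -> True
#     InseeFormater().format("1001")      # -> "01001"
#     YearFormater().format("1998")       # -> 1998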


class StrChoiceFormater(Formater, ChoiceChecker):
    def __init__(
        self,
        choices,
        strict=False,
        equiv_dict=None,
        model=None,
        cli=False,
        many_split="",
        db_target=None,
        import_instance=None,
    ):
        if not equiv_dict:
            equiv_dict = {}
        self.choices = list(choices)
        self.strict = strict
        self.equiv_dict = copy.deepcopy(equiv_dict)
        self.cli = cli
        self.model = model
        self.db_target = db_target
        self.create = False
        self.missings = set()
        self.new_keys = {}
        self.match_table = {}
        self.many_split = many_split
        self.import_instance = import_instance
        for key, value in self.choices:
            value = str(value)
            if not self.strict:
                value = slugify(value)
            if value not in self.equiv_dict:
                v = key
                if model and v:
                    v = model.objects.get(pk=v)
                self.equiv_dict[value] = v
        self.init_db_target()

    def init_db_target(self, user=None):
        if not self.db_target:
            return
        q = self.get_db_target_query(user)
        for target_key in list(q.all()):
            key = target_key.key
            if not self.strict:
                key = slugify(key)
            if key in self.equiv_dict:
                continue
            v = target_key.value
            if self.model and v and type(v) in (int, str):
                q = self.model.objects.filter(txt_idx=v)
                if not q.count():
                    q = self.model.objects.filter(pk=v)
                    if not q.count():
                        target_key.delete()
                        continue
                v = q.all()[0]
            self.equiv_dict[key] = v

    def prepare(self, value):
        return str(value).strip()

    def _get_choices(self, comment=""):
        msgstr = comment + " - "
        msgstr += str(
            _('Choice for "%s" is not available. ' "Which one is relevant?\n")
        )
        idx = -1
        for idx, choice in enumerate(self.choices):
            msgstr += "%d. %s\n" % (idx + 1, choice[1])
        idx += 2
        if self.create:
            msgstr += str(_("%d. None of the above - create new")) % idx + "\n"
            idx += 1
        msgstr += str(_("%d. None of the above - skip")) % idx + "\n"
        return msgstr, idx

    def check(
        self,
        values,
        output=None,
        comment="",
        choose_default=False,
        import_instance=None,
        user=None,
    ):
        self.init_db_target(user)
        if (not output or output == "silent") and not choose_default:
            return
        if self.many_split:
            new_values = []
            r = re.compile(self.many_split)
            for value in values:
                new_values += r.split(value)
            values = new_values
        TargetKey = apps.get_model("ishtar_common", "TargetKey")
        for value in set(values):
            value = self.prepare(value)
            if value in self.equiv_dict:
                continue
            self.missings.add(value)
        if output == "db" and self.db_target:
            for missing in self.missings:
                q = {"target": self.db_target, "key": missing}
                query = TargetKey.objects.filter(**q)
                query_clean = query.filter(
                    associated_import__isnull=True,
                    associated_user__isnull=True,
                    associated_group__isnull=True,
                    is_set=False,
                )
                if query_clean.count():  # bad keys for this target
                    query_clean.delete()
                query = query.filter(self._base_target_filter(user))
                if query.count():
                    continue
                with transaction.atomic():
                    q["associated_import"] = import_instance
                    try:
                        TargetKey.objects.create(**q)
                    except IntegrityError:
                        pass

    def new(self, value):
        return

    def add_key(self, obj, value, ishtar_import=None):
        return

    def format(self, value):
        origin_value = value
        value = self.prepare(value)
        if not self.strict:
            value = slugify(value)
        if value in self.equiv_dict:
            self.match_table[origin_value] = self.equiv_dict[value] or ""
            return self.equiv_dict[value]


class TypeFormater(StrChoiceFormater):
    def __init__(
        self,
        model,
        cli=False,
        defaults=None,
        many_split=False,
        db_target=None,
        import_instance=None,
    ):
        if not defaults:
            defaults = {}
        self.create = True
        self.strict = False
        self.model = model
        self.defaults = defaults
        self.many_split = many_split
        self.db_target = db_target
        self.missings = set()
        self.equiv_dict, self.choices = {}, []
        self.match_table = {}
        self.new_keys = {}
        self.import_instance = import_instance
        if self.import_instance:
            for item in model.objects.all():
                self.choices.append((item.pk, str(item)))
                for key in item.get_keys(current_import=import_instance):
                    self.equiv_dict[key] = item

    @property
    def slug(self):
        slug = "TypeFormater"
        if not self.model:
            return slug
        return slug + f"|{self.model._meta.app_label}|{self.model._meta.model_name}"

    def prepare(self, value):
        return slugify(str(value).strip())

    def add_key(self, obj, value, ishtar_import=None):
        obj.add_key(slugify(value), force=True, ishtar_import=ishtar_import)

    def new(self, value):
        values = copy.copy(self.defaults)
        values["label"] = value
        values["txt_idx"] = slugify(value)
        if "order" in get_all_field_names(self.model):
            order = 1
            q = self.model.objects.values("order").order_by("-order")
            if q.count():
                order = q.all()[0]["order"] or 1
            values["order"] = order
        return self.model.objects.create(**values)
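

# Usage sketch (``MaterialType`` is a hypothetical Ishtar type model):
# ``TypeFormater`` maps free text to a type instance and can create missing
# types through ``new``.
#
#     formater = TypeFormater(MaterialType, defaults={"available": True})
#     formater.format("terre cuite")  # -> matching MaterialType, or None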


class DateFormater(Formater):
    def __init__(self, date_formats=None, db_target=None, import_instance=None):
        if not date_formats:
            date_formats = ["%d/%m/%Y"]
        self.date_formats = date_formats
        if type(date_formats) not in (list, tuple):
            self.date_formats = [self.date_formats]
        self.db_target = db_target
        self.import_instance = import_instance

    def format(self, value):
        value = value.strip()
        if not value:
            return
        for date_format in self.date_formats:
            try:
                return datetime.datetime.strptime(value, date_format).date()
            except ValueError:
                continue
        raise ValueError(_('"%(value)s" is not a valid date') % {"value": value})


class FileFormater(Formater):
    need_archive = True

    def format(self, value, archive):
        value = value.strip()
        if not value:
            return
        if isinstance(archive, str) and (
            archive.startswith("http://") or archive.startswith("https://")
        ):
            return self._format_url(value, archive)
        return self._format_zip(value, archive)

    def _format_url(self, value, link):
        if not link.endswith("/"):
            link += "/"
        full_link = link + value
        try:
            filename, tmp_file = get_file_from_link(full_link)
        except ValueError:
            raise ValueError(
                _('"%(full_link)s" is not a valid path') % {"full_link": full_link}
            )
        my_file = File(tmp_file, name=filename)
        # manually set the file size because of an issue with TempFile
        my_file.size = os.stat(tmp_file.name).st_size
        return my_file

    def _format_zip(self, value, archive):
        value = value.strip().replace("\\", "/")
        items = value.replace("/", "_").split(".")
        base_dir = settings.MEDIA_ROOT + "imported"
        if not os.path.isdir(base_dir):
            os.mkdir(base_dir)
        filename = base_dir + os.sep + ".".join(items[:-1]) + "." + items[-1]
        try:
            with zipfile.ZipFile(archive) as zp:
                with open(filename, "wb") as f:
                    with zp.open(value) as z:
                        f.write(z.read())
            f = open(filename, "rb")
            my_file = File(f)
            # manually set the file size because of an issue with TempFile
            my_file.size = os.stat(filename).st_size
            return my_file
        except KeyError:
            raise ValueError(
                _('"%(value)s" is not a valid path for the given archive')
                % {"value": value}
            )


class StrToBoolean(Formater, ChoiceChecker):
    def __init__(
        self,
        choices=None,
        cli=False,
        strict=False,
        db_target=None,
        import_instance=None,
    ):
        if not choices:
            choices = {}
        self.dct = copy.copy(choices)
        self.cli = cli
        self.strict = strict
        self.db_target = db_target
        self.missings = set()
        self.match_table = {}
        self.new_keys = {}
        self.import_instance = import_instance
        self.init_db_target()

    def init_db_target(self, user=None):
        if not self.db_target:
            return
        q = self.get_db_target_query(user)
        for target_key in q.all():
            key = self.prepare(target_key.key)
            if key in self.dct:
                continue
            v = target_key.format()
            self.dct[key] = v

    def prepare(self, value):
        value = str(value).strip()
        if not self.strict:
            value = slugify(value)
        return value

    def check(
        self,
        values,
        output=None,
        comment="",
        choose_default=False,
        import_instance=None,
        user=None,
    ):
        if (not output or output == "silent") and not choose_default:
            return
        for value in values:
            value = self.prepare(value)
            if value in self.dct:
                continue
            self.missings.add(value)
        if output == "db" and self.db_target:
            TargetKey = apps.get_model("ishtar_common", "TargetKey")
            for missing in self.missings:
                try:
                    q = {
                        "target": self.db_target,
                        "key": missing,
                        "associated_import": import_instance,
                    }
                    if not TargetKey.objects.filter(**q).count():
                        TargetKey.objects.create(**q)
                except IntegrityError:
                    pass

    def format(self, value):
        origin_value = value
        value = self.prepare(value)
        if value in self.dct:
            val = self.dct[value] and "True" or "False"
            self.match_table[origin_value] = _(val)
            return self.dct[value]
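

# Usage sketch (made-up values):
#
#     DateFormater(["%d/%m/%Y", "%Y-%m-%d"]).format("31/12/1999")
#     # -> datetime.date(1999, 12, 31)
#     StrToBoolean({"oui": True, "non": False}).format("Oui")  # -> True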


logger = logging.getLogger(__name__)


def get_object_from_path(obj, path):
    for k in path.split("__")[:-1]:
        if not hasattr(obj, k):
            return
        obj = getattr(obj, k)
    return obj
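

# Usage sketch (hypothetical attributes): walks every path segment but the
# last one, returning the object that holds the final attribute.
#
#     get_object_from_path(find, "container__location__name")
#     # -> find.container.location (or None if a link is missing)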


class Importer(object):
    SLUG = ""
    NAME = ""
    DESC = ""
    TYPE = ""
    MAIN_GEO = False
    LINE_FORMAT = []
    OBJECT_CLS = None
    DEBUG = False
    UNICITY_KEYS = []
    # if set, only models inside this list can be created
    MODEL_CREATION_LIMIT = []
    DEFAULTS = {}
    PRE_IMPORT_VALUES = {}  # values from a form before the import
    IGNORE_ERRORS = tuple()
    ERRORS = {
        "header_check": _(
            "The given file is not correct. Check the file "
            "format. If you use a CSV file: check that column separator "
            "and encoding are similar to the ones used by the reference "
            "file."
        ),
        "too_many_cols": _("Too many cols (%(user_col)d) when maximum is %(ref_col)d"),
        "no_data": _("No data provided"),
        "value_required": _("Value is required"),
        "not_enough_cols": _("At least %d columns must be filled"),
        "regex_not_match": _("The regexp doesn't match."),
        "improperly_configured": _(
            "Forced creation is set for model {} but this model is not in the "
            "list of models allowed to be created."
        ),
        "does_not_exist_in_db": _(
            "{} with values {} doesn't exist in the "
            "database. Create it first or fix your source file."
        ),
        "gis_missing_obj": _(
            "Related object not found. Can not create the geographic item. "
            "Context: {}."
        ),
    }

    def _get_improperly_conf_error(self, model):
        cls_name = str(model._meta.verbose_name)
        return ImporterError(
            str(self.ERRORS["improperly_configured"]).format(cls_name)
        )

    def _get_does_not_exist_in_db_error(self, model, data):
        cls_name = str(model._meta.verbose_name)
        values = ", ".join(["{}: {}".format(k, data[k]) for k in data])
        raise ImporterError(
            str(self.ERRORS["does_not_exist_in_db"]).format(cls_name, values)
        )

    def __init__(
        self,
        skip_lines=0,
        reference_header=None,
        check_col_num=False,
        test=False,
        history_modifier=None,
        output="silent",
        import_instance=None,
        conservative_import=False,
    ):
        """
        * skip_lines must be set if the data provided has got header lines.
        * a reference_header can be provided to perform a data compliance
          check. It can be useful to warn about bad parsing.
        * test doesn't write to the database.
        """
        self.skip_lines = skip_lines
        self.reference_header = reference_header
        self.test = test
        self.errors = []  # list of (line, col, message)
        self.validity = []  # checked lines (formatted values)
        # lists used for simulation
        self.simulate = False
        self.new_objects, self.updated_objects = [], []
        self.ambiguous_objects, self.not_find_objects = [], []
        self.number_updated = 0
        self.number_created = 0
        self.check_col_num = check_col_num
        self.line_format = copy.copy(self.LINE_FORMAT)
        self.import_instance = import_instance
        self.archive = None
        self.current_csv_line = None
        self.conservative_import = conservative_import
        # for a conservative import, UNICITY_KEYS should be defined
        if self.conservative_import and not bool(self.UNICITY_KEYS):
            raise ValueError("A conservative import should have unicity keys defined")
        self.DB_TARGETS = {}
        self.match_table = {}
        self.concats = set()
        self.concat_str = {}
        self.to_be_close = []
        if import_instance:
            imported_images = import_instance.get_imported_images()
            if imported_images:
                self.archive = imported_images
            elif import_instance.imported_media_link:
                self.archive = import_instance.imported_media_link
        self._defaults = self.DEFAULTS.copy()
        self._pre_import_values = self.PRE_IMPORT_VALUES.copy()
        self.history_modifier = history_modifier
        self.output = output
        self.debug = []
        if not self.history_modifier:
            if self.import_instance and self.import_instance.user:
                self.history_modifier = self.import_instance.user.user_ptr
            else:
                # import made by the CLI: get the first admin
                self.history_modifier = User.objects.filter(
                    is_superuser=True
                ).order_by("pk")[0]

    def post_processing(self, idx_line, item):
        # force django based post-processing for the item
        item = item.__class__.objects.get(pk=item.pk)
        for cls, func, context, value in self._item_post_processing:
            context["import_object"] = self.import_instance
            if cls != item.__class__:
                # try to get the associated item
                current_item = None
                for id_key in ("id", "pk", "external_id"):
                    if context.get(id_key, None):
                        try:
                            current_item = cls.objects.get(**{id_key: context[id_key]})
                        except (cls.DoesNotExist, MultipleObjectsReturned) as __:
                            break
                if not current_item:
                    self.errors.append(
                        (
                            idx_line,
                            None,
                            str(
                                _(
                                    "Item {} with values: {} not identified for "
                                    "post-treatment - need a non ambiguous key"
                                )
                            ).format(cls.__name__, str(context)),
                        )
                    )
                    continue
            else:
                current_item = item
            try:
                f = getattr(current_item, func)
                returned = f(context, value)
                if returned and not getattr(f, "_no_repost_save", False):
                    if not isinstance(returned, Iterable):
                        returned = [returned]
                    for rel in returned:
                        self._add_to_post_save(rel.__class__, rel.pk, idx_line)
            except IntegrityError as e:
                self.errors.append((idx_line, None, str(e)))
            except ImporterError as import_error:
                msg = str(import_error)
                if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                    self.errors.append((idx_line, None, msg))
        return item

    def post_import(self, verbose=False):
        related_list = {}
        if self.import_instance and not self.line_to_process:
            self.import_instance.state = "PP"
            self.import_instance.imported_line_numbers = ""
            self.import_instance.save()
        self.timestamp = int(datetime.datetime.now().timestamp())
        items = self.post_save_items.items()
        start = datetime.datetime.now()
        total = len(items)
        for cls_pk, idx_line in items:
            if verbose:
                txt = BColors.OKBLUE + f"\r\t- post-import: {idx_line + 1}/{total}"
                left = self._get_eta(idx_line, total, start)
                if left:
                    txt += f" ({left} seconds left)"
                txt += BColors.ENDC
                sys.stdout.write(txt)
                sys.stdout.flush()
            if self.import_instance and not self.line_to_process:
                self.import_instance.add_imported_line(idx_line)
            cls, pk = cls_pk
            # force django based post-processing for the item
            try:
                item = cls.objects.get(pk=pk)
            except cls.DoesNotExist:
                continue
            if cls != self.OBJECT_CLS:
                cls._no_down_model_update = True
            item._timestamp = self.timestamp
            item._queue = "low_priority"
            item.save()
            if hasattr(item, "post_save_geo"):
                # force geo recheck - needed for background task
                item = cls.objects.get(pk=pk)
                item._queue = "low_priority"
                item.post_save_geo()
            if hasattr(item, "RELATED_POST_PROCESS"):
                for related_key in item.RELATED_POST_PROCESS:
                    for related in getattr(item, related_key).all():
                        k = (related.__class__, related.pk)
                        if k not in related_list:
                            related_list[k] = idx_line
            if hasattr(item, "fix"):
                # post save/m2m specific fix
                item.fix()
        if verbose:
            elapsed = datetime.datetime.now() - start
            txt = (
                BColors.OKBLUE
                + f"\r\t- import: {total} items post treated in {elapsed}\n"
            )
            txt += BColors.ENDC
            sys.stdout.write(txt)
            sys.stdout.flush()
        for cls, pk in related_list.keys():
            try:
                item = cls.objects.get(pk=pk)
                item._timestamp = self.timestamp
                item._queue = "low_priority"
                item.save()
                if hasattr(item, "fix"):
                    # post save/m2m specific fix
                    item.fix()
            except cls.DoesNotExist:
                pass

    def initialize(self, table, output="silent", choose_default=False, user=None):
        """
        Copy vals in columns and initialize formaters.

        * output:
            - silent: no associations
            - cli: output by command line interface and stored in the database
            - db: output on the database with no interactive association
              (further exploitation by web interface)
        * user: associated user
        """
        if output not in ("silent", "cli", "db"):
            raise ValueError("initialize called with a bad output option")
        vals = []
        for idx_line, line in enumerate(table):
            if self.skip_lines > idx_line:
                continue
            for idx_col, val in enumerate(line):
                if idx_col >= len(self.line_format):
                    break
                if idx_col >= len(vals):
                    vals.append([])
                vals[idx_col].append(val)
        for idx, formater in enumerate(self.line_format):
            if formater and idx < len(vals):
                formater.import_instance = self.import_instance
                if self.DB_TARGETS:
                    field_names = formater.field_name
                    if type(field_names) not in (list, tuple):
                        field_names = [field_names]
                    db_targets = []
                    for field_name in field_names:
                        db_targets.append(
                            self.DB_TARGETS["{}-{}".format(idx + 1, field_name)]
                        )
                    formater.reinit_db_target(db_targets, user=user)
                formater.init(
                    vals[idx],
                    output,
                    choose_default=choose_default,
                    import_instance=self.import_instance,
                    user=user,
                )

    def get_formaters(self):
        return self.line_format

    def importation(
        self,
        table,
        initialize=True,
        choose_default=False,
        user=None,
        line_to_process=None,
        simulate=False,
        verbose=False,
    ):
        if initialize:
            self.initialize(
                table, self.output, choose_default=choose_default, user=user
            )
        self.simulate = simulate
        self.line_to_process = line_to_process
        return self._importation(table, verbose=verbose)

    def get_current_values(self, obj):
        return obj
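
    # Illustrative mapping (hypothetical field and value): the class method
    # below expands a "__"-separated path into nested dicts, e.g.
    # "person__name" with value "Doe" becomes {"person": {"name": "Doe"}}.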
    @classmethod
    def _field_name_to_data_dict(
        cls,
        field_name,
        value,
        data,
        force_value=False,
        concat=False,
        concat_str="",
        force_new=False,
    ):
        field_names = field_name
        if type(field_names) not in (list, tuple):
            field_names = [field_name]
        for field_name in field_names:
            keys = field_name.split("__")
            current_data = data
            for idx, key in enumerate(keys):
                if idx == (len(keys) - 1):  # last
                    if concat:
                        if isinstance(value, (list, tuple)):
                            if key not in current_data:
                                current_data[key] = []
                            if not value:
                                continue
                            current_data[key] += value
                        else:
                            if key not in current_data:
                                current_data[key] = ""
                            if not value:
                                continue
                            current_data[key] = (
                                (str(current_data[key]) + (concat_str or ""))
                                if current_data[key]
                                else ""
                            )
                            current_data[key] += str(value)
                    elif force_value and value:
                        if concat_str and key in current_data and current_data[key]:
                            current_data[key] = (
                                str(current_data[key]) + concat_str + str(value)
                            )
                        else:
                            current_data[key] = value
                    elif key not in current_data or not current_data[key]:
                        current_data[key] = value
                    elif concat_str:
                        current_data[key] = (
                            str(current_data[key]) + concat_str + str(value)
                        )
                    if force_new:
                        current_data["__force_new"] = True
                elif key not in current_data:
                    current_data[key] = {}
                current_data = current_data[key]
        return data

    def _get_eta(self, idx_line, total, start):
        left = None
        if idx_line > 10:
            elapsed = datetime.datetime.now() - start
            time_by_item = elapsed / idx_line
            if time_by_item:
                left = ((total - idx_line) * time_by_item).seconds
        return left

    def _importation(self, table, verbose=False):
        self.match_table = {}
        table = list(table)
        if not table or not table[0]:
            raise ImporterError(self.ERRORS["no_data"], ImporterError.HEADER)
        if self.check_col_num and len(table[0]) > len(self.line_format):
            raise ImporterError(
                self.ERRORS["too_many_cols"]
                % {"user_col": len(table[0]), "ref_col": len(self.line_format)}
            )
        self.errors = []
        self.validity = []
        # a dict with (cls, item.pk) as key and the import line number as
        # value - mostly used as an ordered set
        self.post_save_items = {}
        self.number_imported = 0
        idx_last_col = 0  # index of the last required column
        for idx_last_col, formater in enumerate(reversed(self.line_format)):
            if formater and formater.required:
                break
        else:
            idx_last_col += 1
        # min col number to be filled
        self.min_col_number = len(self.line_format) - idx_last_col
        # check the conformity with the reference header
        if (
            self.reference_header
            and self.skip_lines
            and self.reference_header != table[0]
        ):
            raise ImporterError(self.ERRORS["header_check"], type=ImporterError.HEADER)
        self.now = datetime.datetime.now()
        start = datetime.datetime.now()
        total = len(table)
        results = []
        for idx_line, line in enumerate(table):
            self.idx_line = idx_line
            if verbose:
                left = self._get_eta(idx_line, total, start)
                txt = BColors.OKBLUE + f"\r\t- import: {idx_line + 1}/{total}"
                if left:
                    txt += f" ({left} seconds left)"
                sys.stdout.write(txt + BColors.ENDC)
                sys.stdout.flush()
            if self.line_to_process is not None:
                if self.line_to_process != idx_line:
                    continue
                if idx_line > self.line_to_process:
                    return results
            try:
                results.append(self._line_processing(idx_line, line))
            except ImporterError as import_error:
                msg = str(import_error)
                if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                    self.errors.append((idx_line, None, msg))
        if verbose:
            elapsed = datetime.datetime.now() - start
            txt = (
                BColors.OKBLUE + f"\r\t- import: {total} items imported in {elapsed}\n"
            )
            sys.stdout.write(txt + BColors.ENDC)
            sys.stdout.flush()
        self.post_import(verbose=verbose)
        for item in self.to_be_close:
            item.close()
        return results

    def _add_to_post_save(self, cls, pk, idx_line):
        post_save_k = (cls, pk)
        c_idx_line = idx_line
        if post_save_k in self.post_save_items:
            c_idx_line = self.post_save_items.pop(post_save_k)  # change the order
        self.post_save_items[post_save_k] = c_idx_line

    def _create_item(self, cls, dct, idx_line):
        obj = cls(**dct)
        obj._no_post_save = True  # delayed to the end of the import
        if hasattr(obj, "no_post_process"):
            obj.no_post_process(history=True)
        obj._queue = "low_priority"
        obj.save()
        self._add_to_post_save(cls, obj.pk, idx_line)
        return obj
    def _manage_geodata(self, default_srs, geodata, data, key="geodata"):
        if key not in data:  # no GIS information provided
            return geodata
        geodata.update(data.pop(key))
        # manage geodata SRS
        has_srs_field = False
        for gk in geodata:
            if gk.startswith("spatial_reference_system") and geodata[gk]:
                has_srs_field = True
                if isinstance(geodata[gk], dict) and "srid" in geodata[gk]:
                    srid = geodata[gk]["srid"]
                    if isinstance(srid, (str, int)):
                        SpatialReferenceSystem = apps.get_model(
                            "ishtar_common", "SpatialReferenceSystem"
                        )
                        q = SpatialReferenceSystem.objects.filter(
                            srid=geodata[gk]["srid"]
                        )
                        if q.count():
                            geodata[gk] = q.all()[0]
                break
        if default_srs and not has_srs_field:
            geodata["spatial_reference_system"] = default_srs
        return geodata
    def _line_processing(self, idx_line, line):
        self.timestamp = int(datetime.datetime.now().timestamp())
        for item in self.to_be_close:
            item.close()
        self.to_be_close = []
        self.idx_line = idx_line
        if self.skip_lines > idx_line:
            return
        if not line:
            self.validity.append([])
            return
        if (
            not self.simulate
            and self.import_instance
            and not self.import_instance.has_changes(idx_line)
        ):
            self.validity.append(line)
            return
        self._throughs = []  # list of (formater, value)
        self._post_processing = []  # list of (formater, value)
        self._item_post_processing = []  # cls, attribute, data, value
        data = {}
        self.current_csv_line = None
        # raw line kept for simulation
        if self.simulate:
            self.current_csv_line = line
        n = datetime.datetime.now()
        logger.debug("%s - Processing line %d" % (str(n - self.now), idx_line))
        self.now = n
        n2 = n
        self.c_errors = False
        c_row = []
        idx_col = 0
        for idx_col, val in enumerate(line):
            try:
                data = self._row_processing(c_row, idx_col, idx_line, val, data)
            # nosec: no catch, to force continued processing of lines
            except:  # nosec
                pass
        data = update_data(self._pre_import_values, data)
        # put default values only if relevant
        for mandatory_keys, defaults in self._defaults:
            test_dict = data.copy()
            nok = False
            for k in mandatory_keys:  # test that the keys are there
                if k not in test_dict:
                    nok = True
                    break
                test_dict = test_dict[k]
            if (
                nok
                or (not isinstance(test_dict, dict) and not test_dict)
                or (
                    isinstance(test_dict, dict)
                    and not any(1 for k in test_dict if test_dict[k])  # empty dict
                )
            ):
                continue
            data = update_data(defaults, data)
        self.validity.append(c_row)
        if self.DEBUG:
            debug_data = copy.deepcopy(data)
            debug_data["_debug_current_line"] = idx_line + 1
            self.debug.append(debug_data)
        if not self.c_errors and (idx_col + 1) < self.min_col_number:
            self.c_errors = True
            self.errors.append(
                (
                    idx_line + 1,
                    idx_col + 1,
                    self.ERRORS["not_enough_cols"] % self.min_col_number,
                )
            )
        if self.c_errors:
            return
        n = datetime.datetime.now()
        logger.debug("* %s - Cols read" % (str(n - n2)))
        n2 = n
        if self.test:
            return
        # manage unicity of items (mainly for updates)
        if "history_modifier" in get_all_field_names(self.OBJECT_CLS):
            data["history_modifier"] = self.history_modifier
        self.new_objects, self.updated_objects = [], []
        self.ambiguous_objects, self.not_find_objects = [], []
        geodata, main_geodata = {}, {}
        self.geo_prefix = "base_finds" if self.OBJECT_CLS.__name__ == "Find" else ""
        if self.TYPE in ("gis", "qgs"):
            profile = get_current_profile()
            default_srs = profile.srs if profile.srs else None
            alt_data = data
            if self.geo_prefix and self.geo_prefix in data:
                alt_data = data[self.geo_prefix]
            if "geodata" in alt_data:
                geodata = self._manage_geodata(default_srs, geodata, alt_data)
                # do not import if no geometry is set
                geom_fields = list(IMPORT_GEOMETRY.values()) + ["x", "z"]
                if not any(1 for k in geom_fields if (k in geodata) and geodata[k]):
                    geodata = {}
            if "main_geodata" in alt_data:
                main_geodata = self._manage_geodata(
                    default_srs, main_geodata, alt_data, key="main_geodata"
                )  # TODO: main_geodata not used?
        obj, created = self.get_object(self.OBJECT_CLS, data, idx_line=idx_line)
        if self.simulate:
            return data
        if self.import_instance:
            self.import_instance.add_imported_line(self.idx_line)
        if not obj:
            return
        if self.import_instance:
            if created:
                obj.imports.add(self.import_instance)
            else:
                obj.imports_updated.add(self.import_instance)
        if created:
            self.number_created += 1
        else:
            self.number_updated += 1
        if hasattr(obj, "no_post_process"):
            obj.no_post_process(history=True)
        if not created and "defaults" in data:
            for k in data["defaults"]:
                setattr(obj, k, data["defaults"][k])
            obj._no_post_save = True
            obj._timestamp = self.timestamp
            obj._queue = "low_priority"
            obj.save()
            self._add_to_post_save(obj.__class__, obj.pk, idx_line)
        GeoVectorData = apps.get_model("ishtar_common", "GeoVectorData")
        if (
            self.TYPE in ("gis", "qgs")
            and geodata
            and not isinstance(obj, GeoVectorData)
        ):
            # create GIS data and attach it to the created object if the
            # object is not a GIS one
            geo_obj = obj
            if self.OBJECT_CLS.__name__ == "Find":
                geo_obj = obj.get_first_base_find()
            if not geo_obj and not self.c_errors:
                self.c_errors = True
                self.errors.append(
                    (
                        idx_line + 1,
                        "-",
                        self.ERRORS["gis_missing_obj"].format(str(data)),
                    )
                )
                return
            content_type = ContentType.objects.get(
                app_label=geo_obj.__class__._meta.app_label,
                model=geo_obj.__class__.__name__.lower(),
            )
            geodata.update(
                {
                    "source_id": geo_obj.pk,
                    "source_content_type": content_type,
                }
            )
            item = None
            created = False
            if "import_key" in geodata:
                q = GeoVectorData.objects.filter(
                    import_key=geodata["import_key"],
                    source_id=geo_obj.pk,
                    source_content_type=content_type,
                )
                if q.count():
                    item = q.all()[0]
            try:
                if item:
                    for k in geodata:
                        setattr(item, k, geodata[k])
                    item._timestamp = self.timestamp
                    item._queue = "low_priority"
                    if hasattr(item, "no_post_process"):
                        item.no_post_process(history=True)
                    item.save()
                else:
                    item = GeoVectorData.objects.create(**geodata)
                    created = True
            except Exception as import_error:
                msg = str(import_error)
                if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                    self.errors.append((self.idx_line, None, msg))
                return
            if self.import_instance and created:
                item.imports.add(self.import_instance)
            if self.MAIN_GEO:
                geo_obj._timestamp = self.timestamp
                geo_obj.main_geodata = item
                geo_obj._post_saved_geo = True
                geo_obj._no_move = True
                geo_obj.skip_history_when_saving = True
                geo_obj._queue = "low_priority"
                if hasattr(geo_obj, "no_post_process"):
                    geo_obj.no_post_process(history=True)
                geo_obj.save()
        n = datetime.datetime.now()
        logger.debug("* %s - Item saved" % (str(n - n2)))
        n2 = n
        for formater, value in self._throughs:
            n = datetime.datetime.now()
            logger.debug(
                "* %s - Processing formater %s" % (str(n - n2), formater.field_name)
            )
            n2 = n
            data = {}
            if formater.through_dict:
                data = formater.through_dict.copy()
            if formater.through_key:
                data[formater.through_key] = obj
            data[formater.field_name] = value
            through_cls = formater.through
            if formater.through_unicity_keys:
                data["defaults"] = {}
                for k in list(data.keys()):
                    if k not in formater.through_unicity_keys and k != "defaults":
                        data["defaults"][k] = data.pop(k)
            created = False
            if "__force_new" in data:
                if (
                    self.MODEL_CREATION_LIMIT
                    and through_cls not in self.MODEL_CREATION_LIMIT
                ):
                    raise self._get_improperly_conf_error(through_cls)
                created = data.pop("__force_new")
                new_data = data.copy()
                if "defaults" in data:
                    default = new_data.pop("defaults")
                    for k in default:
                        if k not in new_data:
                            new_data[k] = default[k]
                t_obj = self._create_item(through_cls, new_data, idx_line)
            else:
                if (
                    not self.MODEL_CREATION_LIMIT
                    or through_cls in self.MODEL_CREATION_LIMIT
                ):
                    new_data = data.copy()
                    if "defaults" in data:
                        default = new_data.pop("defaults")
                    else:
                        default = {}
                    q = through_cls.objects.filter(**new_data)
                    if q.count():
                        t_obj = through_cls.objects.get(**new_data)
                    else:
                        for k in default:
                            if k not in new_data:
                                new_data[k] = default[k]
                        t_obj = self._create_item(through_cls, new_data, idx_line)
                else:
                    get_data = data.copy()
                    if "defaults" in get_data:
                        get_data.pop("defaults")
                    try:
                        t_obj = through_cls.objects.get(**get_data)
                    except through_cls.DoesNotExist:
                        raise self._get_does_not_exist_in_db_error(
                            through_cls, get_data
                        )
            if not created and "defaults" in data:
                t_obj = t_obj.__class__.objects.get(pk=t_obj.pk)
                for k in data["defaults"]:
                    setattr(t_obj, k, data["defaults"][k])
                t_obj._no_post_save = True
                t_obj._timestamp = self.timestamp
                t_obj._queue = "low_priority"
                if hasattr(t_obj, "no_post_process"):
                    t_obj.no_post_process(history=True)
                t_obj.save()
                self._add_to_post_save(t_obj.__class__, t_obj.pk, idx_line)
            if self.import_instance and hasattr(t_obj, "imports") and created:
                t_obj.imports.add(self.import_instance)
        if not obj:
            return data
        for formater, val in self._post_processing:
            formater.post_process(obj, data, val, owner=self.history_modifier)
        try:
            self.post_processing(idx_line, obj)
        except ImporterError as import_error:
            msg = str(import_error)
            if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                self.errors.append((self.idx_line, None, msg))
        return data
    def _row_processing(self, c_row, idx_col, idx_line, val, data):
        if idx_col >= len(self.line_format):
            return data
        formater = self.line_format[idx_col]
        if formater and formater.post_processing:
            self._post_processing.append((formater, val))
        if not formater or not formater.field_name:
            c_row.append(_("Not imported"))
            return data
        if formater.regexp:
            # multiline regexps are a mess: protect line breaks before matching
            val = val.replace("\n", NEW_LINE_BREAK)
            match = formater.regexp.match(val)
            if not match:
                if formater.required:
                    self.errors.append(
                        (idx_line + 1, idx_col + 1, self.ERRORS["value_required"])
                    )
                    self.c_errors = True
                elif not val.strip():
                    c_row.append("")
                    return data
                val = val.replace(NEW_LINE_BREAK, "\n")
                msg = str(self.ERRORS["regex_not_match"]) + val
                if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                    self.errors.append(
                        (
                            idx_line + 1,
                            idx_col + 1,
                            msg,
                        )
                    )
                c_row.append("")
                return data
            val_group = []
            for g in formater.regexp.findall(val):
                if isinstance(g, (tuple, list)):
                    g = "".join(g)
                val_group.append(g.replace(NEW_LINE_BREAK, "\n") if g else "")
            val = "".join(val_group)
        field_names = formater.field_name
        if not isinstance(field_names, (list, tuple)):
            field_names = [field_names]
        c_values = []
        for idx_fields, field_name in enumerate(field_names):
            func = formater.formater
            if type(func) in (list, tuple):
                func = func[idx_fields]
            if not callable(func) and isinstance(func, str):
                func = getattr(self, func)
            values = [val]
            many_values = getattr(func, "many_split", None)
            if many_values:
                values = re.split(func.many_split, values[0])
                if len(values) > 1:
                    # filter empty entries on m2m such as "my-value & "
                    filtered = [v for v in values if v.strip()]
                    if not filtered:
                        # keep one empty value if there are only empty values
                        filtered = [values[0]]
                    values = filtered
            formated_values = []
            field_name = formater.field_name
            force_new = formater.force_new
            if type(field_name) in (list, tuple):
                field_name = field_name[idx_fields]
            if type(force_new) in (list, tuple):
                force_new = force_new[idx_fields]
            if formater.concat:
                self.concats.add(field_name)
            concat_str = formater.concat_str
            if type(formater.concat_str) in (list, tuple):
                concat_str = concat_str[idx_fields]
            if concat_str:
                self.concat_str[field_name] = concat_str
            if self.DB_TARGETS:
                formater.import_instance = self.import_instance
                formater.reinit_db_target(
                    self.DB_TARGETS["{}-{}".format(idx_col + 1, field_name)],
                    idx_fields,
                )
            for idx, v in enumerate(values):
                try:
                    # regexp based cutting of formater arguments is disabled:
                    # if formater.regexp_formater_args:
                    #     args = []
                    #     for idx in formater.regexp_formater_args[idx_fields]:
                    #         args.append(val_group[idx])
                    #     value = func.format(*args)
                    if getattr(func, "need_archive", False):
                        value = func.format(v, archive=self.archive)
                    else:
                        value = func.format(v)
                except ValueError as import_error:
                    if formater.required:
                        self.c_errors = True
                    msg = str(import_error)
                    if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                        self.errors.append((idx_line + 1, idx_col + 1, msg))
                    c_values.append("")
                    return data
                if formater.value_format and value is not None and value != "":
                    if "{item" in formater.value_format:
                        value = formater.value_format.format(item=value)
                    else:
                        value = formater.value_format.format(value)
                if hasattr(value, "close"):
                    # opened files are kept - they need to be listed and
                    # closed later
                    self.to_be_close.append(value)
                formated_values.append(value)
            if hasattr(func, "match_table"):
                if field_name not in self.match_table:
                    self.match_table[field_name] = {}
                self.match_table[field_name].update(func.match_table)
            value = formated_values
            if not many_values:
                value = formated_values[0]
            printed_values = value
            if type(value) not in (list, tuple):
                printed_values = [value]
            if isinstance(func, FileFormater):
                printed_values = [str(v).split(os.sep)[-1] for v in printed_values]
            try:
                # don't reunicode - unicoded values
                c_values.append(" ; ".join([v for v in printed_values]))
            except TypeError:
                c_values.append(" ; ".join([str(v) for v in printed_values]))
            if value is None and formater.required:
                self.c_errors = True
                self.errors.append(
                    (idx_line + 1, idx_col + 1, self.ERRORS["value_required"])
                )
                c_row.append("")
                return data
            field_names = [field_name]
            force_news = [force_new]
            concats = [formater.concat]
            concat_str = [concat_str]
            if idx_fields == 0:
                # duplicate fields are only for the first occurrence
                for duplicate_field in formater.duplicate_fields:
                    if type(duplicate_field[0]) in (list, tuple):
                        duplicate_field, force_new, concat, conc_str = (
                            duplicate_field[0]
                        )
                    else:
                        duplicate_field, force_new, concat, conc_str = duplicate_field
                    field_names += [duplicate_field]
                    force_news += [force_new]
                    concats += [concat]
                    concat_str += [conc_str]
            if formater.through:
                self._throughs.append((formater, value))
            else:
                for idx, f_name in enumerate(field_names):
                    if not f_name:
                        continue
                    self._field_name_to_data_dict(
                        f_name,
                        value,
                        data,
                        formater.force_value,
                        force_new=force_news[idx],
                        concat=concats[idx],
                        concat_str=concat_str[idx],
                    )
        c_row.append(" ; ".join([v for v in c_values]))
        return data
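
    # Illustrative m2m payload (hypothetical field and values): a m2m
    # attribute reaches the method below either as model instances or as
    # dicts to resolve, e.g.:
    #     data["material_types"] = [{"txt_idx": "ceramic"}, {"txt_idx": "glass"}]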
    def _get_field_m2m(
        self, attribute, data, c_path, new_created, field_object, idx_line=None
    ):
        """
        Manage a m2m field from raw data

        :param attribute: attribute name
        :param data: current data dictionary
        :param c_path: attribute path from the main model point of view
        :param new_created: dict of forced newly created items to prevent
        multiple creation
        :param field_object: django field object for this attribute

        :return: None
        """
        m2ms = []
        many_values = data.pop(attribute)
        model = None
        if hasattr(field_object, "remote_field"):
            model = field_object.remote_field.model
        elif hasattr(field_object, "related_model"):
            model = field_object.related_model
        elif hasattr(field_object, "model"):
            model = field_object.model
        if type(many_values) not in (list, tuple):
            many_values = [many_values]
        for val in many_values:
            if val.__class__ == model:
                # the value is a model instance: it is OK!
                m2ms.append((attribute, val))
                continue
            if type(val) != dict:
                # value is not a dict, we don't know what to do with it...
                continue
            vals = []
            # construct a dict for each value
            default_dict = {}
            # init with simple values that will be duplicated
            for key in val.keys():
                if type(val[key]) not in (list, tuple):
                    default_dict[key] = val[key]
            vals.append(default_dict.copy())
            # manage multiple values
            for key in list(val.keys()):
                if type(val[key]) in (list, tuple):
                    for idx, v in enumerate(val[key]):
                        if len(vals) <= idx:
                            vals.append(default_dict.copy())
                        vals[idx][key] = v
            # check that m2m are not empty
            notempty = False
            for dct in vals:
                for k in dct:
                    if dct[k] not in ("", None):
                        notempty = True
                        break
            if not notempty:
                continue
            field_names = get_all_field_names(model)
            for v in vals:
                if "history_modifier" in field_names:
                    if "defaults" not in v:
                        v["defaults"] = {}
                    v["defaults"]["history_modifier"] = self.history_modifier
                m2m_m2ms = []
                c_c_path = c_path[:]
                for k in list(v.keys()):
                    if k not in field_names:
                        continue
                    self.get_field(
                        model, k, v, m2m_m2ms, c_c_path, new_created, idx_line=idx_line
                    )
                if "__force_new" in v:
                    created = v.pop("__force_new")
                    key = ";".join(
                        ["{}-{}".format(k, v[k]) for k in sorted(v.keys())]
                    )
                    # only one forced creation
                    if attribute in new_created and key in new_created[attribute]:
                        continue
                    if attribute not in new_created:
                        new_created[attribute] = []
                    new_created[attribute].append(key)
                    has_values = bool([1 for k in v if v[k] and k != "defaults"])
                    if has_values:
                        if (
                            self.MODEL_CREATION_LIMIT
                            and model not in self.MODEL_CREATION_LIMIT
                        ):
                            raise self._get_improperly_conf_error(model)
                        if "defaults" in v:
                            default_values = v.pop("defaults")
                            for k in default_values.keys():
                                if k not in v:
                                    v[k] = default_values[k]
                        v = model.objects.create(**v)
                    else:
                        continue
                else:
                    v["defaults"] = v.get("defaults", {})
                    extra_fields = {}
                    # "File" type is a temp object and can be different
                    # for the same filename - it must be treated
                    # separately
                    for field in model._meta.fields:
                        k = field.name
                        # attr_class is a FileField attribute
                        if hasattr(field, "attr_class") and k in v:
                            extra_fields[k] = v.pop(k)
                    created = False
                    if (
                        not self.MODEL_CREATION_LIMIT
                        or model in self.MODEL_CREATION_LIMIT
                    ):
                        if not any(v.values()):  # empty
                            continue
                        try:
                            v, created = model.objects.get_or_create(**v)
                        except FieldError as e:
                            raise ImporterError(
                                str(
                                    _('Importer configuration error: "{}".')
                                ).format(e)
                            )
                        except Exception as e:
                            msg = str(_('Import error: {} - "{}".')).format(
                                str(model._meta.verbose_name), e
                            )
                            raise ImporterError(msg)
                    else:
                        get_v = v.copy()
                        if "defaults" in get_v:
                            get_v.pop("defaults")
                        if not any(get_v.values()):  # empty
                            continue
                        try:
                            v = model.objects.get(**get_v)
                        except model.DoesNotExist:
                            raise self._get_does_not_exist_in_db_error(model, get_v)
                    changed = False
                    for k in extra_fields.keys():
                        if extra_fields[k]:
                            changed = True
                            setattr(v, k, extra_fields[k])
                    if changed:
                        v._timestamp = self.timestamp
                        v._queue = "low_priority"
                        if hasattr(v, "no_post_process"):
                            v.no_post_process(history=True)
                        v.save()
                for att, objs in m2m_m2ms:
                    if type(objs) not in (list, tuple):
                        objs = [objs]
                    for obj in objs:
                        getattr(v, att).add(obj)
                if self.import_instance and hasattr(v, "imports") and created:
                    v.imports.add(self.import_instance)
                m2ms.append((attribute, v))
        return m2ms
    def _set_importer_trigger(self, cls, attribute, data):
        """
        An importer trigger is used. Store it for later execution and remove
        it from the current data dict.

        :param cls: current model
        :param attribute: attribute name
        :param data: current data dictionary

        :return: None
        """
        func = getattr(cls, attribute)
        if func.importer_trigger == "pre":
            func(data, data[attribute])
        elif func.importer_trigger == "post":
            self._item_post_processing.append([cls, attribute, data, data[attribute]])
        else:
            logger.warning(
                "Unknown importer_trigger '{}' for '{}'".format(
                    func.importer_trigger, attribute
                )
            )
        data.pop(attribute)

    def get_field(self, cls, attribute, data, m2ms, c_path, new_created, idx_line=None):
        """
        Get field from raw data

        :param cls: current model
        :param attribute: attribute name
        :param data: current data dictionary
        :param m2ms: many to many list of tuples: (m2m key, m2m value)
        :param c_path: attribute path from the main model point of view
        :param new_created: dict of forced newly created items to prevent
        multiple creation

        :return: None
        """
        if hasattr(cls, attribute) and getattr(
            getattr(cls, attribute), "importer_trigger", None
        ):
            # importer trigger
            self._set_importer_trigger(cls, attribute, data)
            return
        if attribute == "data":  # json field
            # no need to do anything
            return
        if attribute == "get_default":
            # force evaluation of the default value for this field
            return
        try:
            field_object = cls._meta.get_field(attribute)
        except FieldDoesNotExist:
            raise ImporterError(
                str(
                    _(
                        'Importer configuration error: field "{}" does not exist '
                        "for {}."
                    )
                ).format(attribute, str(cls._meta.verbose_name))
            )
        if field_object.many_to_many:
            try:
                m2ms += self._get_field_m2m(
                    attribute,
                    data,
                    c_path,
                    new_created,
                    field_object,
                    idx_line=idx_line,
                )
            except Exception as e:
                msg = str(e)
                if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                    self.errors.append((self.idx_line, None, msg))
            return
        if (
            not hasattr(field_object, "related_model")
            or not field_object.related_model
        ):
            return
        if type(data[attribute]) == list:
            # extract the first item from the list
            # be careful: if the list has more than one item this is arbitrary
            if len(data[attribute]) > 1:
                logger.warning(
                    "Import {}: {} has many values when only one is expected. "
                    "Getting the first one but it is not OK!".format(
                        self.import_instance, attribute
                    )
                )
            data[attribute] = data[attribute][0]
            return
        if not isinstance(data[attribute], dict):
            # we treat only dict formated values
            return
        # put history_modifier for every created item
        if "history_modifier" in get_all_field_names(field_object.remote_field.model):
            data[attribute]["history_modifier"] = self.history_modifier
        try:
            c_path.append(attribute)
            data[attribute], created = self.get_object(
                field_object.remote_field.model,
                data[attribute].copy(),
                c_path,
                idx_line=idx_line,
            )
        except ImporterError as import_error:
            msg = str(import_error)
            if not any(1 for error in self.IGNORE_ERRORS if error in msg):
                self.errors.append((self.idx_line, None, msg))
            data[attribute] = None

    def get_object(self, cls, data, path=None, idx_line=None):
        if not path:
            path = []
        m2ms = []
        if not isinstance(data, dict):
            # if data is not a dict we don't know what to do
            return data, False
        is_empty = not bool(
            [k for k in data if k not in ("history_modifier", "defaults") and data[k]]
        )
        if is_empty:
            # if there is no value, there is no creation
            return None, False
        c_path = path[:]
        # get all related fields
        new_created = {}
        try:
            for attribute in list(data.keys()):
                c_c_path = c_path[:]
                if attribute not in data:
                    # removed by a previous get_field
                    continue
                if not attribute:
                    data.pop(attribute)
                    continue
                if not data[attribute]:
                    if hasattr(cls, attribute) and getattr(
                        getattr(cls, attribute), "importer_trigger", None
                    ):
                        data.pop(attribute)
                        continue
                    field_object = cls._meta.get_field(attribute)
                    if field_object.many_to_many:
                        data.pop(attribute)
                        continue
                if attribute != "__force_new":
                    self.get_field(
                        cls,
                        attribute,
                        data,
                        m2ms,
                        c_c_path,
                        new_created,
                        idx_line=idx_line,
                    )
        except (ValueError, IntegrityError, FieldDoesNotExist) as e:
            try:
                message = str(e)
            except (UnicodeDecodeError, UnicodeEncodeError):
                message = ""
            try:
                data = str(data)
            except UnicodeDecodeError:
                data = ""
            raise ImporterError(
                "Import error %s %s, context: %s, error: %s"
                % (
                    str(cls._meta.verbose_name),
                    str("__".join(path)),
                    str(data),
                    message,
                )
            )
        # image field is not serialized
        image, associated_file = None, None
        if "image" in data and data["image"]:
            image = data.pop("image")
        if "associated_file" in data and data["associated_file"]:
            associated_file = data.pop("associated_file")
        create_dict = copy.deepcopy(data)
        if image:
            data["image"] = image
            create_dict["image"] = image
        if associated_file:
            data["associated_file"] = associated_file
            create_dict["associated_file"] = associated_file
        for k in list(create_dict.keys()):
            # filter unnecessary default values but not the json field
            if isinstance(create_dict[k], dict) and k != "data":
                if self.simulate:
                    create_dict[k] = _("* created *")
                else:
                    create_dict.pop(k)
            # File doesn't like deepcopy
            elif type(create_dict[k]) == File:
                create_dict[k] = copy.copy(data[k])
        path = tuple(path)
        defaults = {}
        if hasattr(cls, "get_import_defaults"):
            defaults = cls.get_import_defaults()
        if "history_modifier" in create_dict:
            defaults.update({"history_modifier": create_dict.pop("history_modifier")})
        created = False
        post_save_keys = []
        get_by_unicity_key = False
        obj = None
        try:
            try:
                dct = {}
                if hasattr(cls, "get_import_defaults"):
                    dct = cls.get_import_defaults() or {}
                dct.update(create_dict.copy())
                for key in list(dct.keys()):
                    if callable(dct[key]):
                        dct[key] = dct[key]()
                        if getattr(dct[key], "post_save", True):
                            dct.pop(key)
                            post_save_keys.append(key)
                if "__force_new" in dct:
                    created = dct.pop("__force_new")
                    if not [k for k in dct if dct[k] is not None]:
                        return None, created
                    new_dct = defaults.copy()
                    new_dct.update(dct)
                    if (
                        self.MODEL_CREATION_LIMIT
                        and cls not in self.MODEL_CREATION_LIMIT
                    ):
                        raise self._get_improperly_conf_error(cls)
                    if not self.simulate:
                        self._create_item(cls, new_dct, idx_line)
                    else:
                        self.new_objects.append((path, cls, new_dct))
                else:
                    # manage UNICITY_KEYS - only level 1
                    if not path and self.UNICITY_KEYS:
                        get_by_unicity_key = True
                        for k in list(dct.keys()):
                            if k not in self.UNICITY_KEYS and k != "defaults":
                                if dct[k]:
                                    defaults[k] = dct.pop(k)
                                else:
                                    dct.pop(k)
                        if "get_default" in dct and dct["get_default"]:
                            dct.pop("get_default")
                            new_dct = defaults.copy()
                            new_dct.update(dct)
                            dct = new_dct
                    if self.simulate:
                        q = cls.objects.filter(**dct)
                        if not q.count():
                            if (
                                self.MODEL_CREATION_LIMIT
                                and cls not in self.MODEL_CREATION_LIMIT
                            ):
                                self.not_find_objects.append((path, cls, dct))
                                return _("* match not found *"), False
                            dct.update(defaults)
                            self.new_objects.append([path, cls, dct])
                            created = True
                        elif q.count() > 1:
                            self.ambiguous_objects.append((path, list(q.all()), dct))
                            if q.count() > 10:
                                return (
                                    _("* the query matches more than 10 results *"),
                                    False,
                                )
                            else:
                                return (
                                    str(_(" or ")).join(
                                        [str(item) for item in q.all()]
                                    ),
                                    False,
                                )
                        else:
                            self.updated_objects.append([path, q.all()[0], dct, {}])
                            dct["defaults"] = defaults.copy()
                    else:
                        if not dct and not defaults:
                            obj = None
                        else:
                            if (
                                not self.MODEL_CREATION_LIMIT
                                or cls in self.MODEL_CREATION_LIMIT
                            ):
                                q = cls.objects.filter(**dct)
                                if q.count():
                                    obj = cls.objects.get(**dct)
                                else:
                                    created = True
                                    new_dct = dct.copy()
                                    for k in defaults:
                                        if k not in dct:
                                            new_dct[k] = defaults[k]
                                    obj = self._create_item(cls, new_dct, idx_line)
                            else:
                                try:
                                    obj = cls.objects.get(**dct)
                                    # delayed to the end of the import
                                    obj._no_post_save = True
                                    self._add_to_post_save(cls, obj.pk, idx_line)
                                except cls.DoesNotExist:
                                    raise self._get_does_not_exist_in_db_error(
                                        cls, dct
                                    )
                            dct["defaults"] = defaults.copy()
                if not created and not path and self.UNICITY_KEYS:
                    updated_dct = {}
                    if self.conservative_import:
                        for k in dct["defaults"]:
                            new_val = dct["defaults"][k]
                            if new_val is None or new_val == "":
                                continue
                            val = getattr(obj, k)
                            if val is None or val == "":
                                updated_dct[k] = new_val
                            elif (
                                k in self.concats
                                and type(val) == str
                                and type(new_val) == str
                            ):
                                updated_dct[k] = val + "\n" + new_val
                    elif "defaults" in dct:
                        for k in dct["defaults"]:
                            new_val = dct["defaults"][k]
                            if new_val is None or new_val == "":
                                continue
                            if obj and k == "data":
                                updated_dct[k] = update_data(obj.data, new_val)
                            else:
                                updated_dct[k] = new_val
                    if updated_dct:
                        if self.simulate:
                            self.updated_objects[-1][-1] = updated_dct
                        else:
                            for k in updated_dct:
                                setattr(obj, k, updated_dct[k])
                            obj._timestamp = self.timestamp
                            obj._queue = "low_priority"
                            if hasattr(obj, "no_post_process"):
                                obj.no_post_process(history=True)
                            obj.save()
                if (
                    not self.simulate
                    and self.import_instance
                    and obj
                    and hasattr(obj, "imports")
                    and created
                ):
                    obj.imports.add(self.import_instance)
            except (ValueError, IntegrityError, DatabaseError, GEOSException) as e:
                raise IntegrityError(str(e))
            except cls.MultipleObjectsReturned as e:
                created = False
                if "defaults" in dct:
                    dct.pop("defaults")
                if get_by_unicity_key:
                    data = {k: data[k] for k in self.UNICITY_KEYS if data.get(k, None)}
                    if not data:
                        data = str(_('unicity key(s) "{}" is/are missing')).format(
                            '" ; "'.join(self.UNICITY_KEYS)
                        )
                    raise ImporterError(
                        str(_("Import error {}, {}")).format(
                            str(cls._meta.verbose_name), data
                        )
                    )
                raise IntegrityError(str(e))
                # obj = cls.objects.filter(**dct).all()[0]
            if not obj and (post_save_keys or m2ms):
                raise IntegrityError(f"{cls} not created")
            for key in post_save_keys:
                getattr(obj, key)()
            for attr, value in m2ms:
                values = [value]
                if type(value) in (list, tuple):
                    values = value
                if self.simulate:
                    if created:
                        obj_dct = self.new_objects[-1][-1]
                    else:
                        obj_dct = self.updated_objects[-1][-1]
                    obj_dct[attr] = values
                else:
                    for v in values:
                        related_model = getattr(obj, attr)
                        # an intermediary model is used
                        if (
                            hasattr(related_model, "through")
                            and not related_model.through._meta.auto_created
                        ):
                            # try to create it with default attributes
                            inter_model = related_model.through
                            target_name, item_name = None, None
                            for field in inter_model._meta.get_fields():
                                rel_model = getattr(field, "related_model", None)
                                # assume that the first found is correct...
                                if rel_model == v.__class__:
                                    target_name = field.name
                                elif rel_model == obj.__class__:
                                    item_name = field.name
                            if target_name is None or item_name is None:
                                raise IntegrityError(
                                    f"Configuration error for attribute {attr}."
                                )
                            inter_model.objects.get_or_create(
                                **{item_name: obj, target_name: v}
                            )
                        else:
                            getattr(obj, attr).add(v)
                        # force post save script
                        v = v.__class__.objects.get(pk=v.pk)
                        self._add_to_post_save(v.__class__, v.pk, idx_line)
                        v._no_post_save = True
                        try:
                            v._timestamp = self.timestamp
                            v._queue = "low_priority"
                            if hasattr(v, "no_post_process"):
                                v.no_post_process(history=True)
                            v.save()
                        except DatabaseError as import_error:
                            msg = str(import_error)
                            raise IntegrityError(msg)
            if self.simulate:
                # put the m2m results in the data dict
                current_data = data
                if m2ms:
                    for item in path:
                        if item not in current_data:
                            current_data[item] = {}
                        current_data = current_data[item]
                    for key, value in m2ms:
                        if not isinstance(value, list) and not isinstance(
                            value, tuple
                        ):
                            value = [value]
                        current_data[key] = value
                if created:
                    return dct, True
                else:
                    # defaults are not presented as matching data
                    dct.pop("defaults")
                    return self.updated_objects[-1][1], False
        except IntegrityError as e:
            try:
                message = str(e)
            except (UnicodeDecodeError, UnicodeEncodeError):
                message = ""
            try:
                data = str(data)
            except UnicodeDecodeError:
                data = ""
            raise ImporterError(
                str(
                    _('Import error {}, path "{}", context : {}, error : {}')
                ).format(
                    str(cls._meta.verbose_name),
                    str("__".join(path)),
                    str(data),
                    message,
                )
            )
        if obj:
            self._add_to_post_save(obj.__class__, obj.pk, idx_line)
        return obj, created

    def _format_csv_line(self, values, empty="-"):
        return (
            '"'
            + '","'.join([(v and str(v).replace('"', '""')) or empty for v in values])
            + '"'
        )

    def _get_csv(self, rows, header=None, empty="-"):
        if not rows:
            return ""
        if not header:
            header = []
        csv_v = []
        if header:
            csv_v.append(self._format_csv_line(header, empty=empty))
        for values in rows:
            csv_v.append(self._format_csv_line(values, empty=empty))
        return "\n".join(csv_v)

    def get_csv_errors(self):
        return self._get_csv(self.errors, header=[_("line"), _("col"), _("error")])

    def get_csv_result(self):
        header = [
            (line_format and line_format.label) or "-"
            for line_format in self.LINE_FORMAT
        ]
        return self._get_csv(self.validity, header=header)

    def get_csv_matches(self):
        header = [_("field"), _("source"), _("result")]
        values = []
        for field in self.match_table:
            for source in self.match_table[field]:
                values.append((field, source, self.match_table[field][source]))
        return self._get_csv(values, header=header)

    @classmethod
    def choices_check(cls, choices):
        def function(value):
            choices_dct = dict(choices)
            value = value.strip()
            if not value:
                return
            if value not in choices_dct.values():
                raise ValueError(
                    _('"%(value)s" not in %(values)s')
                    % {
                        "value": value,
                        "values": ", ".join(
                            [val for val in choices_dct.values()]
                        ),
                    }
                )
            return value

        return function
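

# Illustrative sketch (hypothetical model and columns, not a registered
# importer): a concrete importer subclasses ``Importer``, points OBJECT_CLS
# to a model and describes each CSV column with an ``ImportFormater``.
#
#     class MyOperationImporter(Importer):
#         OBJECT_CLS = Operation  # hypothetical model
#         UNICITY_KEYS = ["code_patriarche"]
#         LINE_FORMAT = [
#             ImportFormater("code_patriarche", UnicodeFormater(100)),
#             ImportFormater("year", YearFormater(), required=False),
#             ImportFormater("common_name", UnicodeFormater(), required=False),
#         ]
#
#     importer = MyOperationImporter(skip_lines=1)
#     with open("operations.csv") as csv_file:
#         importer.importation(csv.reader(csv_file))
#     print(importer.get_csv_errors())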