#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2013-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import copy import datetime import os import logging import re import sys import zipfile from django.conf import settings from django.contrib.auth.models import User from django.contrib.gis.geos.error import GEOSException from django.db.models.fields import FieldDoesNotExist from django.core.exceptions import FieldError from django.core.files import File from django.db import IntegrityError, DatabaseError, transaction from django.db.models import Q from django.template.defaultfilters import slugify from django.utils.translation import ugettext_lazy as _ from ishtar_common.utils import debug_line_no, get_all_field_names, update_data NEW_LINE_BREAK = "#####@@@#####" RE_FILTER_CEDEX = re.compile("(.*) *(?: *CEDEX|cedex|Cedex|Cédex|cédex *\d*)") def post_importer_action(func): def wrapper(self, context, value): return func(self, context, value) wrapper.importer_trigger = "post" return wrapper def pre_importer_action(func): def wrapper(self, context, value): return func(self, context, value) wrapper.importer_trigger = "pre" return wrapper class ImportFormater(object): def __init__( self, field_name, formater=None, required=True, through=None, through_key=None, through_dict=None, through_unicity_keys=None, duplicate_fields=None, regexp=None, regexp_formater_args=None, force_value=None, post_processing=False, concat=False, concat_str=False, comment="", force_new=None, export_field_name=None, value_format=None, label="", ): self.field_name = field_name if export_field_name: self.export_field_name = export_field_name else: self.export_field_name = field_name self.formater = formater self.required = required self.through = through self.through_key = through_key self.through_dict = through_dict self.through_unicity_keys = through_unicity_keys self.duplicate_fields = duplicate_fields or [] self.regexp = regexp self.value_format = value_format self.regexp_formater_args = regexp_formater_args or [] # write this value even if a value exists self.force_value = force_value # post process after import self.post_processing = post_processing # concatenate with existing value self.concat = concat self.concat_str = concat_str self.comment = comment self.force_new = force_new self.label = label def reinit_db_target(self, db_target, nb=0, user=None): if not self.formater: return if type(db_target) in (list, tuple): db_target = db_target[nb] if type(self.formater) not in (list, tuple): self.formater.db_target = db_target self.formater.init_db_target(user=user) else: for idx, formater in enumerate(self.formater): formater.db_target = db_target formater.init_db_target(user=user) def init_db_target(self, user=None): pass def __str__(self): return self.field_name def report_succes(self, *args): return def report_error(self, *args): return def init( self, vals, output=None, choose_default=False, import_instance=None, user=None ): try: lst = iter(self.formater) except TypeError: lst = [self.formater] for formater in lst: if formater: formater.check( vals, output, self.comment, choose_default=choose_default, import_instance=import_instance, user=user, ) def post_process(self, obj, context, value, owner=None): raise NotImplemented() class ImporterError(Exception): STANDARD = "S" HEADER = "H" def __init__(self, message, type="S"): self.msg = message self.type = type def __str__(self): return str(self.msg) class Formater(object): def __init__(self, *args, **kwargs): self.db_target = kwargs.get("db_target", None) def format(self, value): return value def check( self, values, output=None, comment="", choose_default=False, import_instance=None, user=None, ): return def init_db_target(self, user=None): pass def _base_target_filter(self, user=None): # set for all users q_or = ( Q(associated_import__isnull=True) & Q(associated_user__isnull=True) & Q(associated_group__isnull=True) ) if hasattr(self, "import_instance") and self.import_instance: # set for current import q_or = q_or | Q(associated_import=self.import_instance) if self.import_instance.associated_group: # set for associated group q_or = q_or | Q(associated_group=self.import_instance.associated_group) if user: # set for current user q_or = q_or | Q(associated_user=user) return q_or def get_db_target_query(self, user=None): if not self.db_target: return q = self.db_target.keys.filter(is_set=True) q_or = self._base_target_filter(user) q = q.filter(q_or) return q class ChoiceChecker(object): def report_new(self, comment): if not self.new_keys: return msg = 'For "%s" these new associations have been made:\n' % comment sys.stderr.write(msg.encode("utf-8")) for k in self.new_keys: msg = '"%s";"%s"\n' % (k, self.new_keys[k]) sys.stderr.write(msg.encode("utf-8")) class UnicodeFormater(Formater): def __init__( self, max_length=None, clean=False, re_filter=None, notnull=False, prefix="", db_target=None, import_instance=None, many_split=None, ): self.max_length = max_length self.db_target = db_target self.clean = clean self.re_filter = re_filter self.notnull = notnull self.prefix = prefix self.import_instance = import_instance self.many_split = many_split def format(self, value): try: if type(value) != str: value = str(value.strip()) vals = [] for v in value.split("\n"): v = v.strip() if v: vals.append(v) value = "\n".join(vals) if self.re_filter: m = self.re_filter.match(value) if m: value = "".join(m.groups()) if self.clean: if value.startswith(","): value = value[1:] if value.endswith(","): value = value[:-1] value = value.replace(", , ", ", ") except UnicodeDecodeError: return if self.max_length and len(value) > self.max_length: raise ValueError( _( '"%(value)s" is too long. The max length is %(length)d ' "characters." ) % {"value": value, "length": self.max_length} ) if self.notnull and not value: return if value: value = self.prefix + value return value class BooleanFormater(Formater): def format(self, value): value = value.strip().upper() if value in ("1", "OUI", "VRAI", "YES", "TRUE"): return True if value in ("", "0", "NON", "FAUX", "NO", "FALSE"): return False raise ValueError(_('"%(value)s" not equal to yes or no') % {"value": value}) class FloatFormater(Formater): def format(self, value): value = value.strip().replace(",", ".") value = value.replace(" ", "") if not value: return try: return float(value) except ValueError: raise ValueError(_('"%(value)s" is not a float') % {"value": value}) class InseeFormater(Formater): """ Formater for "code INSEE" (statistic institute ine France) The syntax "CodeINSEE-Year" is accepted (Ishtar trick) in order to manage old INSEE (year is the date of creation) """ ERROR = _('"{value}" is not an appropriate INSEE code') def format(self, value): value = value.strip() exp = value.split("-") code = exp[0] try: int(code) except ValueError: raise ValueError(str(self.ERROR).format(value)) while len(code) < 5: code = "0" + code if len(exp) > 2: raise ValueError(str(self.ERROR).format(value)) elif len(exp) == 1: return code try: datetime.datetime.strptime(exp[1], "%Y") except ValueError: raise ValueError(str(self.ERROR).format(value)) return code + "-" + exp[1] class YearFormater(Formater): def format(self, value): value = value.strip() if not value: return try: value = int(value) assert value > 0 and value < (datetime.date.today().year + 30) except (ValueError, AssertionError): raise ValueError(_('"%(value)s" is not a valid date') % {"value": value}) return value class YearNoFuturFormater(Formater): def format(self, value): value = value.strip() if not value: return try: value = int(value) assert value > 0 and value < datetime.date.today().year except (ValueError, AssertionError): raise ValueError(_('"%(value)s" is not a valid date') % {"value": value}) return value class IntegerFormater(Formater): def format(self, value): value = value.strip() value = value.replace(" ", "") if not value: return try: return int(value) except ValueError: raise ValueError(_('"%(value)s" is not an integer') % {"value": value}) class StrChoiceFormater(Formater, ChoiceChecker): def __init__( self, choices, strict=False, equiv_dict=None, model=None, cli=False, many_split="", db_target=None, import_instance=None, ): if not equiv_dict: equiv_dict = {} self.choices = list(choices) self.strict = strict self.equiv_dict = copy.deepcopy(equiv_dict) self.cli = cli self.model = model self.db_target = db_target self.create = False self.missings = set() self.new_keys = {} self.match_table = {} self.many_split = many_split self.import_instance = None for key, value in self.choices: value = str(value) if not self.strict: value = slugify(value) if value not in self.equiv_dict: v = key if model and v: v = model.objects.get(pk=v) self.equiv_dict[value] = v self.init_db_target() def init_db_target(self, user=None): if not self.db_target: return q = self.get_db_target_query(user) for target_key in list(q.all()): key = target_key.key if not self.strict: key = slugify(key) if key in self.equiv_dict: continue v = target_key.value if self.model and v and type(v) in (int, str): q = self.model.objects.filter(txt_idx=v) if not q.count(): q = self.model.objects.filter(pk=v) if not q.count(): target_key.delete() continue v = q.all()[0] self.equiv_dict[key] = v def prepare(self, value): return str(value).strip() def _get_choices(self, comment=""): msgstr = comment + " - " msgstr += str( _('Choice for "%s" is not available. ' "Which one is relevant?\n") ) idx = -1 for idx, choice in enumerate(self.choices): msgstr += "%d. %s\n" % (idx + 1, choice[1]) idx += 2 if self.create: msgstr += str(_("%d. None of the above - create new")) % idx + "\n" idx += 1 msgstr += str(_("%d. None of the above - skip")) % idx + "\n" return msgstr, idx def check( self, values, output=None, comment="", choose_default=False, import_instance=None, user=None, ): self.init_db_target(user) """ from ishtar_common.models import TargetKey if self.db_target: q = {'target': self.db_target, 'associated_import': import_instance, 'is_set': True } for v in self.equiv_dict: q['key'] = v value = self.equiv_dict[v] if hasattr(value, 'pk'): value = value.pk q['value'] = value with transaction.atomic(): try: t, created = TargetKey.objects.get_or_create(**q) except IntegrityError: pass """ if (not output or output == "silent") and not choose_default: return if self.many_split: new_values = [] r = re.compile(self.many_split) for value in values: new_values += r.split(value) values = new_values for value in values: base_value = copy.copy(value) value = self.prepare(value) if value in self.equiv_dict: continue if output != "cli" and not choose_default: self.missings.add(value) continue msgstr, idx = self._get_choices(comment) res = None if choose_default: res = 1 while res not in range(1, idx + 1): msg = msgstr % value sys.stdout.write(msg.encode("utf-8")) sys.stdout.write("\n>>> ") res = input() try: res = int(res) except ValueError: pass res -= 1 if res < len(self.choices): v = self.choices[res][0] if self.model and v: v = self.model.objects.get(pk=v) self.equiv_dict[value] = v self.add_key(v, value, import_instance) self.new_keys[value] = v elif self.create and res == len(self.choices): self.equiv_dict[value] = self.new(base_value) self.choices.append( (self.equiv_dict[value].pk, str(self.equiv_dict[value])) ) self.new_keys[value] = str(self.equiv_dict[value]) else: self.equiv_dict[value] = None if self.equiv_dict[value] and self.db_target: from ishtar_common.models import TargetKey q = {"target": self.db_target, "key": value} query = TargetKey.objects.filter(**q) query = query.filter(self._base_target_filter(user)) if query.count(): target = query.all()[0] target.value = self.equiv_dict[value] target.is_set = True target.save() else: q["associated_import"] = import_instance with transaction.atomic(): q["value"] = self.equiv_dict[value] q["is_set"] = True try: TargetKey.objects.create(**q) except IntegrityError: pass if output == "db" and self.db_target: from ishtar_common.models import TargetKey for missing in self.missings: q = {"target": self.db_target, "key": missing} query = TargetKey.objects.filter(**q) query = query.filter(self._base_target_filter(user)) if query.count(): continue with transaction.atomic(): q["associated_import"] = import_instance try: TargetKey.objects.create(**q) except IntegrityError: pass if output == "cli": self.report_new(comment) def new(self, value): return def add_key(self, obj, value, importer=None): return def format(self, value): origin_value = value value = self.prepare(value) if not self.strict: value = slugify(value) if value in self.equiv_dict: self.match_table[origin_value] = self.equiv_dict[value] or "" return self.equiv_dict[value] class TypeFormater(StrChoiceFormater): def __init__( self, model, cli=False, defaults=None, many_split=False, db_target=None, import_instance=None, ): if not defaults: defaults = {} self.create = True self.strict = False self.model = model self.defaults = defaults self.many_split = many_split self.db_target = db_target self.missings = set() self.equiv_dict, self.choices = {}, [] self.match_table = {} self.new_keys = {} self.import_instance = import_instance if self.import_instance: for item in model.objects.all(): self.choices.append((item.pk, str(item))) for key in item.get_keys(importer=import_instance): self.equiv_dict[key] = item def prepare(self, value): return slugify(str(value).strip()) def add_key(self, obj, value, importer=None): obj.add_key(slugify(value), force=True, importer=importer) def new(self, value): values = copy.copy(self.defaults) values["label"] = value values["txt_idx"] = slugify(value) if "order" in get_all_field_names(self.model): order = 1 q = self.model.objects.values("order").order_by("-order") if q.count(): order = q.all()[0]["order"] or 1 values["order"] = order return self.model.objects.create(**values) class DateFormater(Formater): def __init__(self, date_formats=None, db_target=None, import_instance=None): if not date_formats: date_formats = ["%d/%m/%Y"] self.date_formats = date_formats if type(date_formats) not in (list, tuple): self.date_formats = [self.date_formats] self.db_target = db_target self.import_instance = import_instance def format(self, value): value = value.strip() if not value: return for date_format in self.date_formats: try: return datetime.datetime.strptime(value, date_format).date() except: continue raise ValueError(_('"%(value)s" is not a valid date') % {"value": value}) class FileFormater(Formater): need_archive = True def format(self, value, archive): value = value.strip() if not value: return zp = zipfile.ZipFile(archive) value = value.strip().replace("\\", "/") items = value.replace("/", "_").split(".") base_dir = settings.MEDIA_ROOT + "imported" if not os.path.isdir(base_dir): os.mkdir(base_dir) filename = base_dir + os.sep + ".".join(items[:-1]) + "." + items[-1] try: with open(filename, "wb") as f: with zp.open(value) as z: f.write(z.read()) f = open(filename, "rb") my_file = File(f) # manually set the file size because of an issue with TempFile my_file.size = os.stat(filename).st_size return my_file except KeyError: raise ValueError( _('"%(value)s" is not a valid path for the ' "given archive") % {"value": value} ) class StrToBoolean(Formater, ChoiceChecker): def __init__( self, choices=None, cli=False, strict=False, db_target=None, import_instance=None, ): if not choices: choices = {} self.dct = copy.copy(choices) self.cli = cli self.strict = strict self.db_target = db_target self.missings = set() self.match_table = {} self.new_keys = {} self.import_instance = import_instance self.init_db_target() def init_db_target(self, user=None): if not self.db_target: return q = self.get_db_target_query(user) for target_key in q.all(): key = self.prepare(target_key.key) if key in self.dct: continue v = target_key.format() self.dct[key] = v def prepare(self, value): value = str(value).strip() if not self.strict: value = slugify(value) return value def check( self, values, output=None, comment="", choose_default=False, import_instance=None, user=None, ): if (not output or output == "silent") and not choose_default: return msgstr = comment + " - " msgstr += str( _('Choice for "%s" is not available. ' "Which one is relevant?\n") ) msgstr += "1. True\n" msgstr += "2. False\n" msgstr += "3. Empty\n" for value in values: value = self.prepare(value) if value in self.dct: continue if output != "cli" and not choose_default: self.missings.add(value) continue res = None if choose_default: res = 1 while res not in range(1, 4): msg = msgstr % value sys.stdout.write(msg.encode("utf-8")) sys.stdout.write("\n>>> ") res = input() try: res = int(res) except ValueError: pass if res == 1: self.dct[value] = True elif res == 2: self.dct[value] = False else: self.dct[value] = None self.new_keys[value] = str(self.dct[value]) if output == "db" and self.db_target: from ishtar_common.models import TargetKey for missing in self.missings: try: q = { "target": self.db_target, "key": missing, "associated_import": import_instance, } if not TargetKey.objects.filter(**q).count(): TargetKey.objects.create(**q) except IntegrityError: pass if output == "cli": self.report_new(comment) def format(self, value): origin_value = value value = self.prepare(value) if value in self.dct: val = self.dct[value] and "True" or "False" self.match_table[origin_value] = _(val) return self.dct[value] logger = logging.getLogger(__name__) def get_object_from_path(obj, path): for k in path.split("__")[:-1]: if not hasattr(obj, k): return obj = getattr(obj, k) return obj class Importer(object): SLUG = "" NAME = "" DESC = "" LINE_FORMAT = [] OBJECT_CLS = None UNICITY_KEYS = [] # if set only models inside this list can be created MODEL_CREATION_LIMIT = [] EXTRA_DEFAULTS = {} DEFAULTS = {} ERRORS = { "header_check": _( "The given file is not correct. Check the file " "format. If you use a CSV file: check that column separator " "and encoding are similar to the ones used by the reference " "file." ), "too_many_cols": _( "Too many cols (%(user_col)d) when " "maximum is %(ref_col)d" ), "no_data": _("No data provided"), "value_required": _("Value is required"), "not_enough_cols": _("At least %d columns must be filled"), "regex_not_match": _("The regexp doesn't match."), "improperly_configured": _( "Forced creation is set for model {} but this model is not in the " "list of models allowed to be created." ), "does_not_exist_in_db": _( "{} with values {} doesn't exist in the " "database. Create it first or fix your source file." ), } def _create_models(self, force=False): """ Create a db config from a hardcoded import. Not useful anymore? """ from ishtar_common import models q = models.ImporterType.objects.filter(slug=self.SLUG) if not force and (not self.SLUG or q.count()): return if force and q.count(): q.all()[0].delete() name = self.NAME if self.NAME else self.SLUG model_name = self.OBJECT_CLS.__module__ + "." + self.OBJECT_CLS.__name__ model_cls, c = models.ImporterModel.object.get_or_create( klass=model_name, default={"name": self.OBJECT_CLS.__name__} ) unicity_keys = "" if self.UNICITY_KEYS: unicity_keys = ";".join(self.UNICITY_KEYS) importer = models.ImporterType.objects.create( slug=self.SLUG, name=name, description=self.DESC, associated_models=model_cls, unicity_keys=unicity_keys, ) for default in self.DEFAULTS: values = self.DEFAULTS[default] imp_default = models.ImporterDefault.objects.create( importer_type=importer, target="__".join(default) ) for key in values: if key in ("history_modifier",): continue value = values[key] if hasattr(value, "txt_idx") and value.txt_idx: value = value.txt_idx elif hasattr(value, "pk") and value.pk: value = value.pk if callable(value): value = value() models.ImporterDefaultValues.objects.create( default_target=imp_default, target=key, value=value ) for idx, line in enumerate(self.line_format): idx += 1 if not line: continue column = models.ImporterColumn.objects.create( importer_type=importer, col_number=idx ) targets = line.field_name if type(targets) not in (list, tuple): targets = [targets] formaters = line.formater if type(formaters) not in (list, tuple): formaters = [formaters] for idx_target, target in enumerate(targets): formater = formaters[idx_target] formater_name = formater.__class__.__name__ if formater_name not in models.IMPORTER_TYPES_DCT: formater_name = "UnknowType" options = "" if formater_name == "TypeFormater": options = formater.model.__module__ + "." + formater.model.__name__ elif formater_name == "UnicodeFormater": options = str(formater.max_length or "") elif formater_name == "DateFormater": options = formater.date_formats[0] formater_model, created = models.FormaterType.objects.get_or_create( formater_type=formater_name, options=options.strip(), many_split=getattr(formater, "many_split", None) or "", ) regexp_filter = None if getattr(formater, "regexp", None): regexp_filter, created = models.Regexp.objects.get_or_create( regexp=formater.regex, defaults={"name": "Default name"} ) models.ImportTarget.objects.get_or_create( column=column, target=target, formater_type=formater_model, force_new=getattr(formater, "force_new", False), concat=getattr(formater, "concat", False), concat_str=getattr(formater, "concat_str", ""), regexp_filter=regexp_filter, comment=line.comment, ) return True def _get_improperly_conf_error(self, model): from ishtar_common.models import ImporterModel cls_name = model.__module__ + "." + model.__name__ q = ImporterModel.objects.filter(klass=cls_name) if q.count(): cls_name = q.all()[0].name return ImporterError(str(self.ERRORS["improperly_configured"]).format(cls_name)) def _get_does_not_exist_in_db_error(self, model, data): from ishtar_common.models import ImporterModel cls_name = model.__module__ + "." + model.__name__ q = ImporterModel.objects.filter(klass=cls_name) if q.count(): cls_name = q.all()[0].name values = ", ".join(["{}: {}".format(k, data[k]) for k in data]) raise ImporterError( str(self.ERRORS["does_not_exist_in_db"]).format(cls_name, values) ) def __init__( self, skip_lines=0, reference_header=None, check_col_num=False, test=False, history_modifier=None, output="silent", import_instance=None, conservative_import=False, ): """ * skip_line must be set if the data provided has got headers lines. * a reference_header can be provided to perform a data compliance check. It can be useful to warn about bad parsing. * test doesn't write in the database """ self.skip_lines = skip_lines self.reference_header = reference_header self.test = test self.errors = [] # list of (line, col, message) self.validity = [] # list of (line, col, message) # list used for simulation self.simulate = False self.new_objects, self.updated_objects = [], [] self.ambiguous_objects, self.not_find_objects = [], [] self.number_updated = 0 self.number_created = 0 self.check_col_num = check_col_num self.line_format = copy.copy(self.LINE_FORMAT) self.import_instance = import_instance self.archive = None self.current_csv_line = None self.conservative_import = conservative_import # for a conservative_import UNICITY_KEYS should be defined assert not self.conservative_import or bool(self.UNICITY_KEYS) self.DB_TARGETS = {} self.match_table = {} self.concats = set() self.concat_str = {} self.to_be_close = [] if import_instance and import_instance.imported_images: self.archive = import_instance.imported_images self._defaults = self.DEFAULTS.copy() # EXTRA_DEFAULTS are for multiple inheritance if self.EXTRA_DEFAULTS: for k in self.EXTRA_DEFAULTS: if k not in self._defaults: self._defaults[k] = {} self._defaults[k].update(self.EXTRA_DEFAULTS[k]) self.history_modifier = history_modifier self.output = output if not self.history_modifier: if self.import_instance: self.history_modifier = self.import_instance.user.user_ptr else: # import made by the CLI: get the first admin self.history_modifier = User.objects.filter(is_superuser=True).order_by( "pk" )[0] def post_processing(self, idx_line, item): # force django based post-processing for the item item = item.__class__.objects.get(pk=item.pk) item.save() if hasattr(item, "RELATED_POST_PROCESS"): for related_key in item.RELATED_POST_PROCESS: for related in getattr(item, related_key).all(): related.save() for func, context, value in self._item_post_processing: context["import_object"] = self.import_instance try: getattr(item, func)(context, value) except ImporterError as msg: self.errors.append((idx_line, None, msg)) return item def initialize(self, table, output="silent", choose_default=False, user=None): """ copy vals in columns and initialize formaters * output: - silent: no associations - cli: output by command line interface and stocked in the database - db: output on the database with no interactive association (further exploitation by web interface) - user: associated user """ assert output in ("silent", "cli", "db") vals = [] for idx_line, line in enumerate(table): if self.skip_lines > idx_line: continue for idx_col, val in enumerate(line): if idx_col >= len(self.line_format): break if idx_col >= len(vals): vals.append([]) vals[idx_col].append(val) for idx, formater in enumerate(self.line_format): if formater and idx < len(vals): formater.import_instance = self.import_instance if self.DB_TARGETS: field_names = formater.field_name if type(field_names) not in (list, tuple): field_names = [field_names] db_targets = [] for field_name in field_names: db_targets.append( self.DB_TARGETS["{}-{}".format(idx + 1, field_name)] ) formater.reinit_db_target(db_targets, user=user) formater.init( vals[idx], output, choose_default=choose_default, import_instance=self.import_instance, user=user, ) def get_formaters(self): return self.line_format def importation( self, table, initialize=True, choose_default=False, user=None, line_to_process=None, simulate=False, ): if initialize: self.initialize( table, self.output, choose_default=choose_default, user=user ) self.simulate = simulate return self._importation(table, line_to_process=line_to_process) def get_current_values(self, obj): return obj def _associate_db_target_to_formaters(self): if not self.import_instance: return self.DB_TARGETS = {} from ishtar_common.models import ImporterColumn, ImportTarget for idx, line in enumerate(self.line_format): idx += 1 if not line: continue col = ImporterColumn.objects.get( importer_type=self.import_instance.importer_type, col_number=idx ) formater = line.formater targets = line.field_name if type(formater) not in (list, tuple): formater = [formater] targets = [targets] for target in targets: tg = target if type(target) == list and type(target[0]) == list: tg = target[0] self.DB_TARGETS["{}-{}".format(idx, tg)] = ImportTarget.objects.get( column=col, target=tg ) @classmethod def _field_name_to_data_dict( cls, field_name, value, data, force_value=False, concat=False, concat_str="", force_new=False, ): field_names = field_name if type(field_names) not in (list, tuple): field_names = [field_name] for field_name in field_names: keys = field_name.split("__") current_data = data for idx, key in enumerate(keys): if idx == (len(keys) - 1): # last if concat: if isinstance(value, (list, tuple)): if key not in current_data: current_data[key] = [] if not value: continue current_data[key] += value else: if key not in current_data: current_data[key] = "" if not value: continue current_data[key] = ( (current_data[key] + (concat_str or "")) if current_data[key] else "" ) current_data[key] += value elif force_value and value: if concat_str and key in current_data and current_data[key]: current_data[key] = ( str(current_data[key]) + concat_str + str(value) ) else: current_data[key] = value elif key not in current_data or not current_data[key]: current_data[key] = value elif concat_str: current_data[key] = ( str(current_data[key]) + concat_str + str(value) ) if force_new: current_data["__force_new"] = True elif key not in current_data: current_data[key] = {} current_data = current_data[key] return data def _importation(self, table, line_to_process=None): self.match_table = {} table = list(table) if not table or not table[0]: raise ImporterError(self.ERRORS["no_data"], ImporterError.HEADER) if self.check_col_num and len(table[0]) > len(self.line_format): raise ImporterError( self.ERRORS["too_many_cols"] % {"user_col": len(table[0]), "ref_col": len(self.line_format)} ) self.errors = [] self.validity = [] self.number_imported = 0 idx_last_col = 0 # index of the last required column for idx_last_col, formater in enumerate(reversed(self.line_format)): if formater and formater.required: break else: idx_last_col += 1 # min col number to be filled self.min_col_number = len(self.line_format) - idx_last_col # check the conformity with the reference header if ( self.reference_header and self.skip_lines and self.reference_header != table[0] ): raise ImporterError(self.ERRORS["header_check"], type=ImporterError.HEADER) self.now = datetime.datetime.now() start = datetime.datetime.now() total = len(table) if self.output == "cli": sys.stdout.write("\n") results = [] for idx_line, line in enumerate(table): self.idx_line = idx_line if line_to_process is not None: if line_to_process != idx_line: continue if idx_line > line_to_process: return results if self.output == "cli": left = None if idx_line > 10: ellapsed = datetime.datetime.now() - start time_by_item = ellapsed / idx_line if time_by_item: left = ((total - idx_line) * time_by_item).seconds txt = "\r* %d/%d" % (idx_line + 1, total) if left: txt += " (%d seconds left)" % left sys.stdout.write(txt.encode("utf-8")) sys.stdout.flush() try: results.append(self._line_processing(idx_line, line)) except ImporterError as msg: self.errors.append((idx_line, None, msg)) for item in self.to_be_close: item.close() return results def _line_processing(self, idx_line, line): for item in self.to_be_close: item.close() self.to_be_close = [] self.idx_line = idx_line if self.skip_lines > idx_line: self.validity.append(line) return if not line: self.validity.append([]) return if ( not self.simulate and self.import_instance and not self.import_instance.has_changes(idx_line) ): self.validity.append(line) return self._throughs = [] # list of (formater, value) self._post_processing = [] # list of (formater, value) self._item_post_processing = [] data = {} self.current_csv_line = None # raw line for simulation if self.simulate: self.current_csv_line = line n = datetime.datetime.now() logger.debug("%s - Processing line %d" % (str(n - self.now), idx_line)) self.now = n n2 = n self.c_errors = False c_row = [] for idx_col, val in enumerate(line): try: self._row_processing(c_row, idx_col, idx_line, val, data) except: pass self.validity.append(c_row) if not self.c_errors and (idx_col + 1) < self.min_col_number: self.c_errors = True self.errors.append( ( idx_line + 1, idx_col + 1, self.ERRORS["not_enough_cols"] % self.min_col_number, ) ) if self.c_errors: return n = datetime.datetime.now() logger.debug("* %s - Cols read" % (str(n - n2))) n2 = n if self.test: return # manage unicity of items (mainly for updates) if "history_modifier" in get_all_field_names(self.OBJECT_CLS): data["history_modifier"] = self.history_modifier self.new_objects, self.updated_objects = [], [] self.ambiguous_objects, self.not_find_objects = [], [] obj, created = self.get_object(self.OBJECT_CLS, data) if self.simulate: return data # print(data) # print(self._defaults) if self.import_instance: self.import_instance.add_imported_line(self.idx_line) if self.import_instance and hasattr(obj, "imports") and created: obj.imports.add(self.import_instance) if created: self.number_created += 1 else: self.number_updated += 1 if not created and "defaults" in data: for k in data["defaults"]: setattr(obj, k, data["defaults"][k]) obj.save() n = datetime.datetime.now() logger.debug("* %s - Item saved" % (str(n - n2))) n2 = n for formater, value in self._throughs: n = datetime.datetime.now() logger.debug( "* %s - Processing formater %s" % (str(n - n2), formater.field_name) ) n2 = n data = {} if formater.through_dict: data = formater.through_dict.copy() if formater.through_key: data[formater.through_key] = obj data[formater.field_name] = value through_cls = formater.through if formater.through_unicity_keys: data["defaults"] = {} for k in list(data.keys()): if k not in formater.through_unicity_keys and k != "defaults": data["defaults"][k] = data.pop(k) created = False if "__force_new" in data: if ( self.MODEL_CREATION_LIMIT and through_cls not in self.MODEL_CREATION_LIMIT ): raise self._get_improperly_conf_error(through_cls) created = data.pop("__force_new") t_obj = through_cls.objects.create(**data) else: if ( not self.MODEL_CREATION_LIMIT or through_cls in self.MODEL_CREATION_LIMIT ): t_obj, created = through_cls.objects.get_or_create(**data) else: get_data = data.copy() if "defaults" in get_data: get_data.pop("defaults") try: t_obj = through_cls.objects.get(**get_data) except through_cls.DoesNotExist: raise self._get_does_not_exist_in_db_error( through_cls, get_data ) if not created and "defaults" in data: t_obj = t_obj.__class__.objects.get(pk=t_obj.pk) for k in data["defaults"]: setattr(t_obj, k, data["defaults"][k]) t_obj.save() if self.import_instance and hasattr(t_obj, "imports") and created: t_obj.imports.add(self.import_instance) if not obj: return data for formater, val in self._post_processing: formater.post_process(obj, data, val, owner=self.history_modifier) self.post_processing(idx_line, obj) return data def _row_processing(self, c_row, idx_col, idx_line, val, data): if idx_col >= len(self.line_format): return formater = self.line_format[idx_col] if formater and formater.post_processing: self._post_processing.append((formater, val)) if not formater or not formater.field_name: c_row.append(_("Not imported")) return if formater.regexp: # multiline regexp is a mess... val = val.replace("\n", NEW_LINE_BREAK) match = formater.regexp.match(val) if not match: if formater.required: self.errors.append( (idx_line + 1, idx_col + 1, self.ERRORS["value_required"]) ) self.c_errors = True elif not val.strip(): c_row.append("") return val = val.replace(NEW_LINE_BREAK, "\n") self.errors.append( ( idx_line + 1, idx_col + 1, str(self.ERRORS["regex_not_match"]) + val, ) ) c_row.append("") return val_group = [] for g in formater.regexp.findall(val): if isinstance(g, (tuple, list)): g = "".join(g) val_group.append(g.replace(NEW_LINE_BREAK, "\n") if g else "") val = "".join(val_group) field_names = formater.field_name if not isinstance(field_names, (list, tuple)): field_names = [field_names] c_values = [] for idx_fields, field_name in enumerate(field_names): func = formater.formater if type(func) in (list, tuple): func = func[idx_fields] if not callable(func) and type(func) == str: func = getattr(self, func) values = [val] many_values = getattr(func, "many_split", None) if many_values: values = re.split(func.many_split, values[0]) # filter empty entries on m2m such as "my-value & " if len(values) > 1: values = [v for v in values if v.strip()] if not values: # keep an empty value if there is only empty values values = [values[0]] formated_values = [] field_name = formater.field_name force_new = formater.force_new if type(field_name) in (list, tuple): field_name = field_name[idx_fields] if type(force_new) in (list, tuple): force_new = force_new[idx_fields] if formater.concat: self.concats.add(field_name) concat_str = formater.concat_str if type(formater.concat_str) in (list, tuple): concat_str = concat_str[idx_fields] if concat_str: self.concat_str[field_name] = concat_str if self.DB_TARGETS: formater.import_instance = self.import_instance formater.reinit_db_target( self.DB_TARGETS["{}-{}".format(idx_col + 1, field_name)], idx_fields ) for idx, v in enumerate(values): try: """ # disable regex cut if formater.regexp_formater_args: args = [] for idx in formater.regexp_formater_args[idx_fields]: args.append(val_group[idx]) value = func.format(*args) else: """ if getattr(func, "need_archive", False): value = func.format(v, archive=self.archive) else: value = func.format(v) except ValueError as e: if formater.required: self.c_errors = True self.errors.append((idx_line + 1, idx_col + 1, str(e))) c_values.append("") return if formater.value_format and value is not None and value != "": value = formater.value_format.format(value) if hasattr(value, "close"): # opened file are kept - they need to be listed and closed # later self.to_be_close.append(value) formated_values.append(value) if hasattr(func, "match_table"): if field_name not in self.match_table: self.match_table[field_name] = {} self.match_table[field_name].update(func.match_table) value = formated_values if not many_values: value = formated_values[0] printed_values = value if type(value) not in (list, tuple): printed_values = [value] try: # don't reunicode - unicoded values c_values.append(" ; ".join([v for v in printed_values])) except TypeError: c_values.append(" ; ".join([str(v) for v in printed_values])) if value is None and formater.required: self.c_errors = True self.errors.append( (idx_line + 1, idx_col + 1, self.ERRORS["value_required"]) ) return field_names = [field_name] force_news = [force_new] concats = formater.concat concat_str = [concat_str] if idx_fields == 0: # duplicate fields are only for the first occurrence for duplicate_field in formater.duplicate_fields: if type(duplicate_field[0]) in (list, tuple): duplicate_field, force_new, concat, conc_str = duplicate_field[ 0 ] else: duplicate_field, force_new, concat, conc_str = duplicate_field field_names += [duplicate_field] force_news += [force_new] concats += [concat] concat_str += [conc_str] if formater.through: self._throughs.append((formater, value)) else: for idx, f_name in enumerate(field_names): self._field_name_to_data_dict( f_name, value, data, formater.force_value, force_new=force_news[idx], concat=concats[idx], concat_str=concat_str[idx], ) c_row.append(" ; ".join([v for v in c_values])) def _get_field_m2m(self, attribute, data, c_path, new_created, field_object): """ Manage and m2m field from raw data :param attribute: attribute name :param data: current data dictionary :param c_path: attribute path from the main model point of view :param new_created: dict of forced newly created items to prevent multiple creation :param field_object: django field object for this attribute :return: None """ m2ms = [] many_values = data.pop(attribute) model = None if hasattr(field_object, "rel"): model = field_object.rel.to elif hasattr(field_object, "related_model"): model = field_object.related_model elif hasattr(field_object, "to"): model = field_object.to elif hasattr(field_object, "model"): model = field_object.model if type(many_values) not in (list, tuple): many_values = [many_values] for val in many_values: if val.__class__ == model: # the value is a model instance: it is OK! m2ms.append((attribute, val)) continue if type(val) != dict: # value is not a dict, we don't know what to do with it... continue vals = [] # contruct many dict for each values default_dict = {} # # get default values p = [attribute] if c_path: p = list(c_path) + p p = tuple(p) if p in self._defaults: for k in self._defaults[p]: default_dict[k] = self._defaults[p][k] # # init with simple values that will be duplicated for key in val.keys(): if type(val[key]) not in (list, tuple): default_dict[key] = val[key] vals.append(default_dict.copy()) # # manage multiple values for key in list(val.keys()): if type(val[key]) in (list, tuple): for idx, v in enumerate(val[key]): if len(vals) <= idx: vals.append(default_dict.copy()) vals[idx][key] = v # check that m2m are not empty notempty = False for dct in vals: for k in dct: if dct[k] not in ("", None): notempty = True break if not notempty: continue field_names = get_all_field_names(model) for v in vals: if "history_modifier" in field_names: if "defaults" not in v: v["defaults"] = {} v["defaults"]["history_modifier"] = self.history_modifier m2m_m2ms = [] c_c_path = c_path[:] for k in list(v.keys()): if k not in field_names: continue self.get_field(model, k, v, m2m_m2ms, c_c_path, new_created) if "__force_new" in v: created = v.pop("__force_new") key = ";".join(["{}-{}".format(k, v[k]) for k in sorted(v.keys())]) # only one forced creation if attribute in new_created and key in new_created[attribute]: continue if attribute not in new_created: new_created[attribute] = [] new_created[attribute].append(key) has_values = bool([1 for k in v if v[k] and k != "defaults"]) if has_values: if ( self.MODEL_CREATION_LIMIT and model not in self.MODEL_CREATION_LIMIT ): raise self._get_improperly_conf_error(model) if "defaults" in v: default_values = v.pop("defaults") for k in default_values.keys(): if k not in v: v[k] = default_values[k] v = model.objects.create(**v) else: continue else: v["defaults"] = v.get("defaults", {}) extra_fields = {} # "File" type is a temp object and can be different # for the same filename - it must be treated # separately for field in model._meta.fields: k = field.name # attr_class is a FileField attribute if hasattr(field, "attr_class") and k in v: extra_fields[k] = v.pop(k) created = False if ( not self.MODEL_CREATION_LIMIT or model in self.MODEL_CREATION_LIMIT ): try: v, created = model.objects.get_or_create(**v) except FieldError as e: raise ImporterError( str(_("Importer configuration error: " '"{}".')).format( e ) ) except Exception as e: msg = str(_('Import error: {} - "{}".')).format(model, e) raise ImporterError(msg) else: get_v = v.copy() if "defaults" in get_v: get_v.pop("defaults") try: v = model.objects.get(**get_v) except model.DoesNotExist: raise self._get_does_not_exist_in_db_error(model, get_v) changed = False for k in extra_fields.keys(): if extra_fields[k]: changed = True setattr(v, k, extra_fields[k]) if changed: v.save() for att, objs in m2m_m2ms: if type(objs) not in (list, tuple): objs = [objs] for obj in objs: getattr(v, att).add(obj) if self.import_instance and hasattr(v, "imports") and created: v.imports.add(self.import_instance) m2ms.append((attribute, v)) return m2ms def _set_importer_trigger(self, cls, attribute, data): """ An importer trigger is used. Stock it for later execution and remove it from current data dict. :param cls: current model :param attribute: attribute name :param data: current data dictionary :return: None """ func = getattr(cls, attribute) if func.importer_trigger == "pre": func(data, data[attribute]) elif func.importer_trigger == "post": self._item_post_processing.append([attribute, data, data[attribute]]) else: logger.warning( "Unknow importer_trigger '{}' for '{}'".format( func.importer_trigger, attribute ) ) data.pop(attribute) def get_field(self, cls, attribute, data, m2ms, c_path, new_created): """ Get field from raw data :param cls: current model :param attribute: attribute name :param data: current data dictionary :param m2ms: many to many list of tuple: (m2m key, m2m value) :param c_path: attribute path from the main model point of view :param new_created: dict of forced newly created items to prevent multiple creation :return: None """ if hasattr(cls, attribute) and getattr( getattr(cls, attribute), "importer_trigger", None ): # importer trigger self._set_importer_trigger(cls, attribute, data) return if attribute == "data": # json field # no need to do anything return if attribute == "get_default": # force evaluation of default value for this field return try: field_object = cls._meta.get_field(attribute) except FieldDoesNotExist: raise ImporterError( str( _( 'Importer configuration error: field "{}" does not exist ' "for {}." ) ).format(attribute, cls._meta.verbose_name) ) if field_object.many_to_many: try: m2ms += self._get_field_m2m( attribute, data, c_path, new_created, field_object ) except Exception as e: self.errors.append((self.idx_line, None, str(e))) return if not hasattr(field_object, "rel") or not field_object.rel: return if type(data[attribute]) == list: # extract the first item from list # be careful if the list has more than one item this is arbitrary if len(data[attribute]) > 1: logger.warning( "Import {}: {} has many when only one is expected. Get " "the first one but it is not OK!".format( self.import_instance, attribute ) ) data[attribute] = data[attribute][0] return if not isinstance(data[attribute], dict): # we treat only dict formated values return # put history_modifier for every created item if "history_modifier" in get_all_field_names(field_object.rel.to): data[attribute]["history_modifier"] = self.history_modifier try: c_path.append(attribute) data[attribute], created = self.get_object( field_object.rel.to, data[attribute].copy(), c_path ) except ImporterError as msg: self.errors.append((self.idx_line, None, msg)) data[attribute] = None def get_object(self, cls, data, path=None): if not path: path = [] m2ms = [] if type(data) != dict: # if data is not a dict we don't know what to do return data, False is_empty = not bool( [k for k in data if k not in ("history_modifier", "defaults") and data[k]] ) if is_empty: # if no value, no creation return None, False c_path = path[:] # get all related fields new_created = {} try: for attribute in list(data.keys()): c_c_path = c_path[:] if attribute not in data: # removed by previous get_field continue if not attribute: data.pop(attribute) continue if not data[attribute]: if hasattr(cls, attribute) and getattr( getattr(cls, attribute), "importer_trigger", None ): data.pop(attribute) continue field_object = cls._meta.get_field(attribute) if field_object.many_to_many: data.pop(attribute) continue if attribute != "__force_new": self.get_field(cls, attribute, data, m2ms, c_c_path, new_created) except (ValueError, IntegrityError, FieldDoesNotExist) as e: try: message = str(e) except (UnicodeDecodeError, UnicodeDecodeError): message = "" try: data = str(data) except UnicodeDecodeError: data = "" raise ImporterError( "Erreur d'import %s %s, contexte : %s, erreur : %s" % (str(cls), str("__".join(path)), str(data), message) ) # image field is not serialized image, associated_file = None, None if "image" in data and data["image"]: image = data.pop("image") if "associated_file" in data and data["associated_file"]: associated_file = data.pop("associated_file") create_dict = copy.deepcopy(data) if image: data["image"] = image create_dict["image"] = image if associated_file: data["associated_file"] = associated_file create_dict["associated_file"] = associated_file for k in list(create_dict.keys()): # filter unnecessary default values but not the json field if isinstance(create_dict[k], dict) and k != "data": if self.simulate: create_dict[k] = _("* created *") else: create_dict.pop(k) # File doesn't like deepcopy elif type(create_dict[k]) == File: create_dict[k] = copy.copy(data[k]) # default values path = tuple(path) defaults = {} if hasattr(cls, "get_import_defaults"): defaults = cls.get_import_defaults() if path in self._defaults: for k in self._defaults[path]: if (k not in data or not data[k]) and self._defaults[path][k]: defaults[k] = self._defaults[path][k] if "history_modifier" in create_dict: defaults.update({"history_modifier": create_dict.pop("history_modifier")}) created = False post_save_keys = [] try: try: dct = {} if hasattr(cls, "get_import_defaults"): dct = cls.get_import_defaults() or {} dct.update(create_dict.copy()) for key in dct: if callable(dct[key]): dct[key] = dct[key]() if getattr(dct[key], "post_save", True): dct.pop(key) post_save_keys.append(key) if "__force_new" in dct: created = dct.pop("__force_new") if not [k for k in dct if dct[k] is not None]: return None, created new_dct = defaults.copy() new_dct.update(dct) if ( self.MODEL_CREATION_LIMIT and cls not in self.MODEL_CREATION_LIMIT ): raise self._get_improperly_conf_error(cls) if not self.simulate: obj = cls.objects.create(**new_dct) else: self.new_objects.append((path, cls, new_dct)) else: # manage UNICITY_KEYS - only level 1 if not path and self.UNICITY_KEYS: for k in list(dct.keys()): if k not in self.UNICITY_KEYS and k != "defaults": if dct[k]: defaults[k] = dct.pop(k) else: dct.pop(k) if "get_default" in dct and dct["get_default"]: dct.pop("get_default") new_dct = defaults.copy() new_dct.update(dct) dct = new_dct obj = None if self.simulate: q = cls.objects.filter(**dct) if not q.count(): if ( self.MODEL_CREATION_LIMIT and cls not in self.MODEL_CREATION_LIMIT ): self.not_find_objects.append((path, cls, dct)) return _("* match not find *"), False dct.update(defaults) self.new_objects.append([path, cls, dct]) created = True elif q.count() > 1: self.ambiguous_objects.append((path, list(q.all()), dct)) if q.count() > 10: return ( _("* the query match more than 10 " "results*"), False, ) else: return ( str(_(" or ")).join( [str(item) for item in q.all()] ), False, ) else: self.updated_objects.append([path, q.all()[0], dct, {}]) dct["defaults"] = defaults.copy() else: if not dct and not defaults: obj = None else: if ( not self.MODEL_CREATION_LIMIT or cls in self.MODEL_CREATION_LIMIT ): dct["defaults"] = defaults.copy() obj, created = cls.objects.get_or_create(**dct) else: try: obj = cls.objects.get(**dct) dct["defaults"] = defaults.copy() except cls.DoesNotExist: raise self._get_does_not_exist_in_db_error(cls, dct) if not created and not path and self.UNICITY_KEYS: updated_dct = {} if self.conservative_import: for k in dct["defaults"]: new_val = dct["defaults"][k] if new_val is None or new_val == "": continue val = getattr(obj, k) if val is None or val == "": updated_dct[k] = new_val elif ( k in self.concats and type(val) == str and type(new_val) == str ): updated_dct[k] = val + "\n" + new_val else: for k in dct["defaults"]: new_val = dct["defaults"][k] if new_val is None or new_val == "": continue if obj and k == "data": updated_dct[k] = update_data(obj.data, new_val) else: updated_dct[k] = new_val if updated_dct: if self.simulate: self.updated_objects[-1][-1] = updated_dct else: for k in updated_dct: setattr(obj, k, updated_dct[k]) obj.save() if ( not self.simulate and self.import_instance and hasattr(obj, "imports") and created ): obj.imports.add(self.import_instance) except (ValueError, IntegrityError, DatabaseError, GEOSException) as e: raise IntegrityError(str(e)) except cls.MultipleObjectsReturned as e: created = False if "defaults" in dct: dct.pop("defaults") raise IntegrityError(str(e)) # obj = cls.objects.filter(**dct).all()[0] for key in post_save_keys: getattr(obj, key)() for attr, value in m2ms: values = [value] if type(value) in (list, tuple): values = value if self.simulate: if created: obj_dct = self.new_objects[-1][-1] else: obj_dct = self.updated_objects[-1][-1] obj_dct[attr] = values else: for v in values: related_model = getattr(obj, attr) # an intermediary model is used if ( hasattr(related_model, "through") and not related_model.through._meta.auto_created ): # try to create it with default attributes inter_model = related_model.through target_name, item_name = None, None for field in inter_model._meta.get_fields(): rel_model = getattr(field, "related_model", None) # assume that the first found is correct... if rel_model == v.__class__: target_name = field.name elif rel_model == obj.__class__: item_name = field.name assert target_name is not None assert item_name is not None inter_model.objects.get_or_create( **{item_name: obj, target_name: v} ) else: getattr(obj, attr).add(v) # force post save script v = v.__class__.objects.get(pk=v.pk) try: v.save() except DatabaseError as e: raise IntegrityError(e.message) if self.simulate: # put m2m result in data dict current_data = data if m2ms: for item in path: if item not in current_data: current_data[item] = {} current_data = current_data[item] for key, value in m2ms: if not isinstance(value, list) and not isinstance(value, tuple): value = [value] current_data[key] = value if created: return dct, True else: # defaults are not presented as matching data dct.pop("defaults") return self.updated_objects[-1][1], False if m2ms: # force post save script obj.save() if hasattr(obj, "fix"): # post save/m2m specific fix obj.fix() except IntegrityError as e: try: message = str(e) except (UnicodeDecodeError, UnicodeDecodeError): message = "" try: data = str(data) except UnicodeDecodeError: data = "" raise ImporterError( "Erreur d'import %s %s, contexte : %s, erreur : %s" % (str(cls), str("__".join(path)), str(data), message) ) return obj, created def _format_csv_line(self, values, empty="-"): return ( '"' + '","'.join([(v and str(v).replace('"', '""')) or empty for v in values]) + '"' ) def _get_csv(self, rows, header=None, empty="-"): if not rows: return "" if not header: header = [] csv_v = [] if header: csv_v.append(self._format_csv_line(header, empty=empty)) for values in rows: csv_v.append(self._format_csv_line(values, empty=empty)) return "\n".join(csv_v) def get_csv_errors(self): return self._get_csv(self.errors, header=[_("line"), _("col"), _("error")]) def get_csv_result(self): return self._get_csv(self.validity) def get_csv_matches(self): header = [_("field"), _("source"), _("result")] values = [] for field in self.match_table: for source in self.match_table[field]: values.append((field, source, self.match_table[field][source])) return self._get_csv(values, header=header) @classmethod def choices_check(cls, choices): def function(value): choices_dct = dict(choices) value = value.strip() if not value: return if value not in choices_dct.values(): raise ValueError( _('"%(value)s" not in %(values)s') % { "value": value, "values": ", ".join([val for val in choices_dct.values()]), } ) return value return function