| field | value | date |
|---|---|---|
| author | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-02-16 21:00:08 +0100 |
| committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2017-02-16 21:00:08 +0100 |
| commit | 5921fedef065a34a97ecdd2ca4353aa72f25b116 (patch) | |
| tree | 6b330beb6ef934dd76eb17cdd46f540c5c99c5e4 | |
| parent | caf19dff2b465a6a43e48bd13cd1610ca4783bb1 (diff) | |
| download | Ishtar-5921fedef065a34a97ecdd2ca4353aa72f25b116.tar.bz2, Ishtar-5921fedef065a34a97ecdd2ca4353aa72f25b116.zip | |
Imports: prevent creation of new items when data is empty
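The change centres on `Importer.get_object` in `ishtar_common/data_importer.py`: before looking up or creating related records, the importer now returns early when the parsed data dict holds no meaningful values, so an empty cell (for instance the missing "responsable operation" in the new CSV test row) no longer produces an empty `Person`. A minimal standalone sketch of that guard, with a hypothetical `is_empty_payload` helper standing in for the inline check inside `get_object`:

```python
def is_empty_payload(data):
    """Mirror of the new emptiness guard in Importer.get_object()."""
    if not isinstance(data, dict):
        return False
    # Ignore bookkeeping keys; the payload is empty when nothing else
    # holds a truthy value.
    return not [k for k in data
                if k not in ('history_modifier', 'defaults') and data[k]]


# A row with no "responsable operation" yields an empty person payload:
assert is_empty_payload({'name': '', 'surname': '', 'history_modifier': 42})
# A filled-in payload is not empty and is still imported normally:
assert not is_empty_payload({'name': "Jean Sui-Resp'on Sablé"})
```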
| mode | file | changes |
|---|---|---|
| -rw-r--r-- | archaeological_operations/tests.py | 25 |
| -rw-r--r-- | archaeological_operations/tests/MCC-operations-example.csv | 1 |
| -rw-r--r-- | ishtar_common/data_importer.py | 274 |

3 files changed, 155 insertions, 145 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py
index 296236029..364cc4c8e 100644
--- a/archaeological_operations/tests.py
+++ b/archaeological_operations/tests.py
@@ -36,7 +36,7 @@ from archaeological_operations import views
 from ishtar_common.models import OrganizationType, Organization, ItemKey, \
     ImporterType, IshtarUser, TargetKey, ImporterModel, IshtarSiteProfile, \
-    Town, ImporterColumn
+    Town, ImporterColumn, Person
 from archaeological_context_records.models import Unit
 from ishtar_common import forms_common
@@ -171,6 +171,7 @@ class ImportOperationTest(ImportTest, TestCase):
     def test_mcc_import_operation(self):
         first_ope_nb = models.Operation.objects.count()
+        first_person_nb = Person.objects.count()
         importer, form = self.init_ope_import()
         self.assertTrue(form.is_valid())
         impt = form.save(self.ishtar_user)
@@ -184,18 +185,20 @@ class ImportOperationTest(ImportTest, TestCase):
         current_ope_nb = models.Operation.objects.count()
         # no new operation imported because of a missing connection for
         # operation_type value
-        self.assertTrue(current_ope_nb == first_ope_nb)
+        self.assertEqual(current_ope_nb, first_ope_nb)
         self.init_ope_targetkey(imp=impt)
         impt.importation()
-        # a new operation has now been imported
+        # new operations have now been imported
         current_ope_nb = models.Operation.objects.count()
-        self.assertEqual(current_ope_nb, first_ope_nb + 1)
+        self.assertEqual(current_ope_nb, first_ope_nb + 2)
+        current_person_nb = Person.objects.count()
+        self.assertEqual(current_person_nb, first_person_nb + 1)
         # and well imported
         last_ope = models.Operation.objects.order_by('-pk').all()[0]
         self.assertEqual(last_ope.name, u"Oppìdum de Paris")
-        self.assertTrue(last_ope.code_patriarche == 4200)
-        self.assertTrue(last_ope.operation_type.txt_idx == 'prog_excavation')
+        self.assertEqual(last_ope.code_patriarche, 4200)
+        self.assertEqual(last_ope.operation_type.txt_idx, 'prog_excavation')
         self.assertEqual(last_ope.periods.count(), 2)
         periods = [period.txt_idx for period in last_ope.periods.all()]
         self.assertIn('iron_age', periods)
@@ -239,7 +242,7 @@ class ImportOperationTest(ImportTest, TestCase):
         impt.initialize()
         self.init_ope_targetkey(imp=impt)
         impt.importation()
-        self.assertEqual(len(impt.errors), 1)
+        self.assertEqual(len(impt.errors), 2)
         self.assertIn("Importer configuration error", impt.errors[0]['error'])

     def test_model_limitation(self):
@@ -253,10 +256,10 @@ class ImportOperationTest(ImportTest, TestCase):
         init_ope_number = models.Operation.objects.count()
         impt.importation()
         current_ope_nb = models.Operation.objects.count()
-        self.assertEqual(current_ope_nb, init_ope_number + 1)
+        self.assertEqual(current_ope_nb, init_ope_number + 2)

-        last_ope = models.Operation.objects.order_by('-pk').all()[0]
-        last_ope.delete()
+        for ope in models.Operation.objects.order_by('-pk').all()[:2]:
+            ope.delete()

         importer, form = self.init_ope_import()
         # add an inadequate model to make created_models non empty
@@ -286,7 +289,7 @@ class ImportOperationTest(ImportTest, TestCase):
         # import of operations
         impt.importation()
         current_ope_nb = models.Operation.objects.count()
-        self.assertEqual(current_ope_nb, init_ope_number + 1)
+        self.assertEqual(current_ope_nb, init_ope_number + 2)

     def test_mcc_import_parcels(self):
         old_nb = models.Parcel.objects.count()
diff --git a/archaeological_operations/tests/MCC-operations-example.csv b/archaeological_operations/tests/MCC-operations-example.csv
index 432ceffca..3b9801c33 100644
--- a/archaeological_operations/tests/MCC-operations-example.csv
+++ b/archaeological_operations/tests/MCC-operations-example.csv
@@ -1,2 +1,3 @@
 code OA,region,type operation,intitule operation,operateur,responsable operation,date debut terrain,date fin terrain,chronologie generale,identifiant document georeferencement,notice scientifique
+4201,Bourgogne,Fouille programmée,Oppìdum de Paris 2,L'opérateur,,2000/01/31,2002/12/31,Age du Fer,,
 4200,Bourgogne,Fouille programmée,Oppìdum de Paris,L'opérateur,Jean Sui-Resp'on Sablé,2000/01/22,2002/12/31,Age du Fer & Gallo-Romain,,
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py
index fbf19def2..426d32a7a 100644
--- a/ishtar_common/data_importer.py
+++ b/ishtar_common/data_importer.py
@@ -1381,146 +1381,152 @@ class Importer(object):
     def get_object(self, cls, data, path=[]):
         m2ms = []
-        if data and type(data) == dict:
-            c_path = path[:]
-
-            # get all related fields
-            new_created = {}
-            for attribute in list(data.keys()):
-                c_c_path = c_path[:]
-                if not attribute:
-                    data.pop(attribute)
-                    continue
-                if not data[attribute]:
-                    continue
-                if attribute != '__force_new':
-                    self.get_field(cls, attribute, data, m2ms, c_c_path,
-                                   new_created)
-
-            create_dict = copy.deepcopy(data)
-            for k in create_dict.keys():
-                # filter unnecessary default values
-                if type(create_dict[k]) == dict:
-                    create_dict.pop(k)
-                # File doesn't like deepcopy
-                if type(create_dict[k]) == File:
-                    create_dict[k] = copy.copy(data[k])
-
-            # default values
-            path = tuple(path)
-            defaults = {}
-            if path in self._defaults:
-                for k in self._defaults[path]:
-                    if (k not in data or not data[k]):
-                        defaults[k] = self._defaults[path][k]
-
-            if 'history_modifier' in create_dict:
-                defaults.update({
-                    'history_modifier': create_dict.pop('history_modifier')
-                })
-
-            created = False
+        if type(data) != dict:
+            return data, False
+        is_empty = not bool(
+            [k for k in data if k not in ('history_modifier', 'defaults')
+             and data[k]])
+        if is_empty:
+            return None, False
+
+        c_path = path[:]
+
+        # get all related fields
+        new_created = {}
+        for attribute in list(data.keys()):
+            c_c_path = c_path[:]
+            if not attribute:
+                data.pop(attribute)
+                continue
+            if not data[attribute]:
+                continue
+            if attribute != '__force_new':
+                self.get_field(cls, attribute, data, m2ms, c_c_path,
+                               new_created)
+
+        create_dict = copy.deepcopy(data)
+        for k in create_dict.keys():
+            # filter unnecessary default values
+            if type(create_dict[k]) == dict:
+                create_dict.pop(k)
+            # File doesn't like deepcopy
+            if type(create_dict[k]) == File:
+                create_dict[k] = copy.copy(data[k])
+
+        # default values
+        path = tuple(path)
+        defaults = {}
+        if path in self._defaults:
+            for k in self._defaults[path]:
+                if k not in data or not data[k]:
+                    defaults[k] = self._defaults[path][k]
+
+        if 'history_modifier' in create_dict:
+            defaults.update({
+                'history_modifier': create_dict.pop('history_modifier')
+            })
+
+        created = False
+        try:
             try:
-                try:
-                    dct = create_dict.copy()
-                    for key in dct:
-                        if callable(dct[key]):
-                            dct[key] = dct[key]()
-                    if '__force_new' in dct:
-                        created = dct.pop('__force_new')
-                        if not [k for k in dct if dct[k] is not None]:
-                            return None, created
-                        new_dct = defaults.copy()
-                        new_dct.update(dct)
-                        if self.MODEL_CREATION_LIMIT and \
-                                cls not in self.MODEL_CREATION_LIMIT:
-                            raise self._get_improperly_conf_error(cls)
-                        obj = cls.objects.create(**new_dct)
+                dct = create_dict.copy()
+                for key in dct:
+                    if callable(dct[key]):
+                        dct[key] = dct[key]()
+                if '__force_new' in dct:
+                    created = dct.pop('__force_new')
+                    if not [k for k in dct if dct[k] is not None]:
+                        return None, created
+                    new_dct = defaults.copy()
+                    new_dct.update(dct)
+                    if self.MODEL_CREATION_LIMIT and \
+                            cls not in self.MODEL_CREATION_LIMIT:
+                        raise self._get_improperly_conf_error(cls)
+                    obj = cls.objects.create(**new_dct)
+                else:
+                    # manage UNICITY_KEYS - only level 1
+                    if not path and self.UNICITY_KEYS:
+                        for k in dct.keys():
+                            if k not in self.UNICITY_KEYS \
+                                    and k != 'defaults':
+                                defaults[k] = dct.pop(k)
+                    if not self.MODEL_CREATION_LIMIT or \
+                            cls in self.MODEL_CREATION_LIMIT:
+                        dct['defaults'] = defaults.copy()
+                        obj, created = cls.objects.get_or_create(**dct)
                     else:
-                        # manage UNICITY_KEYS - only level 1
-                        if not path and self.UNICITY_KEYS:
-                            for k in dct.keys():
-                                if k not in self.UNICITY_KEYS \
-                                        and k != 'defaults':
-                                    defaults[k] = dct.pop(k)
-                        if not self.MODEL_CREATION_LIMIT or \
-                                cls in self.MODEL_CREATION_LIMIT:
+                        try:
+                            obj = cls.objects.get(**dct)
                             dct['defaults'] = defaults.copy()
-                            obj, created = cls.objects.get_or_create(**dct)
-                        else:
-                            try:
-                                obj = cls.objects.get(**dct)
-                                dct['defaults'] = defaults.copy()
-                            except cls.DoesNotExist:
-                                raise self._get_does_not_exist_in_db_error(
-                                    cls, dct)
-
-                    if not created and not path and self.UNICITY_KEYS:
-                        changed = False
-                        if self.conservative_import:
-                            for k in dct['defaults']:
-                                new_val = dct['defaults'][k]
-                                if new_val is None or new_val == '':
-                                    continue
-                                val = getattr(obj, k)
-                                if val is None or val == '':
-                                    changed = True
-                                    setattr(obj, k, new_val)
-                                elif k in self.concats \
-                                        and type(val) == unicode \
-                                        and type(new_val) == unicode:
-                                    setattr(obj, k, val + u"\n" + new_val)
-                        else:
-                            for k in dct['defaults']:
-                                new_val = dct['defaults'][k]
-                                if new_val is None or new_val == '':
-                                    continue
+                        except cls.DoesNotExist:
+                            raise self._get_does_not_exist_in_db_error(
+                                cls, dct)
+
+                if not created and not path and self.UNICITY_KEYS:
+                    changed = False
+                    if self.conservative_import:
+                        for k in dct['defaults']:
+                            new_val = dct['defaults'][k]
+                            if new_val is None or new_val == '':
+                                continue
+                            val = getattr(obj, k)
+                            if val is None or val == '':
                                 changed = True
                                 setattr(obj, k, new_val)
-                        if changed:
-                            obj.save()
-                    if self.import_instance and hasattr(obj, 'imports') \
-                            and created:
-                        obj.imports.add(self.import_instance)
-                except ValueError as e:
-                    raise IntegrityError(e.message)
-                except IntegrityError as e:
-                    raise IntegrityError(e.message)
-                except DatabaseError as e:
-                    raise IntegrityError(e.message)
-                except cls.MultipleObjectsReturned as e:
-                    created = False
-                    if 'defaults' in dct:
-                        dct.pop('defaults')
-                    raise IntegrityError(e.message)
-                    # obj = cls.objects.filter(**dct).all()[0]
-                for attr, value in m2ms:
-                    values = [value]
-                    if type(value) in (list, tuple):
-                        values = value
-                    for v in values:
-                        getattr(obj, attr).add(v)
-                        # force post save script
-                        v.save()
-                if m2ms:
-                    # force post save script
-                    obj.save()
+                            elif k in self.concats \
+                                    and type(val) == unicode \
+                                    and type(new_val) == unicode:
+                                setattr(obj, k, val + u"\n" + new_val)
+                    else:
+                        for k in dct['defaults']:
+                            new_val = dct['defaults'][k]
+                            if new_val is None or new_val == '':
+                                continue
+                            changed = True
+                            setattr(obj, k, new_val)
+                    if changed:
+                        obj.save()
+                if self.import_instance and hasattr(obj, 'imports') \
+                        and created:
+                    obj.imports.add(self.import_instance)
+            except ValueError as e:
+                raise IntegrityError(e.message)
             except IntegrityError as e:
-                message = e.message
-                try:
-                    message = e.message.decode('utf-8')
-                except (UnicodeDecodeError, UnicodeDecodeError):
-                    message = ''
-                try:
-                    data = unicode(data)
-                except UnicodeDecodeError:
-                    data = ''
-                raise ImporterError(
-                    "Erreur d'import %s %s, contexte : %s, erreur : %s"
-                    % (unicode(cls), unicode("__".join(path)),
-                       unicode(data), message))
-            return obj, created
-        return data
+                raise IntegrityError(e.message)
+            except DatabaseError as e:
+                raise IntegrityError(e.message)
+            except cls.MultipleObjectsReturned as e:
+                created = False
+                if 'defaults' in dct:
+                    dct.pop('defaults')
+                raise IntegrityError(e.message)
+                # obj = cls.objects.filter(**dct).all()[0]
+            for attr, value in m2ms:
+                values = [value]
+                if type(value) in (list, tuple):
+                    values = value
+                for v in values:
+                    getattr(obj, attr).add(v)
+                    # force post save script
+                    v.save()
+            if m2ms:
+                # force post save script
+                obj.save()
+        except IntegrityError as e:
+            message = e.message
+            try:
+                message = e.message.decode('utf-8')
+            except (UnicodeDecodeError, UnicodeDecodeError):
+                message = ''
+            try:
+                data = unicode(data)
+            except UnicodeDecodeError:
+                data = ''
+            raise ImporterError(
+                "Erreur d'import %s %s, contexte : %s, erreur : %s"
+                % (unicode(cls), unicode("__".join(path)),
+                   unicode(data), message))
+        return obj, created

     def _format_csv_line(self, values, empty=u"-"):
         return u'"' + u'","'.join(