summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--archaeological_operations/tests.py25
-rw-r--r--archaeological_operations/tests/MCC-operations-example.csv1
-rw-r--r--ishtar_common/data_importer.py274
3 files changed, 155 insertions, 145 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py
index 296236029..364cc4c8e 100644
--- a/archaeological_operations/tests.py
+++ b/archaeological_operations/tests.py
@@ -36,7 +36,7 @@ from archaeological_operations import views
from ishtar_common.models import OrganizationType, Organization, ItemKey, \
ImporterType, IshtarUser, TargetKey, ImporterModel, IshtarSiteProfile, \
- Town, ImporterColumn
+ Town, ImporterColumn, Person
from archaeological_context_records.models import Unit
from ishtar_common import forms_common
@@ -171,6 +171,7 @@ class ImportOperationTest(ImportTest, TestCase):
def test_mcc_import_operation(self):
first_ope_nb = models.Operation.objects.count()
+ first_person_nb = Person.objects.count()
importer, form = self.init_ope_import()
self.assertTrue(form.is_valid())
impt = form.save(self.ishtar_user)
@@ -184,18 +185,20 @@ class ImportOperationTest(ImportTest, TestCase):
current_ope_nb = models.Operation.objects.count()
# no new operation imported because of a missing connection for
# operation_type value
- self.assertTrue(current_ope_nb == first_ope_nb)
+ self.assertEqual(current_ope_nb, first_ope_nb)
self.init_ope_targetkey(imp=impt)
impt.importation()
- # a new operation has now been imported
+ # new operations have now been imported
current_ope_nb = models.Operation.objects.count()
- self.assertEqual(current_ope_nb, first_ope_nb + 1)
+ self.assertEqual(current_ope_nb, first_ope_nb + 2)
+ current_person_nb = Person.objects.count()
+ self.assertEqual(current_person_nb, first_person_nb + 1)
# and well imported
last_ope = models.Operation.objects.order_by('-pk').all()[0]
self.assertEqual(last_ope.name, u"Oppìdum de Paris")
- self.assertTrue(last_ope.code_patriarche == 4200)
- self.assertTrue(last_ope.operation_type.txt_idx == 'prog_excavation')
+ self.assertEqual(last_ope.code_patriarche, 4200)
+ self.assertEqual(last_ope.operation_type.txt_idx, 'prog_excavation')
self.assertEqual(last_ope.periods.count(), 2)
periods = [period.txt_idx for period in last_ope.periods.all()]
self.assertIn('iron_age', periods)
@@ -239,7 +242,7 @@ class ImportOperationTest(ImportTest, TestCase):
impt.initialize()
self.init_ope_targetkey(imp=impt)
impt.importation()
- self.assertEqual(len(impt.errors), 1)
+ self.assertEqual(len(impt.errors), 2)
self.assertIn("Importer configuration error", impt.errors[0]['error'])
def test_model_limitation(self):
@@ -253,10 +256,10 @@ class ImportOperationTest(ImportTest, TestCase):
init_ope_number = models.Operation.objects.count()
impt.importation()
current_ope_nb = models.Operation.objects.count()
- self.assertEqual(current_ope_nb, init_ope_number + 1)
+ self.assertEqual(current_ope_nb, init_ope_number + 2)
- last_ope = models.Operation.objects.order_by('-pk').all()[0]
- last_ope.delete()
+ for ope in models.Operation.objects.order_by('-pk').all()[:2]:
+ ope.delete()
importer, form = self.init_ope_import()
# add an inadequate model to make created_models non empty
@@ -286,7 +289,7 @@ class ImportOperationTest(ImportTest, TestCase):
# import of operations
impt.importation()
current_ope_nb = models.Operation.objects.count()
- self.assertEqual(current_ope_nb, init_ope_number + 1)
+ self.assertEqual(current_ope_nb, init_ope_number + 2)
def test_mcc_import_parcels(self):
old_nb = models.Parcel.objects.count()
diff --git a/archaeological_operations/tests/MCC-operations-example.csv b/archaeological_operations/tests/MCC-operations-example.csv
index 432ceffca..3b9801c33 100644
--- a/archaeological_operations/tests/MCC-operations-example.csv
+++ b/archaeological_operations/tests/MCC-operations-example.csv
@@ -1,2 +1,3 @@
code OA,region,type operation,intitule operation,operateur,responsable operation,date debut terrain,date fin terrain,chronologie generale,identifiant document georeferencement,notice scientifique
+4201,Bourgogne,Fouille programmée,Oppìdum de Paris 2,L'opérateur,,2000/01/31,2002/12/31,Age du Fer,,
4200,Bourgogne,Fouille programmée,Oppìdum de Paris,L'opérateur,Jean Sui-Resp'on Sablé,2000/01/22,2002/12/31,Age du Fer & Gallo-Romain,,
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py
index fbf19def2..426d32a7a 100644
--- a/ishtar_common/data_importer.py
+++ b/ishtar_common/data_importer.py
@@ -1381,146 +1381,152 @@ class Importer(object):
def get_object(self, cls, data, path=[]):
m2ms = []
- if data and type(data) == dict:
- c_path = path[:]
-
- # get all related fields
- new_created = {}
- for attribute in list(data.keys()):
- c_c_path = c_path[:]
- if not attribute:
- data.pop(attribute)
- continue
- if not data[attribute]:
- continue
- if attribute != '__force_new':
- self.get_field(cls, attribute, data, m2ms, c_c_path,
- new_created)
-
- create_dict = copy.deepcopy(data)
- for k in create_dict.keys():
- # filter unnecessary default values
- if type(create_dict[k]) == dict:
- create_dict.pop(k)
- # File doesn't like deepcopy
- if type(create_dict[k]) == File:
- create_dict[k] = copy.copy(data[k])
-
- # default values
- path = tuple(path)
- defaults = {}
- if path in self._defaults:
- for k in self._defaults[path]:
- if (k not in data or not data[k]):
- defaults[k] = self._defaults[path][k]
-
- if 'history_modifier' in create_dict:
- defaults.update({
- 'history_modifier': create_dict.pop('history_modifier')
- })
-
- created = False
+ if type(data) != dict:
+ return data, False
+ is_empty = not bool(
+ [k for k in data if k not in ('history_modifier', 'defaults')
+ and data[k]])
+ if is_empty:
+ return None, False
+
+ c_path = path[:]
+
+ # get all related fields
+ new_created = {}
+ for attribute in list(data.keys()):
+ c_c_path = c_path[:]
+ if not attribute:
+ data.pop(attribute)
+ continue
+ if not data[attribute]:
+ continue
+ if attribute != '__force_new':
+ self.get_field(cls, attribute, data, m2ms, c_c_path,
+ new_created)
+
+ create_dict = copy.deepcopy(data)
+ for k in create_dict.keys():
+ # filter unnecessary default values
+ if type(create_dict[k]) == dict:
+ create_dict.pop(k)
+ # File doesn't like deepcopy
+ if type(create_dict[k]) == File:
+ create_dict[k] = copy.copy(data[k])
+
+ # default values
+ path = tuple(path)
+ defaults = {}
+ if path in self._defaults:
+ for k in self._defaults[path]:
+ if k not in data or not data[k]:
+ defaults[k] = self._defaults[path][k]
+
+ if 'history_modifier' in create_dict:
+ defaults.update({
+ 'history_modifier': create_dict.pop('history_modifier')
+ })
+
+ created = False
+ try:
try:
- try:
- dct = create_dict.copy()
- for key in dct:
- if callable(dct[key]):
- dct[key] = dct[key]()
- if '__force_new' in dct:
- created = dct.pop('__force_new')
- if not [k for k in dct if dct[k] is not None]:
- return None, created
- new_dct = defaults.copy()
- new_dct.update(dct)
- if self.MODEL_CREATION_LIMIT and \
- cls not in self.MODEL_CREATION_LIMIT:
- raise self._get_improperly_conf_error(cls)
- obj = cls.objects.create(**new_dct)
+ dct = create_dict.copy()
+ for key in dct:
+ if callable(dct[key]):
+ dct[key] = dct[key]()
+ if '__force_new' in dct:
+ created = dct.pop('__force_new')
+ if not [k for k in dct if dct[k] is not None]:
+ return None, created
+ new_dct = defaults.copy()
+ new_dct.update(dct)
+ if self.MODEL_CREATION_LIMIT and \
+ cls not in self.MODEL_CREATION_LIMIT:
+ raise self._get_improperly_conf_error(cls)
+ obj = cls.objects.create(**new_dct)
+ else:
+ # manage UNICITY_KEYS - only level 1
+ if not path and self.UNICITY_KEYS:
+ for k in dct.keys():
+ if k not in self.UNICITY_KEYS \
+ and k != 'defaults':
+ defaults[k] = dct.pop(k)
+ if not self.MODEL_CREATION_LIMIT or \
+ cls in self.MODEL_CREATION_LIMIT:
+ dct['defaults'] = defaults.copy()
+ obj, created = cls.objects.get_or_create(**dct)
else:
- # manage UNICITY_KEYS - only level 1
- if not path and self.UNICITY_KEYS:
- for k in dct.keys():
- if k not in self.UNICITY_KEYS \
- and k != 'defaults':
- defaults[k] = dct.pop(k)
- if not self.MODEL_CREATION_LIMIT or \
- cls in self.MODEL_CREATION_LIMIT:
+ try:
+ obj = cls.objects.get(**dct)
dct['defaults'] = defaults.copy()
- obj, created = cls.objects.get_or_create(**dct)
- else:
- try:
- obj = cls.objects.get(**dct)
- dct['defaults'] = defaults.copy()
- except cls.DoesNotExist:
- raise self._get_does_not_exist_in_db_error(
- cls, dct)
-
- if not created and not path and self.UNICITY_KEYS:
- changed = False
- if self.conservative_import:
- for k in dct['defaults']:
- new_val = dct['defaults'][k]
- if new_val is None or new_val == '':
- continue
- val = getattr(obj, k)
- if val is None or val == '':
- changed = True
- setattr(obj, k, new_val)
- elif k in self.concats \
- and type(val) == unicode \
- and type(new_val) == unicode:
- setattr(obj, k, val + u"\n" + new_val)
- else:
- for k in dct['defaults']:
- new_val = dct['defaults'][k]
- if new_val is None or new_val == '':
- continue
+ except cls.DoesNotExist:
+ raise self._get_does_not_exist_in_db_error(
+ cls, dct)
+
+ if not created and not path and self.UNICITY_KEYS:
+ changed = False
+ if self.conservative_import:
+ for k in dct['defaults']:
+ new_val = dct['defaults'][k]
+ if new_val is None or new_val == '':
+ continue
+ val = getattr(obj, k)
+ if val is None or val == '':
changed = True
setattr(obj, k, new_val)
- if changed:
- obj.save()
- if self.import_instance and hasattr(obj, 'imports') \
- and created:
- obj.imports.add(self.import_instance)
- except ValueError as e:
- raise IntegrityError(e.message)
- except IntegrityError as e:
- raise IntegrityError(e.message)
- except DatabaseError as e:
- raise IntegrityError(e.message)
- except cls.MultipleObjectsReturned as e:
- created = False
- if 'defaults' in dct:
- dct.pop('defaults')
- raise IntegrityError(e.message)
- # obj = cls.objects.filter(**dct).all()[0]
- for attr, value in m2ms:
- values = [value]
- if type(value) in (list, tuple):
- values = value
- for v in values:
- getattr(obj, attr).add(v)
- # force post save script
- v.save()
- if m2ms:
- # force post save script
- obj.save()
+ elif k in self.concats \
+ and type(val) == unicode \
+ and type(new_val) == unicode:
+ setattr(obj, k, val + u"\n" + new_val)
+ else:
+ for k in dct['defaults']:
+ new_val = dct['defaults'][k]
+ if new_val is None or new_val == '':
+ continue
+ changed = True
+ setattr(obj, k, new_val)
+ if changed:
+ obj.save()
+ if self.import_instance and hasattr(obj, 'imports') \
+ and created:
+ obj.imports.add(self.import_instance)
+ except ValueError as e:
+ raise IntegrityError(e.message)
except IntegrityError as e:
- message = e.message
- try:
- message = e.message.decode('utf-8')
- except (UnicodeDecodeError, UnicodeDecodeError):
- message = ''
- try:
- data = unicode(data)
- except UnicodeDecodeError:
- data = ''
- raise ImporterError(
- "Erreur d'import %s %s, contexte : %s, erreur : %s"
- % (unicode(cls), unicode("__".join(path)),
- unicode(data), message))
- return obj, created
- return data
+ raise IntegrityError(e.message)
+ except DatabaseError as e:
+ raise IntegrityError(e.message)
+ except cls.MultipleObjectsReturned as e:
+ created = False
+ if 'defaults' in dct:
+ dct.pop('defaults')
+ raise IntegrityError(e.message)
+ # obj = cls.objects.filter(**dct).all()[0]
+ for attr, value in m2ms:
+ values = [value]
+ if type(value) in (list, tuple):
+ values = value
+ for v in values:
+ getattr(obj, attr).add(v)
+ # force post save script
+ v.save()
+ if m2ms:
+ # force post save script
+ obj.save()
+ except IntegrityError as e:
+ message = e.message
+ try:
+ message = e.message.decode('utf-8')
+ except (UnicodeDecodeError, UnicodeDecodeError):
+ message = ''
+ try:
+ data = unicode(data)
+ except UnicodeDecodeError:
+ data = ''
+ raise ImporterError(
+ "Erreur d'import %s %s, contexte : %s, erreur : %s"
+ % (unicode(cls), unicode("__".join(path)),
+ unicode(data), message))
+ return obj, created
def _format_csv_line(self, values, empty=u"-"):
return u'"' + u'","'.join(