From 804d5a65839300a3c0765b7be6d3e67af6901fe2 Mon Sep 17 00:00:00 2001 From: Valérie-Emma Leroux Date: Sat, 4 Feb 2017 21:31:00 +0100 Subject: Admin: Archaeological operations: add order for ReportState --- archaeological_operations/admin.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'archaeological_operations') diff --git a/archaeological_operations/admin.py b/archaeological_operations/admin.py index d8ce23662..3abf22eae 100644 --- a/archaeological_operations/admin.py +++ b/archaeological_operations/admin.py @@ -105,10 +105,16 @@ class ActTypeAdmin(GeneralTypeAdmin): list_filter = ('intented_to',) list_display = ['label', 'txt_idx', 'available', 'intented_to'] - admin.site.register(models.ActType, ActTypeAdmin) -general_models = [models.RemainType, models.ReportState] + +class ReportStateAdmin(GeneralTypeAdmin): + list_display = ['label', 'txt_idx', 'available', 'order', 'comment'] + +admin.site.register(models.ReportState, ReportStateAdmin) + + +general_models = [models.RemainType] for model in general_models: admin.site.register(model, GeneralTypeAdmin) -- cgit v1.2.3 From 84199607f2f323e6df1458c41ea7c02d6ea2cbba Mon Sep 17 00:00:00 2001 From: Étienne Loks Date: Sun, 5 Feb 2017 18:57:36 +0100 Subject: Imports: manage model limitation (don't create items not in the list) --- archaeological_operations/tests.py | 118 +++++++++++++++++++++++++++---------- ishtar_common/data_importer.py | 93 ++++++++++++++++++++++++++--- ishtar_common/models.py | 16 ++++- ishtar_common/tests.py | 4 +- 4 files changed, 187 insertions(+), 44 deletions(-) (limited to 'archaeological_operations') diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index d4134693f..f2126a68e 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -35,7 +35,7 @@ import models from archaeological_operations import views from ishtar_common.models import OrganizationType, Organization, \ - ImporterType, IshtarUser, TargetKey, IshtarSiteProfile, Town + ImporterType, IshtarUser, TargetKey, ImporterModel, IshtarSiteProfile, Town from ishtar_common import forms_common from ishtar_common.tests import WizardTest, WizardTestFormData as FormData, \ @@ -65,41 +65,22 @@ class ImportOperationTest(TestCase): self.username, self.password, self.user = create_superuser() self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk) - def testMCCImportOperation(self, test=True): - # MCC opérations - if self.test_operations is False: - test = False - first_ope_nb = models.Operation.objects.count() - MCC_OPERATION = ImporterType.objects.get(name=u"MCC - Opérations") + def init_ope_import(self): + mcc_operation = ImporterType.objects.get(name=u"MCC - Opérations") mcc_operation_file = open( settings.ROOT_PATH + '../archaeological_operations/tests/MCC-operations-example.csv', 'rb') file_dict = {'imported_file': SimpleUploadedFile( mcc_operation_file.name, mcc_operation_file.read())} - post_dict = {'importer_type': MCC_OPERATION.pk, 'skip_lines': 1, + post_dict = {'importer_type': mcc_operation.pk, 'skip_lines': 1, "encoding": 'utf-8'} - form = forms_common.NewImportForm(data=post_dict, files=file_dict, - instance=None) + form = forms_common.NewImportForm(data=post_dict, files=file_dict) form.is_valid() - if test: - self.assertTrue(form.is_valid()) - impt = form.save(self.ishtar_user) - target_key_nb = TargetKey.objects.count() - impt.initialize() - # new key have to be set - if test: - self.assertTrue(TargetKey.objects.count() > target_key_nb) + return mcc_operation, form - # first try to import - impt.importation() - current_ope_nb = models.Operation.objects.count() - # no new operation imported because of a missing connection for - # operation_type value - if test: - self.assertTrue(current_ope_nb == first_ope_nb) - - # doing manualy connections + def init_ope_targetkey(self, imp): + # doing manually connections tg = TargetKey.objects.filter(target__target='operation_type' ).order_by('-pk').all()[0] tg.value = models.OperationType.objects.get( @@ -107,18 +88,44 @@ class ImportOperationTest(TestCase): tg.is_set = True tg.save() - target = TargetKey.objects.get(key='gallo-romain') + target = TargetKey.objects.get(key='gallo-romain', + associated_import=imp) gallo = models.Period.objects.get(txt_idx='gallo-roman') target.value = gallo.pk target.is_set = True target.save() - target = TargetKey.objects.get(key='age-du-fer') + target = TargetKey.objects.get(key='age-du-fer', + associated_import=imp) iron = models.Period.objects.get(txt_idx='iron_age') target.value = iron.pk target.is_set = True target.save() + def test_mcc_import_operation(self, test=True): + # MCC opérations + if self.test_operations is False: + test = False + first_ope_nb = models.Operation.objects.count() + importer, form = self.init_ope_import() + if test: + self.assertTrue(form.is_valid()) + impt = form.save(self.ishtar_user) + target_key_nb = TargetKey.objects.count() + impt.initialize() + # new key have to be set + if test: + self.assertTrue(TargetKey.objects.count() > target_key_nb) + + # first try to import + impt.importation() + current_ope_nb = models.Operation.objects.count() + # no new operation imported because of a missing connection for + # operation_type value + if test: + self.assertTrue(current_ope_nb == first_ope_nb) + self.init_ope_targetkey(imp=impt) + impt.importation() if not test: return @@ -131,8 +138,9 @@ class ImportOperationTest(TestCase): self.assertTrue(last_ope.code_patriarche == 4200) self.assertTrue(last_ope.operation_type.txt_idx == 'prog_excavation') self.assertEqual(last_ope.periods.count(), 2) - periods = last_ope.periods.all() - self.assertTrue(iron in periods and gallo in periods) + periods = [period.txt_idx for period in last_ope.periods.all()] + self.assertIn('iron_age', periods) + self.assertIn('gallo-roman', periods) # a second importation will be not possible: no two same patriarche # code @@ -141,10 +149,56 @@ class ImportOperationTest(TestCase): self.assertTrue(last_ope == models.Operation.objects.order_by('-pk').all()[0]) + def test_model_limitation(self): + importer, form = self.init_ope_import() + importer.created_models.clear() + impt = form.save(self.ishtar_user) + impt.initialize() + self.init_ope_targetkey(imp=impt) + + # no model defined in created_models: normal import + init_ope_number = models.Operation.objects.count() + impt.importation() + current_ope_nb = models.Operation.objects.count() + self.assertEqual(current_ope_nb, init_ope_number + 1) + + last_ope = models.Operation.objects.order_by('-pk').all()[0] + last_ope.delete() + + importer, form = self.init_ope_import() + # add an inadequate model to make created_models non empty + importer.created_models.clear() + importer.created_models.add(ImporterModel.objects.get( + klass='ishtar_common.models.Organization' + )) + impt = form.save(self.ishtar_user) + impt.initialize() + self.init_ope_targetkey(imp=impt) + + # no imports + impt.importation() + current_ope_nb = models.Operation.objects.count() + self.assertEqual(current_ope_nb, init_ope_number) + + importer, form = self.init_ope_import() + # add operation model to allow creation + importer.created_models.clear() + importer.created_models.add(ImporterModel.objects.get( + klass='archaeological_operations.models.Operation' + )) + impt = form.save(self.ishtar_user) + impt.initialize() + self.init_ope_targetkey(imp=impt) + + # import of operations + impt.importation() + current_ope_nb = models.Operation.objects.count() + self.assertEqual(current_ope_nb, init_ope_number + 1) + def testMCCImportParcels(self, test=True): if self.test_operations is False: test = False - self.testMCCImportOperation(test=False) + self.test_mcc_import_operation(test=False) old_nb = models.Parcel.objects.count() MCC_PARCEL = ImporterType.objects.get(name=u"MCC - Parcelles") mcc_file = open( diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py index de4883c69..79259b76d 100644 --- a/ishtar_common/data_importer.py +++ b/ishtar_common/data_importer.py @@ -29,6 +29,7 @@ import zipfile from django.conf import settings from django.contrib.auth.models import User +from django.core.exceptions import ImproperlyConfigured from django.core.files import File from django.db import IntegrityError, DatabaseError, transaction from django.template.defaultfilters import slugify @@ -613,6 +614,8 @@ class Importer(object): OBJECT_CLS = None IMPORTED_LINE_FIELD = None UNICITY_KEYS = [] + # if set only models inside this list can be created + MODEL_CREATION_LIMIT = [] EXTRA_DEFAULTS = {} DEFAULTS = {} ERRORS = { @@ -626,10 +629,19 @@ class Importer(object): 'no_data': _(u"No data provided"), 'value_required': _(u"Value is required"), 'not_enough_cols': _(u"At least %d columns must be filled"), - 'regex_not_match': _(u"The regexp doesn't match.") + 'regex_not_match': _(u"The regexp doesn't match."), + 'improperly_configured': _( + u"Force creation is set for model {} but this model is not in the " + u"list of model allowed to be created."), + 'does_not_exist_in_db': _(u"{} with values {} doesn't exist in the " + u"database. Create it first or fix your source file"), } def _create_models(self, force=False): + """ + Create a db config from a hardcoded import. + Not useful anymore? + """ from ishtar_common import models q = models.ImporterType.objects.filter(slug=self.SLUG) if not force and (not self.SLUG or q.count()): @@ -1009,11 +1021,32 @@ class Importer(object): if k not in formater.through_unicity_keys \ and k != 'defaults': data['defaults'][k] = data.pop(k) + created = False if '__force_new' in data: + if self.MODEL_CREATION_LIMIT and \ + through_cls not in self.MODEL_CREATION_LIMIT: + raise ImproperlyConfigured( + unicode(self.ERRORS[ 'improperly_configured']).format( + through_cls)) created = data.pop('__force_new') t_obj = through_cls.objects.create(**data) else: - t_obj, created = through_cls.objects.get_or_create(**data) + if not self.MODEL_CREATION_LIMIT or \ + through_cls in self.MODEL_CREATION_LIMIT: + t_obj, created = through_cls.objects.get_or_create(**data) + else: + get_data = data.copy() + if 'defaults' in get_data: + get_data.pop('defaults') + try: + t_obj = through_cls.objects.get(**get_data) + except through_cls.DoesNotExist: + values = u", ".join( + [u"{}: {}".format(k, get_data[k]) for k in get_data] + ) + raise ImporterError( + unicode(self.ERRORS['does_not_exist_in_db'] + ).format(through_cls, values)) if not created and 'defaults' in data: for k in data['defaults']: setattr(t_obj, k, data['defaults'][k]) @@ -1247,6 +1280,12 @@ class Importer(object): new_created[attribute].append(key) has_values = bool([1 for k in v if v[k]]) if has_values: + if self.MODEL_CREATION_LIMIT and \ + model not in self.MODEL_CREATION_LIMIT: + raise ImproperlyConfigured( + unicode( + self.ERRORS['improperly_configured'] + ).format(model)) v = model.objects.create(**v) else: continue @@ -1255,14 +1294,32 @@ class Importer(object): extra_fields = {} # "File" type is a temp object and can be different # for the same filename - it must be treated - # separatly + # separately for field in model._meta.fields: k = field.name - # attr_class est un attribut de FileField + # attr_class is a FileField attribute if hasattr(field, 'attr_class') and k in v: extra_fields[k] = v.pop(k) - v, created = model.objects.get_or_create( - **v) + if not self.MODEL_CREATION_LIMIT or \ + model in self.MODEL_CREATION_LIMIT: + v, created = model.objects.get_or_create( + **v) + else: + get_v = v.copy() + if 'defaults' in get_v: + get_v.pop('defaults') + try: + v = model.objects.get(**get_v) + except model.DoesNotExist: + values = u", ".join( + [u"{}: {}".format(k, get_v[k]) + for k in get_v] + ) + raise ImporterError( + unicode( + self.ERRORS[ + 'does_not_exist_in_db'] + ).format(model, values)) changed = False for k in extra_fields.keys(): if extra_fields[k]: @@ -1336,6 +1393,7 @@ class Importer(object): 'history_modifier': create_dict.pop('history_modifier') }) + created = False try: try: dct = create_dict.copy() @@ -1348,6 +1406,11 @@ class Importer(object): return None, created new_dct = defaults.copy() new_dct.update(dct) + if self.MODEL_CREATION_LIMIT and \ + cls not in self.MODEL_CREATION_LIMIT: + raise ImproperlyConfigured( + unicode(self.ERRORS[ 'improperly_configured'] + ).format(cls)) obj = cls.objects.create(**new_dct) else: # manage UNICITY_KEYS - only level 1 @@ -1356,9 +1419,21 @@ class Importer(object): if k not in self.UNICITY_KEYS \ and k != 'defaults': defaults[k] = dct.pop(k) - - dct['defaults'] = defaults.copy() - obj, created = cls.objects.get_or_create(**dct) + if not self.MODEL_CREATION_LIMIT or \ + cls in self.MODEL_CREATION_LIMIT: + dct['defaults'] = defaults.copy() + obj, created = cls.objects.get_or_create(**dct) + else: + try: + obj = cls.objects.get(**dct) + dct['defaults'] = defaults.copy() + except cls.DoesNotExist: + values = u", ".join( + [u"{}: {}".format(k, dct[k]) for k in dct] + ) + raise ImporterError( + unicode(self.ERRORS['does_not_exist_in_db'] + ).format(cls, values)) if not created and not path and self.UNICITY_KEYS: changed = False diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 6cf5bff7d..c27f9cc29 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -35,7 +35,8 @@ import zipfile from django.conf import settings from django.core.cache import cache -from django.core.exceptions import ObjectDoesNotExist, ValidationError +from django.core.exceptions import ObjectDoesNotExist, ValidationError, \ + SuspiciousOperation from django.core.files import File from django.core.files.uploadedfile import SimpleUploadedFile from django.core.validators import validate_slug @@ -1723,9 +1724,16 @@ def get_model_fields(model): def import_class(full_path_classname): + """ + Return the model class from the full path + TODO: add a white list for more security + """ mods = full_path_classname.split('.') if len(mods) == 1: mods = ['ishtar_common', 'models', mods[0]] + elif 'models' not in mods: + raise SuspiciousOperation( + u"Try to import a non model from a string") module = import_module('.'.join(mods[:-1])) return getattr(module, mods[-1]) @@ -1820,9 +1828,13 @@ class ImporterType(models.Model): UNICITY_KEYS = [] if self.unicity_keys: UNICITY_KEYS = [un.strip() for un in self.unicity_keys.split(';')] + MODEL_CREATION_LIMIT = [] + for modls in self.created_models.all(): + MODEL_CREATION_LIMIT.append(import_class(modls.klass)) args = {'OBJECT_CLS': OBJECT_CLS, 'DESC': self.description, 'DEFAULTS': DEFAULTS, 'LINE_FORMAT': LINE_FORMAT, - 'UNICITY_KEYS': UNICITY_KEYS} + 'UNICITY_KEYS': UNICITY_KEYS, + 'MODEL_CREATION_LIMIT': MODEL_CREATION_LIMIT} name = str(''.join( x for x in slugify(self.name).replace('-', ' ').title() if not x.isspace())) diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index a512dcc07..a3fa62ce7 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -27,6 +27,7 @@ from django.contrib.contenttypes.models import ContentType from django.core.cache import cache from django.core.exceptions import ValidationError from django.core.files.base import File as DjangoFile +from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management import call_command from django.core.urlresolvers import reverse from django.template.defaultfilters import slugify @@ -35,6 +36,7 @@ from django.test.client import Client from django.test.simple import DjangoTestSuiteRunner from ishtar_common import models +from ishtar_common import forms_common from ishtar_common.utils import post_save_point """ @@ -275,7 +277,7 @@ class AdminGenTypeTest(TestCase): models.OrganizationType, models.PersonType, models.TitleType, models.AuthorType, models.SourceType, models.OperationType, models.SpatialReferenceSystem, models.Format, models.SupportType] - models_with_data = gen_models + [models.ImporterModel] + models_with_data = gen_models + [models.ImporterModel] models = models_with_data module_name = 'ishtar_common' -- cgit v1.2.3 From 9f1d5342e130d0371927063447682728ab666402 Mon Sep 17 00:00:00 2001 From: Étienne Loks Date: Mon, 6 Feb 2017 12:23:49 +0100 Subject: Test: refactor - should be a little bit faster --- archaeological_context_records/tests.py | 53 +++--------- archaeological_finds/tests.py | 28 +++---- archaeological_operations/tests.py | 141 ++++++++++++++++++++------------ 3 files changed, 112 insertions(+), 110 deletions(-) (limited to 'archaeological_operations') diff --git a/archaeological_context_records/tests.py b/archaeological_context_records/tests.py index 87eb80c57..032861d5d 100644 --- a/archaeological_context_records/tests.py +++ b/archaeological_context_records/tests.py @@ -29,67 +29,40 @@ from django.test.client import Client from ishtar_common.models import ImporterType, IshtarSiteProfile from ishtar_common.tests import create_superuser from archaeological_operations.tests import OperationInitTest, \ - ImportOperationTest + ImportTest, ImportOperationTest from archaeological_operations import models as models_ope from archaeological_context_records import models -from ishtar_common import forms_common -class ImportContextRecordTest(ImportOperationTest): - test_operations = False - test_context_records = True +class ImportContextRecordTest(ImportTest, TestCase): fixtures = ImportOperationTest.fixtures + [ settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data-fr.json', ] - def testMCCImportContextRecords(self, test=True): - if test and not self.test_context_records: - return - self.testMCCImportParcels(test=False) - + def test_mcc_import_contextrecords(self): old_nb = models.ContextRecord.objects.count() - MCC = ImporterType.objects.get(name=u"MCC - UE") - mcc_file = open( - settings.ROOT_PATH + - '../archaeological_context_records/tests/' - 'MCC-context-records-example.csv', 'rb') - file_dict = {'imported_file': SimpleUploadedFile(mcc_file.name, - mcc_file.read())} - post_dict = {'importer_type': MCC.pk, 'skip_lines': 1, - "encoding": 'utf-8'} - form = forms_common.NewImportForm(data=post_dict, files=file_dict, - instance=None) - form.is_valid() - if test: - self.assertTrue(form.is_valid()) + mcc, form = self.init_context_record_import() + + self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() - # doing manual connections - hc = models.Unit.objects.get(txt_idx='not_in_context').pk - self.setTargetKey('unit', 'hc', hc) - self.setTargetKey('unit', 'hors-contexte', hc) - layer = models.Unit.objects.get(txt_idx='layer').pk - self.setTargetKey('unit', 'couche', layer) - + self.init_cr_targetkey(impt) impt.importation() - if not test: - return - - # new ues has now been imported + # new context records has now been imported current_nb = models.ContextRecord.objects.count() - self.assertTrue(current_nb == (old_nb + 4)) + self.assertEqual(current_nb, old_nb + 4) self.assertEqual( - models.ContextRecord.objects.filter(unit__pk=hc).count(), 3) + models.ContextRecord.objects.filter( + unit__txt_idx='not_in_context').count(), 3) self.assertEqual( - models.ContextRecord.objects.filter(unit__pk=layer).count(), 1) + models.ContextRecord.objects.filter( + unit__txt_idx='layer').count(), 1) class ContextRecordInit(OperationInitTest): - test_operations = False - def create_context_record(self, user=None, data={}, force=False): if not getattr(self, 'context_records', None): self.context_records = [] diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py index f0112123b..a1dc33dd8 100644 --- a/archaeological_finds/tests.py +++ b/archaeological_finds/tests.py @@ -33,11 +33,12 @@ from archaeological_context_records.models import Period, Dating from archaeological_finds import models, views from archaeological_warehouse.models import Warehouse, WarehouseType -from archaeological_context_records.tests import ImportContextRecordTest, \ - ContextRecordInit - from ishtar_common import forms_common + from ishtar_common.tests import WizardTest, WizardTestFormData as FormData +from archaeological_operations.tests import ImportTest +from archaeological_context_records.tests import ImportContextRecordTest, \ + ContextRecordInit class FindInit(ContextRecordInit): @@ -217,17 +218,14 @@ class ATreatmentWizardCreationTest(WizardTest, FindInit, TestCase): treat) -class ImportFindTest(ImportContextRecordTest): - test_operations = False - test_context_records = False - +class ImportFindTest(ImportTest, TestCase): fixtures = ImportContextRecordTest.fixtures + [ settings.ROOT_PATH + '../archaeological_finds/fixtures/initial_data-fr.json', ] - def testMCCImportFinds(self, test=True): - self.testMCCImportContextRecords(test=False) + def test_mcc_import_finds(self): + self.init_context_record() old_nb = models.BaseFind.objects.count() old_nb_find = models.Find.objects.count() @@ -252,22 +250,18 @@ class ImportFindTest(ImportContextRecordTest): mcc_images.read())} post_dict = {'importer_type': MCC.pk, 'skip_lines': 1, "encoding": 'utf-8'} - form = forms_common.NewImportForm(data=post_dict, files=file_dict, - instance=None) + form = forms_common.NewImportForm(data=post_dict, files=file_dict) form.is_valid() - if test: - self.assertTrue(form.is_valid()) + self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() # doing manual connections ceram = models.MaterialType.objects.get(txt_idx='ceramic').pk glass = models.MaterialType.objects.get(txt_idx='glass').pk - self.setTargetKey('find__material_types', 'terre-cuite', ceram) - self.setTargetKey('find__material_types', 'verre', glass) + self.set_target_key('find__material_types', 'terre-cuite', ceram) + self.set_target_key('find__material_types', 'verre', glass) impt.importation() - if not test: - return # new finds has now been imported current_nb = models.BaseFind.objects.count() self.assertEqual(current_nb, (old_nb + 4)) diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index f2126a68e..040c7c3d8 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -36,35 +36,27 @@ from archaeological_operations import views from ishtar_common.models import OrganizationType, Organization, \ ImporterType, IshtarUser, TargetKey, ImporterModel, IshtarSiteProfile, Town +from archaeological_context_records.models import Unit from ishtar_common import forms_common from ishtar_common.tests import WizardTest, WizardTestFormData as FormData, \ create_superuser, create_user -class ImportOperationTest(TestCase): - fixtures = [settings.ROOT_PATH + - '../fixtures/initial_data-auth-fr.json', - settings.ROOT_PATH + - '../ishtar_common/fixtures/initial_data-fr.json', - settings.ROOT_PATH + - '../ishtar_common/fixtures/test_towns.json', - settings.ROOT_PATH + - '../ishtar_common/fixtures/initial_importtypes-fr.json', - settings.ROOT_PATH + - '../archaeological_operations/fixtures/initial_data-fr.json'] - test_operations = True +class ImportTest(object): + def setUp(self): + self.username, self.password, self.user = create_superuser() + self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk) - def setTargetKey(self, target, key, value): - tg = TargetKey.objects.get(target__target=target, key=key) + def set_target_key(self, target, key, value, imp=None): + keys = {'target__target': target, 'key': key} + if imp: + keys['associated_import'] = imp + tg = TargetKey.objects.get(**keys) tg.value = value tg.is_set = True tg.save() - def setUp(self): - self.username, self.password, self.user = create_superuser() - self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk) - def init_ope_import(self): mcc_operation = ImporterType.objects.get(name=u"MCC - Opérations") mcc_operation_file = open( @@ -102,33 +94,95 @@ class ImportOperationTest(TestCase): target.is_set = True target.save() - def test_mcc_import_operation(self, test=True): - # MCC opérations - if self.test_operations is False: - test = False + def init_ope(self): + importer, form = self.init_ope_import() + impt = form.save(self.ishtar_user) + impt.initialize() + self.init_ope_targetkey(imp=impt) + impt.importation() + + def init_parcel_import(self): + self.init_ope() + mcc_parcel = ImporterType.objects.get(name=u"MCC - Parcelles") + mcc_file = open( + settings.ROOT_PATH + + '../archaeological_operations/tests/MCC-parcelles-example.csv', + 'rb') + file_dict = {'imported_file': SimpleUploadedFile(mcc_file.name, + mcc_file.read())} + post_dict = {'importer_type': mcc_parcel.pk, 'skip_lines': 1, + "encoding": 'utf-8'} + form = forms_common.NewImportForm(data=post_dict, files=file_dict) + form.is_valid() + return mcc_parcel, form + + def init_parcel(self): + importer, form = self.init_parcel_import() + impt = form.save(self.ishtar_user) + impt.initialize() + impt.importation() + + def init_context_record_import(self): + self.init_parcel() + mcc = ImporterType.objects.get(name=u"MCC - UE") + mcc_file = open( + settings.ROOT_PATH + + '../archaeological_context_records/tests/' + 'MCC-context-records-example.csv', 'rb') + file_dict = {'imported_file': SimpleUploadedFile(mcc_file.name, + mcc_file.read())} + post_dict = {'importer_type': mcc.pk, 'skip_lines': 1, + "encoding": 'utf-8'} + form = forms_common.NewImportForm(data=post_dict, files=file_dict) + form.is_valid() + return mcc, form + + def init_cr_targetkey(self, imp): + hc = Unit.objects.get(txt_idx='not_in_context').pk + self.set_target_key('unit', 'hc', hc, imp=imp) + self.set_target_key('unit', 'hors-contexte', hc, imp=imp) + layer = Unit.objects.get(txt_idx='layer').pk + self.set_target_key('unit', 'couche', layer, imp=imp) + + def init_context_record(self): + mcc, form = self.init_context_record_import() + impt = form.save(self.ishtar_user) + impt.initialize() + self.init_cr_targetkey(impt) + impt.importation() + + +class ImportOperationTest(ImportTest, TestCase): + fixtures = [settings.ROOT_PATH + + '../fixtures/initial_data-auth-fr.json', + settings.ROOT_PATH + + '../ishtar_common/fixtures/initial_data-fr.json', + settings.ROOT_PATH + + '../ishtar_common/fixtures/test_towns.json', + settings.ROOT_PATH + + '../ishtar_common/fixtures/initial_importtypes-fr.json', + settings.ROOT_PATH + + '../archaeological_operations/fixtures/initial_data-fr.json'] + + def test_mcc_import_operation(self): first_ope_nb = models.Operation.objects.count() importer, form = self.init_ope_import() - if test: - self.assertTrue(form.is_valid()) + self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) target_key_nb = TargetKey.objects.count() impt.initialize() # new key have to be set - if test: - self.assertTrue(TargetKey.objects.count() > target_key_nb) + self.assertTrue(TargetKey.objects.count() > target_key_nb) # first try to import impt.importation() current_ope_nb = models.Operation.objects.count() # no new operation imported because of a missing connection for # operation_type value - if test: - self.assertTrue(current_ope_nb == first_ope_nb) + self.assertTrue(current_ope_nb == first_ope_nb) self.init_ope_targetkey(imp=impt) impt.importation() - if not test: - return # a new operation has now been imported current_ope_nb = models.Operation.objects.count() self.assertTrue(current_ope_nb == (first_ope_nb + 1)) @@ -195,33 +249,16 @@ class ImportOperationTest(TestCase): current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, init_ope_number + 1) - def testMCCImportParcels(self, test=True): - if self.test_operations is False: - test = False - self.test_mcc_import_operation(test=False) + def test_mcc_import_parcels(self): old_nb = models.Parcel.objects.count() - MCC_PARCEL = ImporterType.objects.get(name=u"MCC - Parcelles") - mcc_file = open( - settings.ROOT_PATH + - '../archaeological_operations/tests/MCC-parcelles-example.csv', - 'rb') - file_dict = {'imported_file': SimpleUploadedFile(mcc_file.name, - mcc_file.read())} - post_dict = {'importer_type': MCC_PARCEL.pk, 'skip_lines': 1, - "encoding": 'utf-8'} - form = forms_common.NewImportForm(data=post_dict, files=file_dict, - instance=None) - form.is_valid() - if test: - self.assertTrue(form.is_valid()) + mcc_parcel, form = self.init_parcel_import() impt = form.save(self.ishtar_user) impt.initialize() impt.importation() - if not test: - return # new parcels has now been imported current_nb = models.Parcel.objects.count() - self.assertTrue(current_nb == (old_nb + 2)) + self.assertEqual(current_nb, old_nb + 2) + # and well imported last_parcels = models.Parcel.objects.order_by('-pk').all()[0:2] external_ids = sorted(['4200-59350-YY55', '4200-75101-XXXX']) @@ -249,8 +286,6 @@ class ImportOperationTest(TestCase): self.assertEqual(parcel_count - 2, models.Parcel.objects.count()) def testParseParcels(self): - if not self.test_operations: - return # the database needs to be initialised before importing from archaeological_operations.import_from_csv import parse_parcels # default_town = Town.objects.create(numero_insee="12345", -- cgit v1.2.3