#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2012-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import json import datetime from subprocess import Popen, PIPE from io import StringIO, BytesIO import tempfile import locale import zipfile from django.apps import apps from django.conf import settings from django.contrib.auth.models import Group from django.contrib.contenttypes.models import ContentType from django.core.files.uploadedfile import SimpleUploadedFile from django.core.urlresolvers import reverse from django.db.models import Q from django.test.client import Client from django.utils.text import slugify from django.contrib.auth.models import User, Permission from django.utils.translation import ugettext_lazy as _, pgettext, pgettext_lazy from . import models from ishtar_common.views import document_deletion_steps from ishtar_common.serializers import document_serialization from archaeological_operations import views, serializers from ishtar_common.models import ( OrganizationType, Organization, ItemKey, ImporterType, IshtarUser, TargetKey, ImporterModel, IshtarSiteProfile, Town, ImporterColumn, Person, Author, SourceType, AuthorType, DocumentTemplate, PersonType, TargetKeyGroup, JsonDataField, JsonDataSection, ImportTarget, FormaterType, CustomForm, ExcludedField, UserProfile, ProfileType, Area, CustomFormJsonField, get_current_profile, Document, ValueFormater, Regexp, ) from archaeological_files.models import File, FileType from archaeological_context_records.models import Unit, ContextRecord from ishtar_common import forms_common from ishtar_common.tests import ( WizardTest, WizardTestFormData as FormData, create_superuser, create_user, TestCase, OPERATION_FIXTURES, AutocompleteTestBase, AcItem, OPERATION_TOWNS_FIXTURES, FILE_FIXTURES, COMMON_FIXTURES, GenericSerializationTest, WAREHOUSE_FIXTURES, SearchText, ) from ishtar_common.serializers import restore_serialized class FileInit(object): def login_as_superuser(self): self.client.login(username="username", password="tralala") def create_file(self): self.extra_models, self.model_list = {}, [] self.user, created = User.objects.get_or_create( username="username", is_superuser=True ) self.user.set_password("tralala") self.user.save() self.o_user, created = User.objects.get_or_create(username="ousername") person_type, created = PersonType.objects.get_or_create( label="Test person type", txt_idx="test_person", available=True ) self.extra_models["person_type"] = person_type self.model_list.append(person_type) person = models.Person( surname="Surname", name="Name", history_modifier=self.o_user ) person.save() self.extra_models["person"] = person self.model_list.append(person) file_type, created = FileType.objects.get_or_create( label="Test file type", txt_idx="test_file", available=True ) self.extra_models["file_type"] = file_type self.model_list.append(file_type) dct = { "year": 2010, "numeric_reference": 1000, "file_type": file_type, "internal_reference": "UNIT_testÉ ?", "in_charge": person, "history_modifier": self.o_user, "total_surface": 10000, } self.item = File(**dct) self.item.save() class ImportTest(object): def setUp(self): self.username, self.password, self.user = create_superuser() self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk) def set_target_key(self, target, key, value, imp=None): keys = {"target__target": target, "key": key} if imp: keys["associated_import"] = imp tg = TargetKey.objects.get(**keys) tg.value = value tg.is_set = True tg.save() def init_ope_import(self, filename="MCC-operations-example.csv", sep=","): mcc_operation = ImporterType.objects.get(name="MCC - Opérations") mcc_operation_file = open( settings.ROOT_PATH + "../archaeological_operations/tests/" + filename, "rb" ) file_dict = { "imported_file": SimpleUploadedFile( mcc_operation_file.name, mcc_operation_file.read() ) } group, c = TargetKeyGroup.objects.get_or_create(name="My group") post_dict = { "importer_type": mcc_operation.pk, "skip_lines": 1, "encoding": "utf-8", "name": "init_ope_import", "associated_group": group.pk, "csv_sep": sep, } form = forms_common.NewImportForm( data=post_dict, files=file_dict, user=self.user ) form.is_valid() return mcc_operation, form def init_ope_targetkey(self, imp): # doing manually connections q = Q(importer=imp) | Q(user=imp.user) if imp.associated_group: q |= Q(group=imp.associated_group) for ik in ItemKey.objects.filter(q).all(): ik.delete() # target for this import target = ( TargetKey.objects.filter(target__target="operation_type") .order_by("-pk") .all()[0] ) target.value = models.OperationType.objects.get(txt_idx="prog_excavation").pk target.is_set = True target.associated_import = imp target.save() # target for all users tgs = list(TargetKey.objects.filter(key="gallo-romain").all()) for tg in tgs[1:]: tg.delete() target2 = tgs[0] gallo = models.Period.objects.get(txt_idx="gallo-roman") target2.value = gallo.pk target2.is_set = True target2.associated_import = None target2.associated_group = None target2.associated_user = None target2.save() # target for this user tgs = list(TargetKey.objects.filter(key="age-du-fer").all()) for tg in tgs[1:]: tg.delete() target3 = tgs[0] iron = models.Period.objects.get(txt_idx="iron-age") target3.value = iron.pk target3.is_set = True target3.associated_import = None target3.associated_user = self.ishtar_user target3.associated_group = None target3.save() # target for another user username, password, user = create_user() another_user = IshtarUser.objects.get(pk=user.pk) tgs = list(TargetKey.objects.filter(key="neolithik").all()) for tg in tgs[1:]: tg.delete() target4 = tgs[0] neo = models.Period.objects.get(txt_idx="neolithic") target4.value = neo.pk target4.is_set = True target4.associated_import = None target4.associated_group = None target4.associated_user = another_user target4.save() # target for the current group tgs = list(TargetKey.objects.filter(key="moderne").all()) for tg in tgs[1:]: tg.delete() target5 = tgs[0] modern = models.Period.objects.get(txt_idx="modern") target5.value = modern.pk target5.is_set = True target5.associated_import = None target5.associated_user = None target5.associated_group = imp.associated_group target5.save() return [target, target2, target3, target4, target5] def init_ope(self): importer, form = self.init_ope_import() impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() def init_parcel_import(self): self.init_ope() mcc_parcel = ImporterType.objects.get(name="MCC - Parcelles") mcc_file = open( settings.ROOT_PATH + "../archaeological_operations/tests/MCC-parcelles-example.csv", "rb", ) file_dict = { "imported_file": SimpleUploadedFile(mcc_file.name, mcc_file.read()) } post_dict = { "importer_type": mcc_parcel.pk, "skip_lines": 1, "encoding": "utf-8", "name": "init_parcel_import", "csv_sep": ",", } form = forms_common.NewImportForm( data=post_dict, files=file_dict, user=self.user ) form.is_valid() return mcc_parcel, form def init_parcel(self): importer, form = self.init_parcel_import() impt = form.save(self.ishtar_user) impt.initialize() impt.importation() def init_context_record_import(self): self.init_parcel() mcc = ImporterType.objects.get(name="MCC - UE") mcc_file = open( settings.ROOT_PATH + "../archaeological_context_records/tests/" "MCC-context-records-example.csv", "rb", ) file_dict = { "imported_file": SimpleUploadedFile(mcc_file.name, mcc_file.read()) } post_dict = { "importer_type": mcc.pk, "skip_lines": 1, "encoding": "utf-8", "name": "init_context_record_import", "csv_sep": ",", } form = forms_common.NewImportForm( data=post_dict, files=file_dict, user=self.user ) form.is_valid() return mcc, form def init_cr_targetkey(self, imp): hc = Unit.objects.get(txt_idx="not_in_context").pk self.set_target_key("unit", "hc", hc, imp=imp) self.set_target_key("unit", "hors-contexte", hc, imp=imp) layer = Unit.objects.get(txt_idx="negative").pk self.set_target_key("unit", "couche", layer, imp=imp) def init_context_record(self): mcc, form = self.init_context_record_import() impt = form.save(self.ishtar_user) impt.initialize() self.init_cr_targetkey(impt) impt.importation() class ImportOperationTest(ImportTest, TestCase): fixtures = OPERATION_TOWNS_FIXTURES def test_mcc_import_operation(self): create_user() # create it before import to have a relevant person # number first_person_nb = Person.objects.count() first_ope_nb = models.Operation.objects.count() importer, form = self.init_ope_import() self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) target_key_nb = TargetKey.objects.count() impt.initialize() # new key have to be set self.assertTrue(TargetKey.objects.count() > target_key_nb) # first try to import impt.importation() current_ope_nb = models.Operation.objects.count() # no new operation imported because of a missing connection for # operation_type value self.assertEqual(current_ope_nb, first_ope_nb) self.init_ope_targetkey(imp=impt) impt.importation() # new operations have now been imported current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, first_ope_nb + 2) current_person_nb = Person.objects.count() self.assertEqual(current_person_nb, first_person_nb + 1) # and well imported last_ope = models.Operation.objects.order_by("-pk").all()[0] self.assertEqual(last_ope.name, "Oppìdum de Paris") self.assertEqual(last_ope.code_patriarche, "4200") self.assertEqual(last_ope.operation_type.txt_idx, "prog_excavation") self.assertEqual(last_ope.periods.count(), 3) periods = [period.txt_idx for period in last_ope.periods.all()] self.assertIn("iron-age", periods) self.assertIn("gallo-roman", periods) # target key set for another user self.assertNotIn("neolithic", periods) # a second importation will be not possible: no two same patriarche # code impt.importation() self.assertEqual(last_ope, models.Operation.objects.order_by("-pk").all()[0]) def test_import_bad_encoding(self): self.init_ope_import("MCC-operations-example-bad-encoding.csv") def test_import_semi_colon_sep(self): first_ope_nb = models.Operation.objects.count() importer, form = self.init_ope_import( "MCC-operations-example-semi-colon.csv", sep=";" ) self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() # new operations have now been imported current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, first_ope_nb + 2) def test_import_multi_column_concat(self): first_ope_nb = models.Operation.objects.count() importer, form = self.init_ope_import( "MCC-operations-example-multi-col-periods.csv" ) col = ImporterColumn.objects.create(col_number=12, importer_type_id=importer.pk) period_imp = ImporterColumn.objects.get( col_number=9, importer_type_id=importer.pk ) period_target = period_imp.targets.all()[0] target = ImportTarget.objects.create( column=col, target=period_target.target, formater_type=period_target.formater_type, concat=True, ) importer, form = self.init_ope_import( "MCC-operations-example-multi-col-periods.csv" ) self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, first_ope_nb + 2) modern = models.Period.objects.get(txt_idx="modern") for ope in models.Operation.objects.order_by("-pk")[0:2]: self.assertIn(modern, list(ope.periods.all())) target.delete() col.delete() def test_import_value_format(self): importer, form = self.init_ope_import() column = importer.columns.get(col_number=1) f = ValueFormater.objects.create(name="-", slug="-", format_string="oa-{}") column.value_format = f column.save() self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() self.assertEqual( models.Operation.objects.filter(code_patriarche="oa-4201").count(), 1 ) self.assertEqual( models.Operation.objects.filter(code_patriarche="oa-4200").count(), 1 ) f.delete() def test_keys_limitation(self): # each key association associated to the import init_ope_number = models.Operation.objects.count() importer, form = self.init_ope_import() impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) importer, form = self.init_ope_import() other_imp = form.save(self.ishtar_user) # re-associate with another import q = Q(importer=impt) | Q(user=impt.user) if impt.associated_group: q |= Q(group=impt.associated_group) for ik in ItemKey.objects.filter(q).all(): ik.delete() q = Q(associated_import=impt) | Q(associated_user=impt.user) if impt.associated_group: q |= Q(associated_group=impt.associated_group) for tg in TargetKey.objects.filter(q).all(): tg.associated_user = None tg.associated_group = None tg.associated_import = other_imp tg.save() impt.importation() current_ope_nb = models.Operation.objects.count() # no new operation self.assertEqual(current_ope_nb, init_ope_number) for tg in TargetKey.objects.filter(associated_import=other_imp).all(): tg.associated_import = impt tg.save() def test_bad_configuration(self): importer, form = self.init_ope_import() col = ImporterColumn.objects.get(importer_type=importer, col_number=1) target = col.targets.all()[0] target.target = "cody" # random and not appropriate string target.save() # self.init_ope() # importer, form = self.init_ope_import() impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() self.assertEqual(len(impt.errors), 2) self.assertTrue("cody" in impt.errors[0]["error"]) self.assertTrue( "Importer configuration error" in impt.errors[0]["error"] or "Erreur de configuration de l'importeur" in impt.errors[0]["error"] ) def test_model_limitation(self): importer, form = self.init_ope_import() importer.created_models.clear() impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) # no model defined in created_models: normal import init_ope_number = models.Operation.objects.count() impt.importation() current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, init_ope_number + 2) for ope in models.Operation.objects.order_by("-pk").all()[:2]: ope.delete() importer, form = self.init_ope_import() # add an inadequate model to make created_models non empty importer.created_models.clear() importer.created_models.add( ImporterModel.objects.get(klass="ishtar_common.models.Organization") ) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) # no imports impt.importation() current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, init_ope_number) importer, form = self.init_ope_import() # add operation model to allow creation importer.created_models.clear() importer.created_models.add( ImporterModel.objects.get( klass="archaeological_operations.models.Operation" ) ) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) # import of operations impt.importation() current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, init_ope_number + 2) def test_export_libreoffice_template(self): if not settings.USE_LIBREOFFICE: # function not available return mcc_operation = ImporterType.objects.get(name="MCC - Opérations") generated_file = mcc_operation.get_libreoffice_template() zip_file = zipfile.ZipFile(generated_file) self.assertIsNone( zip_file.testzip(), "Libreoffice template generated " "is not a correct zip file.", ) filename = None for name in zip_file.namelist(): if name == "content.xml": filename = name break self.assertIsNotNone(filename) # only check that all operation types are listed in the source file with tempfile.TemporaryDirectory(prefix="tmp-ishtar-") as tmpdir: imported_file = zip_file.extract(filename, tmpdir) with open(imported_file) as content_file: content = content_file.read() for ope_type in models.OperationType.objects.all(): ope_type = str(ope_type).replace("'", "'") self.assertIn(ope_type, content) def test_mcc_import_parcels(self): old_nb = models.Parcel.objects.count() mcc_parcel, form = self.init_parcel_import() impt = form.save(self.ishtar_user) impt.initialize() impt.importation() # new parcels has now been imported current_nb = models.Parcel.objects.count() self.assertEqual(current_nb, old_nb + 3) # and well imported last_parcels = models.Parcel.objects.order_by("-pk").all()[0:3] external_ids = sorted(["4200-59350-YY55", "4200-75101-XXXX", "4201-59350-YY55"]) parcel_numbers = sorted(["42", "55", "55"]) sections = sorted(["ZX", "YY", "YY"]) self.assertEqual(external_ids, sorted([p.external_id for p in last_parcels])) self.assertEqual( parcel_numbers, sorted([p.parcel_number for p in last_parcels]) ) self.assertEqual(sections, sorted([p.section for p in last_parcels])) ope1 = models.Operation.objects.get(code_patriarche="4200") towns_ope = ope1.towns.all() imported = [imp for acc, imp in impt.get_all_imported()] for p in last_parcels: self.assertTrue(p.town in towns_ope) self.assertTrue(p in imported) self.assertEqual(len(imported), len(last_parcels)) self.assertEqual( models.Parcel.objects.get( parcel_number="55", section="YY", operation_id=ope1.pk ).external_id, "4200-59350-YY55", ) # cached_label update ope2 = models.Operation.objects.get(code_patriarche="4201") self.assertIsNotNone(ope2.cached_label) self.assertIn("LILLE", ope2.cached_label.upper()) # delete associated parcel with the import deletion parcel_count = models.Parcel.objects.count() impt.delete() self.assertEqual(parcel_count - 3, models.Parcel.objects.count()) def test_json_fields(self): importer, form = self.init_ope_import("operations-with-json-fields.csv") col = ImporterColumn.objects.create(importer_type=importer, col_number=11) formater_type = FormaterType.objects.get(formater_type="IntegerFormater") it = ImportTarget.objects.create( column=col, target="data__autre_refs__arbitraire", formater_type=formater_type, ) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() ope1 = models.Operation.objects.get(code_patriarche="4200") self.assertEqual(ope1.data, {"autre_refs": {"arbitraire": 789}}) ope2 = models.Operation.objects.get(code_patriarche="4201") self.assertEqual(ope2.data, {"autre_refs": {"arbitraire": 456}}) # #4292: Import of new JSON fields erase all precedent JSON fields it.delete() col2 = ImporterColumn.objects.create(importer_type=importer, col_number=12) ImportTarget.objects.create( column=col2, target="data__autre", formater_type=formater_type ) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() ope1 = models.Operation.objects.get(code_patriarche="4200") self.assertEqual(ope1.data, {"autre_refs": {"arbitraire": 789}, "autre": 666}) ope2 = models.Operation.objects.get(code_patriarche="4201") self.assertEqual(ope2.data, {"autre_refs": {"arbitraire": 456}, "autre": 333}) def test_regexp(self): importer, form = self.init_ope_import("operations-with-json-fields.csv") r, __ = Regexp.objects.get_or_create( name="Code INSEE", regexp="([0-9]{0,1}[0-9A-B]) *([0-9]{3})" ) col = ImporterColumn.objects.create( importer_type=importer, col_number=13, regexp_pre_filter=r ) formater_type = FormaterType.objects.get( formater_type="UnicodeFormater", options="" ) ImportTarget.objects.create( column=col, target="data__code_insee", formater_type=formater_type ) impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) impt.importation() ope1 = models.Operation.objects.get(code_patriarche="4200") self.assertEqual(ope1.data, {"code_insee": "45123"}) ope2 = models.Operation.objects.get(code_patriarche="4201") self.assertEqual(ope2.data, {"code_insee": "6456"}) class ImportDocumentTest(ImportTest, TestCase): fixtures = OPERATION_TOWNS_FIXTURES def init_doc_import(self, filename="document-example.csv"): model, __ = ImporterModel.objects.get_or_create( klass="ishtar_common.models.Document", defaults={"name": "Documentation"} ) doc_import, __ = ImporterType.objects.get_or_create( name="Doc import", slug="doc-import", associated_models=model ) col = ImporterColumn.objects.create( col_number=1, importer_type_id=doc_import.pk ) formater = FormaterType.objects.filter(formater_type="IntegerFormater").all()[0] ImportTarget.objects.create( target="import_get_next_index", formater_type_id=formater.pk, column_id=col.pk, ) col = ImporterColumn.objects.create( col_number=2, importer_type_id=doc_import.pk ) formater, __ = FormaterType.objects.get_or_create( formater_type="UnicodeFormater", options=None ) ImportTarget.objects.create( target="title", formater_type_id=formater.pk, column_id=col.pk ) doc_import_file = open( settings.ROOT_PATH + "../archaeological_operations/tests/" + filename, "rb" ) file_dict = { "imported_file": SimpleUploadedFile( doc_import_file.name, doc_import_file.read() ) } group, c = TargetKeyGroup.objects.get_or_create(name="My group") post_dict = { "importer_type": doc_import.pk, "skip_lines": 1, "encoding": "utf-8", "name": "init_ope_import", "associated_group": group.pk, "csv_sep": ",", } form = forms_common.NewImportForm( data=post_dict, files=file_dict, user=self.user ) form.is_valid() return doc_import, form def test_import_document(self): doc_nb = Document.objects.count() current_index = Document.get_next_index() - 1 importer, form = self.init_doc_import() self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.importation() self.assertEqual(doc_nb + 2, Document.objects.count()) self.assertEqual(current_index + 2, Document.get_next_index() - 1) class ImportStepByStepTest(ImportTest, TestCase): fixtures = OPERATION_TOWNS_FIXTURES def test_step_by_step_import(self): create_user() importer, form = self.init_ope_import() impt = form.save(self.ishtar_user) impt.initialize() self.init_ope_targetkey(imp=impt) c = Client() first_person_nb = Person.objects.count() first_ope_nb = models.Operation.objects.count() import_url = reverse( "import_step_by_step", kwargs={"pk": impt.pk, "line_number": 2} ) response = c.get(import_url) # no login redirect self.assertEqual(response.status_code, 302) self.assertRedirects(response, "/accounts/login/?next=" + import_url) c.login(username=self.username, password=self.password) response = c.get(import_url) self.assertEqual(response.status_code, 200) # verify pagination for next link is OK self.assertIn( 'href="/import-step-by-step/{}/3/"'.format(impt.pk), response.content.decode(), ) # creation have been evaluated self.assertIn( str(_("New objects will be created.")), response.content.decode("utf-8") ) # import this line posted = {"valid": "import"} response = c.post(import_url, posted) self.assertEqual(response.status_code, 302) # successful import - go to the next line new_import_url = reverse( "import_step_by_step", kwargs={"pk": impt.pk, "line_number": 3} ) self.assertRedirects(response, new_import_url) current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, first_ope_nb + 1) current_person_nb = Person.objects.count() self.assertEqual(current_person_nb, first_person_nb) # re-import posted = {"valid": "import"} c.post(import_url, posted) # no changes current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, first_ope_nb + 1) current_person_nb = Person.objects.count() self.assertEqual(current_person_nb, first_person_nb) # on import page, already imported is visible response = c.get(import_url) self.assertEqual(response.status_code, 200) self.assertIn( str(_("This line have been already imported.")), response.content.decode("utf-8"), ) # import next page next_import_url = reverse( "import_step_by_step", kwargs={"pk": impt.pk, "line_number": 3} ) posted = {"valid": "import"} c.post(next_import_url, posted) current_ope_nb = models.Operation.objects.count() self.assertEqual(current_ope_nb, first_ope_nb + 2) current_person_nb = Person.objects.count() self.assertEqual(current_person_nb, first_person_nb + 1) # modifiy CSV with an invalid date posted = { "valid": "change-csv", "col-1": "4201", "col-2": "Bourgogne", "col-3": "Fouille programmée", "col-4": "Oppìdum de Paris 2", "col-5": "L'opérateur", "col-6": "", "col-7": "2000/01/32", "col-8": "2002/12/31", "col-9": "Age du Fer", } response = c.post(import_url, posted) self.assertEqual(response.status_code, 200) # modification have been made in the source CSV with open(impt.imported_file.path) as fle: fle.readline() # header imported_line = fle.readline() self.assertIn("2000/01/32", imported_line) # error detected on the source file error = str( _( "The following error(s) has been encountered while parsing " "the source file:" ) ) self.assertIn(error, response.content.decode("utf-8")) class SerializationTest(GenericSerializationTest, TestCase): fixtures = COMMON_FIXTURES + WAREHOUSE_FIXTURES def setUp(self): self.username, self.password, self.user = create_superuser() operation = create_operation(self.user, values={"code_patriarche": "66666"}) ope2 = create_operation(self.user, values={"code_patriarche": "66667"}) self.operations = [operation, ope2] models.RecordRelations.objects.create( left_record=operation, right_record=ope2, relation_type=models.RelationType.objects.all()[0], ) site = models.ArchaeologicalSite.objects.create(reference="ref-site") operation.archaeological_sites.add(site) operation.top_sites.add(site) default = { "town": Town.objects.create(numero_insee="66666"), "section": "A", "parcel_number": "1", "operation": operation, } parcel = models.Parcel.objects.create(**default) models.ParcelOwner.objects.create( parcel=parcel, owner=Person.objects.create(), start_date=datetime.date.today(), end_date=datetime.date.today(), ) def test_serialization(self): res = self.generic_serialization_test(serializers.operation_serialization) ope_json = json.loads( res[("operations", "archaeological_operations__Operation")] ) self.assertEqual(len(ope_json), 2) result_queryset = models.Operation.objects.filter(code_patriarche="66666") res = self.generic_serialization_test( serializers.operation_serialization, no_test=True, kwargs={"operation_queryset": result_queryset}, ) ope_json = json.loads( res[("operations", "archaeological_operations__Operation")] ) self.assertEqual(len(ope_json), 1) site_json = json.loads( res[("operations", "archaeological_operations__ArchaeologicalSite")] ) self.assertEqual(len(site_json), 1) rel_json = json.loads( res[("operations", "archaeological_operations__RecordRelations")] ) self.assertEqual(len(rel_json), 0) result_queryset = models.Operation.objects.filter(code_patriarche="66667") res = self.generic_serialization_test( serializers.operation_serialization, no_test=True, kwargs={"operation_queryset": result_queryset}, ) ope_json = json.loads( res[("operations", "archaeological_operations__Operation")] ) self.assertEqual(len(ope_json), 1) site_json = json.loads( res[("operations", "archaeological_operations__ArchaeologicalSite")] ) self.assertEqual(len(site_json), 0) rel_json = json.loads( res[("operations", "archaeological_operations__RecordRelations")] ) self.assertEqual(len(rel_json), 0) def reinit_lock(self): for operation in self.operations: operation = models.Operation.objects.get(pk=operation.pk) operation.locked = False operation.lock_user = None operation.save() def test_lock(self): self.reinit_lock() self.generic_serialization_test( serializers.operation_serialization, no_test=True, kwargs={"put_locks": False}, ) for operation in self.operations: operation = models.Operation.objects.get(pk=operation.pk) self.assertFalse(operation.locked) self.assertIsNone(operation.lock_user) self.reinit_lock() self.generic_serialization_test( serializers.operation_serialization, no_test=True, kwargs={"put_locks": True}, ) for operation in self.operations: operation = models.Operation.objects.get(pk=operation.pk) self.assertTrue(operation.locked) self.assertIsNone(operation.lock_user) self.reinit_lock() self.generic_serialization_test( serializers.operation_serialization, no_test=True, kwargs={"put_locks": True, "lock_user": self.user}, ) for operation in self.operations: operation = models.Operation.objects.get(pk=operation.pk) self.assertTrue(operation.locked) self.assertEqual(operation.lock_user, self.user) def test_restore(self): current_number, zip_filename = self.generic_restore_test_genzip( serializers.OPERATION_MODEL_LIST, serializers.operation_serialization ) self.generic_restore_test( zip_filename, current_number, serializers.OPERATION_MODEL_LIST ) def test_unlock_on_restore(self): current_number, zip_filename = self.generic_restore_test_genzip( serializers.OPERATION_MODEL_LIST, serializers.operation_serialization, kwargs={"put_locks": True, "lock_user": self.user}, ) self.generic_restore_test( zip_filename, current_number, serializers.OPERATION_MODEL_LIST, delete_existing=False, ) for operation in self.operations: operation = models.Operation.objects.get( code_patriarche=operation.code_patriarche ) self.assertTrue(operation.locked) self.assertEqual(operation.lock_user, self.user) self.generic_restore_test( zip_filename, current_number, serializers.OPERATION_MODEL_LIST, delete_existing=False, release_locks=True, ) for operation in self.operations: operation = models.Operation.objects.get( code_patriarche=operation.code_patriarche ) self.assertFalse(operation.locked) self.assertIsNone(operation.lock_user) def test_historization_on_restore(self): current_number, zip_filename = self.generic_restore_test_genzip( serializers.OPERATION_MODEL_LIST, serializers.operation_serialization ) operation = models.Operation.objects.get(code_patriarche="66666") version_nb = operation.history.count() restore_serialized(zip_filename, delete_existing=False) operation = models.Operation.objects.get(code_patriarche="66666") self.assertEqual(operation.history_creator, self.user) self.assertEqual(operation.history_modifier, self.user) # no new version self.assertEqual(operation.history.count(), version_nb) __, __, new_user = create_user() restore_serialized(zip_filename, delete_existing=False, user=new_user) operation = models.Operation.objects.get(code_patriarche="66666") self.assertEqual(operation.history_creator, self.user) self.assertEqual(operation.history_modifier, new_user) self.assertEqual(operation.history.count(), version_nb + 1) restore_serialized(zip_filename, delete_existing=True, user=new_user) operation = models.Operation.objects.get(code_patriarche="66666") self.assertEqual(operation.history_creator, new_user) self.assertEqual(operation.history_modifier, new_user) self.assertEqual(operation.history.count(), 1) class ParcelTest(ImportTest, TestCase): fixtures = OPERATION_TOWNS_FIXTURES def test_parse_parcels(self): # the database needs to be initialised before importing from archaeological_operations.utils import parse_parcels # default_town = Town.objects.create(numero_insee="12345", # name="default_town") test_values = ( ( "1996 : XT:53,54,56,57,59,60,61,62", { 1996: [ ("XT", "53"), ("XT", "54"), ("XT", "56"), ("XT", "57"), ("XT", "59"), ("XT", "60"), ("XT", "61"), ("XT", "62"), ] }, ), ("AD:23", {None: [("AD", "23")]}), ("1961 :B1:227;", {1961: [("B1", "227")]}), ( "1982 CV:35;CV:36", { 1982: [ ("CV", "35"), ("CV", "36"), ] }, ), ( "E:24;E:25", { None: [ ("E", "24"), ("E", "25"), ] }, ), ( "B : 375, 376, 386, 387, 645, 646 / C : 412 à 415, 432 à 435, " "622 / F : 120, 149, 150, 284, 287, 321 à 323", { None: [ ("B", "375"), ("B", "376"), ("B", "386"), ("B", "387"), ("B", "645"), ("B", "646"), ("C", "412"), ("C", "413"), ("C", "414"), ("C", "415"), ("C", "432"), ("C", "433"), ("C", "434"), ("C", "435"), ("C", "622"), ("F", "120"), ("F", "149"), ("F", "150"), ("F", "284"), ("F", "287"), ("F", "321"), ("F", "322"), ("F", "323"), ] }, ), ( "AD : 95, 96, 86, 87, 81, 252, AE : 58, AD : 115 à 132", { None: [ ("AD", "95"), ("AD", "96"), ("AD", "86"), ("AD", "87"), ("AD", "81"), ("AD", "252"), ("AD", "115"), ("AD", "116"), ("AD", "117"), ("AD", "118"), ("AD", "119"), ("AD", "120"), ("AD", "121"), ("AD", "122"), ("AD", "123"), ("AD", "124"), ("AD", "125"), ("AD", "126"), ("AD", "127"), ("AD", "128"), ("AD", "129"), ("AD", "130"), ("AD", "131"), ("AD", "132"), ("AE", "58"), ] }, ), ( "XD:1 à 13, 24 à 28, 33 à 39, 50 à 52, 80, 83, 84 à 86, 259 à " "261, 182, 225 ; XH:5 ; P:1640, 1888, 1889, 1890 ; R:1311, " "1312, 1314,1342, 1343, 1559 à 1569", { None: [ ("XD", "1"), ("XD", "2"), ("XD", "3"), ("XD", "4"), ("XD", "5"), ("XD", "6"), ("XD", "7"), ("XD", "8"), ("XD", "9"), ("XD", "10"), ("XD", "11"), ("XD", "12"), ("XD", "13"), ("XD", "24"), ("XD", "25"), ("XD", "26"), ("XD", "27"), ("XD", "28"), ("XD", "33"), ("XD", "34"), ("XD", "35"), ("XD", "36"), ("XD", "37"), ("XD", "38"), ("XD", "39"), ("XD", "50"), ("XD", "51"), ("XD", "52"), ("XD", "80"), ("XD", "83"), ("XD", "84"), ("XD", "85"), ("XD", "86"), ("XD", "259"), ("XD", "260"), ("XD", "261"), ("XD", "182"), ("XD", "225"), ("XH", "5"), ("P", "1640"), ("P", "1888"), ("P", "1889"), ("P", "1890"), ("R", "1311"), ("R", "1312"), ("R", "1314"), ("R", "1342"), ("R", "1343"), ("R", "1559"), ("R", "1560"), ("R", "1561"), ("R", "1562"), ("R", "1563"), ("R", "1564"), ("R", "1565"), ("R", "1566"), ("R", "1567"), ("R", "1568"), ("R", "1569"), ] }, ), ( "BZ:2 à 5, 365 ; CD:88 à 104, 106, 108, 326", { None: [ ("BZ", "2"), ("BZ", "3"), ("BZ", "4"), ("BZ", "5"), ("BZ", "365"), ("CD", "88"), ("CD", "89"), ("CD", "90"), ("CD", "91"), ("CD", "92"), ("CD", "93"), ("CD", "94"), ("CD", "95"), ("CD", "96"), ("CD", "97"), ("CD", "98"), ("CD", "99"), ("CD", "100"), ("CD", "101"), ("CD", "102"), ("CD", "103"), ("CD", "104"), ("CD", "106"), ("CD", "326"), ("CD", "108"), ] }, ), ( "AV 118 à 125, 127, 132 à 137, 153, 398p, 399, 402; BI 27, 30, " "32, 33, 188, 255, 256 à 258, 260, 284p, 294; BL 297", { None: [ ("AV", "118"), ("AV", "119"), ("AV", "120"), ("AV", "121"), ("AV", "122"), ("AV", "123"), ("AV", "124"), ("AV", "125"), ("AV", "127"), ("AV", "132"), ("AV", "133"), ("AV", "134"), ("AV", "135"), ("AV", "136"), ("AV", "137"), ("AV", "153"), ("AV", "398p"), ("AV", "399"), ("AV", "402"), ("BI", "27"), ("BI", "30"), ("BI", "32"), ("BI", "33"), ("BI", "188"), ("BI", "255"), ("BI", "256"), ("BI", "257"), ("BI", "258"), ("BI", "260"), ("BI", "284p"), ("BI", "294"), ("BL", "297"), ] }, ), ( "A : 904 à 906, 911 ; E:40, 41", { None: [ ("A", "904"), ("A", "905"), ("A", "906"), ("A", "911"), ("E", "40"), ("E", "41"), ] }, ), ( "1991 : BE:8, 12", { "1991": [ ("BE", "8"), ("BE", "12"), ] }, ), ( "1979 : EM:1", {"1979": [("EM", "1")]}, ), ( "B:448;B:449;B:450;B:451;B:452;B:455;B:456;B:457;B:458;B:459;" "B:1486;", { None: [ ("B", "448"), ("B", "449"), ("B", "450"), ("B", "451"), ("B", "452"), ("B", "455"), ("B", "456"), ("B", "457"), ("B", "458"), ("B", "459"), ("B", "1486"), ] }, ), ( "AC : 72 à 81, 91 à 100, 197 / ZC:180 à 189", { None: [ ("AC", "72"), ("AC", "73"), ("AC", "74"), ("AC", "75"), ("AC", "76"), ("AC", "77"), ("AC", "78"), ("AC", "79"), ("AC", "80"), ("AC", "81"), ("AC", "91"), ("AC", "92"), ("AC", "93"), ("AC", "94"), ("AC", "95"), ("AC", "96"), ("AC", "97"), ("AC", "98"), ("AC", "99"), ("AC", "100"), ("AC", "197"), ("ZC", "180"), ("ZC", "181"), ("ZC", "182"), ("ZC", "183"), ("ZC", "184"), ("ZC", "185"), ("ZC", "186"), ("ZC", "187"), ("ZC", "188"), ("ZC", "189"), ] }, ), ( "AB 37 et 308", { None: [ ("AB", "37"), ("AB", "308"), ] }, ), ( "1983 D2 n° 458 et 459", { "1983": [ ("D2", "458"), ("D2", "459"), ] }, ), ( "ZS : 21p, 66", { None: [ ("ZS", "21p"), ("ZS", "66"), ] }, ), ( "VV:166, 167, domaine public", { None: [ ("VV", "166"), ("VV", "167"), ] }, ), ( " AS:13 à 15, 17 à 19, 21 à 32, 34 à 45, 47 à 53, 69, 70, 82, " "84 / CK:1, 24, 25, 29, 30, 37 à 43", { None: [ ("AS", "13"), ("AS", "14"), ("AS", "15"), ("AS", "17"), ("AS", "18"), ("AS", "19"), ("AS", "21"), ("AS", "22"), ("AS", "23"), ("AS", "24"), ("AS", "25"), ("AS", "26"), ("AS", "27"), ("AS", "28"), ("AS", "29"), ("AS", "30"), ("AS", "31"), ("AS", "32"), ("AS", "34"), ("AS", "35"), ("AS", "36"), ("AS", "37"), ("AS", "38"), ("AS", "39"), ("AS", "40"), ("AS", "41"), ("AS", "42"), ("AS", "43"), ("AS", "44"), ("AS", "45"), ("AS", "47"), ("AS", "48"), ("AS", "49"), ("AS", "50"), ("AS", "51"), ("AS", "52"), ("AS", "53"), ("AS", "69"), ("AS", "70"), ("AS", "82"), ("AS", "84"), ("CK", "1"), ("CK", "24"), ("CK", "25"), ("CK", "29"), ("CK", "30"), ("CK", "37"), ("CK", "38"), ("CK", "39"), ("CK", "40"), ("CK", "41"), ("CK", "42"), ("CK", "43"), ] }, ), ( " ZN:37, 15, 35, 28, 29 / ZM:9, 73", { None: [ ("ZN", "37"), ("ZN", "15"), ("ZN", "35"), ("ZN", "28"), ("ZN", "29"), ("ZM", "9"), ("ZM", "73"), ] }, ), ( " Tranche n°1 : YP:243, 12, 14 à 16, 18 à 26, DP / Tranche n°2 :" "YP:17, 307, 27, 308, 44 à 46, 683, BM:1, 250, 488 à 492", { None: [ ("YP", "243"), ("YP", "12"), ("YP", "14"), ("YP", "15"), ("YP", "16"), ("YP", "18"), ("YP", "19"), ("YP", "20"), ("YP", "21"), ("YP", "22"), ("YP", "23"), ("YP", "24"), ("YP", "25"), ("YP", "26"), ("YP", "17"), ("YP", "27"), ("YP", "308"), ("YP", "44"), ("YP", "45"), ("YP", "46"), ("YP", "683"), ("YP", "307"), ("BM", "1"), ("BM", "250"), ("BM", "488"), ("BM", "489"), ("BM", "490"), ("BM", "491"), ("BM", "492"), ] }, ), ( " H : 106, 156, 158", { None: [ ("H", "106"), ("H", "156"), ("H", "158"), ] }, ), ( "Section YO : parcelles n° 19; 20", { None: [ ("YO", "19"), ("YO", "20"), ] }, ), ( "1991 :AI:23;19;20;21;22;181;AM:116;214;215;233;235", { "1991": [ ("AI", "19"), ("AI", "20"), ("AI", "21"), ("AI", "22"), ("AI", "23"), ("AI", "181"), ("AM", "116"), ("AM", "214"), ("AM", "215"), ("AM", "233"), ("AM", "235"), ] }, ), ) # ),("Domaine public", {} # ),("Tranche 1 : AV:4 à 6, 18, 80, 104 / partiellement : 5 et 18", {} # ),(" 1987 : ZD: ?", {} # ),("A:26a, 26b, 27 / AB:95 / AK:4, 12, 20", {} for value, result in test_values: parcels = parse_parcels(value) if not parcels and not result: continue self.assertTrue(parcels != [], msg='No parcel parsed for "%s"' % value) parcels_copy = parcels[:] for year in result.keys(): for values in parcels_copy: if values["year"] != year and values["year"] != str(year): continue self.assertTrue( (values["section"], values["parcel_number"]) in result[year], msg='Section - Parcel number: "%s - %s" is not ' 'in "%s"' % ( values["section"], values["parcel_number"], str(result[year]), ), ) parcels.pop(parcels.index(values)) result[year].pop( result[year].index((values["section"], values["parcel_number"])) ) # all parcels have been imported self.assertEqual( parcels, [], msg='Parcel(s): "%s" haven\'t be ' 'recognized in "%s"' % (str(parcels), value), ) not_imported = [data for data in result.values() if data] self.assertEqual( not_imported, [], msg='Parcel(s): "%s" haven\'t be ' 'recognized in "%s"' % (str(not_imported), value), ) def create_orga(user): orga_type, created = OrganizationType.objects.get_or_create(txt_idx="operator") orga, created = Organization.objects.get_or_create( name="Operator", organization_type=orga_type, history_modifier=user ) return orga def create_operation(user, orga=None, values=None): operation_type = models.OperationType.objects.get(txt_idx="arch_diagnostic") if not values: values = {} dct = { "year": 2010, "operation_type_id": operation_type.pk, "history_modifier": user, } dct.update(values) if orga: dct["operator"] = orga if "code_patriarche" not in dct: idx = 1 while models.Operation.objects.filter(code_patriarche=str(idx)).count(): idx += 1 dct["code_patriarche"] = str(idx) return models.Operation.objects.create(**dct) class OperationInitTest(object): def create_user(self): username, password, self.user = create_user() return self.user def get_default_user(self): q = User.objects.filter(is_superuser=False) if q.count(): return q.all()[0] return self.create_user() def create_orgas(self, user=None): if not user: user = self.get_default_user() self.orgas = [create_orga(user)] return self.orgas def get_default_orga(self, user=None): if not hasattr(self, "orgas") or not self.orgas: self.create_orgas(user) return self.orgas[0] def create_towns(self, datas=None): if not datas: datas = {} default = {"numero_insee": "12345", "name": "default_town"} default.update(datas) town = models.Town.objects.create(**default) if not hasattr(self, "towns") or not self.towns: self.towns = [] self.towns.append(town) return self.towns def get_default_town(self): towns = getattr(self, "towns", None) if not towns: self.create_towns() return self.towns[0] def create_parcel(self, data=None): default = { "town": self.get_default_town(), "section": "A", "parcel_number": "1", } if not hasattr(self, "operations"): self.create_operation() default["operation"] = self.operations[0] if data: default.update(data) if not getattr(self, "parcels", None): self.parcels = [] self.parcels.append(models.Parcel.objects.create(**default)) return self.parcels def get_default_parcel(self, force=False): if force: return self.create_parcel()[-1] parcel = self.create_parcel()[0] if models.Parcel.objects.filter(pk=parcel.pk).count(): return parcel self.parcels.pop(0) return self.create_operation()[-1] def create_operation(self, user=None, orga=None): if not orga: self.get_default_orga(user) if not user: self.get_default_user() if not getattr(self, "operations", None): self.operations = [] self.operations.append(create_operation(user, orga)) return self.operations def get_default_operation(self, force=False, user=None): if force: return self.create_operation(user=user)[-1] ope = self.create_operation(user=user)[0] if models.Operation.objects.filter(pk=ope.pk).count(): return ope self.operations.pop(0) return self.create_operation(user=user)[-1] def tearDown(self): # cleanup for further test if hasattr(self, "user"): try: self.user.delete() except: pass self.user = None # all try/except is necessary for bad migrations on main... # should be removed at the next big version if hasattr(self, "operations"): for ope in self.operations: try: ope.delete() except: pass self.operations = [] if hasattr(self, "parcels"): for p in self.parcels: try: p.delete() except: pass self.parcels = [] class OperationTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="view_own_operation") ) self.orgas = self.create_orgas(self.user) self.operations = self.create_operation(self.user, self.orgas[0]) self.operations += self.create_operation(self.alt_user, self.orgas[0]) self.item = self.operations[0] for idx in range(15): ContextRecord.objects.create(label="CR-{}".format(idx), operation=self.item) def test_external_id(self): self.item.code_patriarche = "123456789" self.item.save() parcel = self.get_default_parcel() parcel.operation = self.item parcel.save() correct_ext_id = "{}-{}-{}{}".format( self.item.code_patriarche, parcel.town.numero_insee, parcel.section, parcel.parcel_number, ) self.assertEqual(parcel.external_id, correct_ext_id) # auto has been previously set parcel.external_id = "blabla" parcel.save() self.assertEqual(parcel.external_id, correct_ext_id) # deactivate auto parcel.auto_external_id = False parcel.external_id = "blabla" parcel.save() self.assertEqual(parcel.external_id, "blabla") def test_complete_identifier(self): profile = get_current_profile() profile.operation_complete_identifier = ( "{code_patriarche}-{towns__numero_insee}" ) profile.save() self.item = models.Operation.objects.get(pk=self.item.pk) t = Town.objects.create(numero_insee="12345", name="OK town") self.item.towns.add(t) self.item = models.Operation.objects.get(pk=self.item.pk) self.item.code_patriarche = "123456789" self.item.year = 2020 self.item.save() self.item = models.Operation.objects.get(pk=self.item.pk) self.assertEqual( self.item.complete_identifier, "{}-{}".format(self.item.code_patriarche, t.numero_insee), ) profile.operation_complete_identifier = "{year}-{towns__numero_insee}" profile.save() self.item.save() self.item = models.Operation.objects.get(pk=self.item.pk) self.assertEqual( self.item.complete_identifier, "{}-{}".format(self.item.year, t.numero_insee), ) def test_associated(self): scientist = Person.objects.create(name="C-3PO") self.item.scientist = scientist self.item.save() scientist = Person.objects.get(pk=scientist.pk) self.assertIn( PersonType.objects.get(txt_idx="head_scientist"), scientist.person_types.all(), ) # do not change if in the list sra = Person.objects.create(name="R2D2") sra.person_types.add(PersonType.objects.get(txt_idx="sra_agent")) self.item.scientist = sra self.item.save() self.assertNotIn( PersonType.objects.get(txt_idx="head_scientist"), sra.person_types.all() ) def create_relations(self): rel1 = models.RelationType.objects.create( symmetrical=True, label="Include", txt_idx="include" ) rel2 = models.RelationType.objects.create( symmetrical=False, label="Included", txt_idx="included", inverse_relation=rel1, ) models.RecordRelations.objects.create( left_record=self.operations[0], right_record=self.operations[1], relation_type=rel1, ) return rel1, rel2 def testPostDeleteRelations(self): self.create_relations() self.operations[0].delete() def testPostDeleteParcels(self): ope = self.operations[0] town = Town.objects.create(name="plouf", numero_insee="20000") parcel = models.Parcel.objects.create(town=town) parcel_nb = models.Parcel.objects.count() ope.parcels.add(parcel) ope.delete() # our parcel has no operation attached and... is not more deleted self.assertEqual(parcel_nb, models.Parcel.objects.count()) ope = self.operations[1] parcel = models.Parcel.objects.create(town=town) parcel_nb = models.Parcel.objects.count() ope.parcels.add(parcel) ope.parcels.clear() # no signal raised... should resave models.Parcel.objects.filter(pk=parcel.pk).all()[0].save() # parcel is no longer automatically delete on detach self.assertEqual(parcel_nb, models.Parcel.objects.count()) def testIndex(self): ope = create_operation(self.user, values={"year": 2042}) self.assertEqual(ope.operation_code, 1) ope2 = create_operation(self.user, values={"year": 2042}) self.assertEqual(ope2.operation_code, 2) ope = create_operation(self.user, values={"year": 0}) self.assertEqual(ope.operation_code, 1) ope2 = create_operation(self.user, values={"year": 0}) self.assertEqual(ope2.operation_code, 2) def test_cache_update(self): self.create_towns() operation = self.operations[0] self.assertIsNotNone(operation.cached_label) town, ope_id = operation.cached_label.split(" | ") self.assertIn(town, ("Intercommunal", "Multi-town")) self.assertEqual(ope_id, "OA1 - OP2010-1") operation = models.Operation.objects.get(pk=operation.pk) operation.year = 2011 operation.save() operation.towns.add(self.towns[0]) operation = models.Operation.objects.get(pk=operation.pk) self.assertIsNotNone(operation.cached_label) town, ope_id = operation.cached_label.split(" | ") self.assertEqual(ope_id, "OA1 - OP2011-1") self.assertEqual(town, self.towns[0].name) def test_search_vector_update(self): operation = self.operations[0] town = self.create_towns({"numero_insee": "12346", "name": "Daisy"})[-1] operation.towns.add(town) town = self.create_towns({"numero_insee": "12347", "name": "Dirty old"})[-1] operation.towns.add(town) operation = models.Operation.objects.get(pk=operation.pk) operation.comment = "Zardoz" profile = get_current_profile() profile.operation_region_code = "42" profile.save() operation.code_patriarche = "42HUIAAA5" operation.save() self.assertIsNotNone(operation.search_vector) for key in ( "old", "dirty", "daisy", "'2010'", "zardoz", "huiaaa5", "{}42huiaaa5".format(profile.operation_prefix.lower()), "42huiaaa5", ): self.assertIn(key, operation.search_vector) def test_cache_bulk_update(self): operation = self.operations[0] init_parcel = self.create_parcel()[0] operation.parcels.add(init_parcel) from archaeological_context_records.models import ContextRecord cr_data = { "label": "Context record", "operation": operation, "parcel": init_parcel, "history_modifier": self.get_default_user(), } cr = ContextRecord.objects.create(**cr_data) from archaeological_finds.models import BaseFind, Find, MaterialType bf_data = { "label": "Base find", "history_modifier": self.get_default_user(), "context_record": cr, } base_find = BaseFind.objects.create(**bf_data) find = Find.objects.create( history_modifier=self.get_default_user(), label="Find me" ) find.base_finds.add(base_find) mat = MaterialType.objects.create( label="Adamentium", txt_idx="admentium", code="ADA" ) find.material_types.add(mat) class TestObj(object): def __init__(self): self.context_record_reached = [] def reached(self, sender, **kwargs): instance = kwargs.get("instance") if sender == ContextRecord: self.context_record_reached.append(instance) test_obj = TestObj() operation = models.Operation.objects.get(pk=operation.pk) operation.test_obj = test_obj operation.year = 2011 operation.save() # bulk update of context records cached label gen don't have to be # reached self.assertEqual(len(test_obj.context_record_reached), 0) # verify the relevance of the update cr = ContextRecord.objects.get(pk=cr.pk) self.assertIsNotNone(cr.cached_label) ope_id, parcel_sec, parcel_nb, cr_label = cr.cached_label.split(" | ") profile = get_current_profile() self.assertEqual(ope_id, "OA1") base_find = BaseFind.objects.get(pk=base_find.pk) op_code, idx = base_find.cache_short_id.split(" | ") self.assertEqual(op_code, "OA1") self.assertEqual(idx, "00001") self.assertIsNotNone(base_find.cache_complete_id) op_code, mat_code, lbl, idx = base_find.cache_complete_id.split(" | ") self.assertEqual(op_code, "OA1") self.assertEqual(mat_code, "ADA") self.assertEqual(lbl, "Context record") self.assertEqual(idx, "00001") find = Find.objects.get(pk=find.pk) self.assertIsNotNone(find.cached_label) op_code_idx, lbl = find.cached_label.split(" | ") self.assertEqual(op_code_idx, "OA1-00001") self.assertEqual(lbl, "Find me") operation = models.Operation.objects.get(pk=operation.pk) operation.code_patriarche = "666" operation.save() cr = ContextRecord.objects.get(pk=cr.pk) self.assertIsNotNone(cr.cached_label) ope_id, parcel_sec, parcel_nb, cr_label = cr.cached_label.split(" | ") self.assertEqual(ope_id, "{}666".format(profile.operation_prefix)) base_find = BaseFind.objects.get(pk=base_find.pk) self.assertIsNotNone(base_find.cache_short_id) op_code, idx = base_find.cache_short_id.split(" | ") self.assertEqual(op_code, "OA666") self.assertIsNotNone(base_find.cache_complete_id) op_code, mat_code, lbl, idx = base_find.cache_complete_id.split(" | ") self.assertEqual(op_code, "OA666") find = Find.objects.get(pk=find.pk) self.assertIsNotNone(find.cached_label) op_code_idx, lbl = find.cached_label.split(" | ") self.assertEqual(op_code_idx, "OA666-00001") def test_show(self): operation = self.operations[0] source = models.Document.objects.create( title="Source title", source_type=models.SourceType.objects.all()[0] ) operation.documents.add(source) c = Client() response = c.get(reverse("show-operation", kwargs={"pk": operation.pk})) self.assertEqual(response.status_code, 200) # empty content when not allowed self.assertEqual(response.content, b"") response = c.get(reverse("show-document", kwargs={"pk": source.pk})) self.assertRedirects(response, "/") c.login(username=self.username, password=self.password) response = c.get(reverse("show-operation", kwargs={"pk": operation.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) response = c.get(reverse("show-document", kwargs={"pk": source.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) def test_show_history(self): operation = self.operations[-1] c = Client() operation = models.Operation.objects.get(id=operation.id) operation._force_history = True operation.common_name = "hophop" operation.history_modifier = self.user operation.save() c.login(username=self.username, password=self.password) response = c.get(reverse("show-operation", kwargs={"pk": operation.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) self.assertIn(b"/show-historized-operation/", response.content) c.login(username=self.alt_username, password=self.alt_password) response = c.get(reverse("show-operation", kwargs={"pk": operation.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) self.assertNotIn(b"/show-historized-operation/", response.content) def test_show_pdf(self): operation = self.operations[0] c = Client() response = c.get( reverse("show-operation", kwargs={"pk": operation.pk, "type": "pdf"}) ) self.assertEqual(response.status_code, 200) # empty content when not allowed self.assertEqual(response.content, b"") c.login(username=self.username, password=self.password) response = c.get( reverse("show-operation", kwargs={"pk": operation.pk, "type": "pdf"}) ) self.assertEqual(response.status_code, 200) f = BytesIO(response.content) filetype = ( Popen("/usr/bin/file -b --mime -", shell=True, stdout=PIPE, stdin=PIPE) .communicate(f.read(1024))[0] .strip() ) self.assertTrue(filetype.startswith(b"application/pdf")) def test_show_odt(self): locale.setlocale(locale.LC_ALL, "fr_FR.UTF-8") operation = self.operations[0] c = Client() response = c.get( reverse("show-operation", kwargs={"pk": operation.pk, "type": "odt"}) ) self.assertEqual(response.status_code, 200) # empty content when not allowed self.assertEqual(response.content, b"") c.login(username=self.username, password=self.password) response = c.get( reverse("show-operation", kwargs={"pk": operation.pk, "type": "odt"}) ) self.assertEqual(response.status_code, 200) f = BytesIO(response.content) z = zipfile.ZipFile(f) self.assertIsNone(z.testzip()) filename = None for name in z.namelist(): if name == "content.xml": filename = name break self.assertIsNotNone(filename) tmpdir = tempfile.mkdtemp(prefix="tmp-ishtar-") imported_file = z.extract(filename, tmpdir) with open(imported_file) as content_file: content = content_file.read() # check that all context records have been exported #4078 # for idx in range(15): # self.assertIn('CR-{}'.format(idx), content) def test_json(self): operation = self.operations[0] operation.data = { "groundhog": { "number": 53444, "awake_state": "réveillée", "with_feather": "Oui", }, "frog_number": 32303, } operation.save() content_type = ContentType.objects.get_for_model(operation) groundhog_section = JsonDataSection.objects.create( name="Marmotte", content_type=content_type ) JsonDataField.objects.create( name="État d'éveil", key="groundhog__awake_state", content_type=content_type, section=groundhog_section, ) JsonDataField.objects.create( name="Avec plume", key="groundhog__with_feather", content_type=content_type, section=groundhog_section, ) JsonDataField.objects.create( name="Zzzzzzzz", key="groundhog__zzz", content_type=content_type, section=groundhog_section, ) JsonDataField.objects.create( name="Grenouille", key="frog_number", content_type=content_type ) c = Client() c.login(username=self.username, password=self.password) response = c.get(reverse("show-operation", kwargs={"pk": operation.pk})) self.assertEqual(response.status_code, 200) content = response.content.decode() self.assertIn('class="card sheet"', content) self.assertIn("Marmotte", content) self.assertIn("État d'éveil", content) self.assertIn("réveillée", content) self.assertIn("Grenouille", content) self.assertIn(">32303<", content) self.assertNotIn(">53444<", content) self.assertNotIn("Zzzzzzzz", content) operation.data = {} operation.save() response = c.get(reverse("show-operation", kwargs={"pk": operation.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) self.assertNotIn(b"Marmotte", response.content) def test_json_search_vector_update(self): operation = self.operations[0] content_type = ContentType.objects.get_for_model(operation) JsonDataField.objects.create( name="Nom de marmotte", key="groundhog__name", content_type=content_type, search_index=True, ) JsonDataField.objects.create( name="Numéro grenouille", key="frog_number", content_type=content_type ) operation = models.Operation.objects.get(pk=operation.pk) operation.data = { "groundhog": {"name": "La Marmotte héhé", "color": "Red"}, "frog_number": 32303, } operation.save() operation = models.Operation.objects.get(pk=operation.pk) self.assertIsNotNone(operation.search_vector) for key in ("marmott",): self.assertIn(key, operation.search_vector) for key in ("32303", "red", "Red"): self.assertNotIn(key, operation.search_vector) def test_document(self): operation = self.operations[0] q = Document.objects.values("index").order_by("-index") if q.count(): c_index = q.all()[0]["index"] else: c_index = 0 doc = Document.objects.create(title="Image!") operation.documents.add(doc) operation.save() doc = Document.objects.get(pk=doc.pk) self.assertEqual(c_index + 1, doc.index) self.assertEqual(doc.external_id, str(doc.index)) doc2 = Document.objects.create(title="Image2!") operation.documents.add(doc2) doc2 = Document.objects.get(pk=doc2.pk) self.assertEqual(c_index + 2, doc2.index) def test_get_containers(self): Find = apps.get_model("archaeological_finds", "Find") BaseFind = apps.get_model("archaeological_finds", "BaseFind") Warehouse = apps.get_model("archaeological_warehouse", "Warehouse") WarehouseType = apps.get_model("archaeological_warehouse", "WarehouseType") Container = apps.get_model("archaeological_warehouse", "Container") ContainerType = apps.get_model("archaeological_warehouse", "ContainerType") operation = self.operations[0] hc, __ = Unit.objects.get_or_create(txt_idx="not-in-context", order=10) cr = ContextRecord.objects.create(operation=operation, unit=hc) bf = BaseFind.objects.create(context_record=cr) f = Find.objects.create() f.base_finds.add(bf) wt = WarehouseType.objects.create(label="WT") w = Warehouse.objects.create(name="Warehouse", warehouse_type=wt) ct = ContainerType.objects.create(label="CT") c = Container.objects.create(reference="Test", location=w, container_type=ct) f.container = c f.save() values = operation.get_containers_values([], []) self.assertEqual(len(values), 1) class LockTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.orgas = self.create_orgas(self.user) self.operations = self.create_operation(self.user, self.orgas[0]) self.client = Client() self.client.login(username=self.username, password=self.password) self.operation = self.operations[0] self.operation.locked = True self.operation.save() def test_wizard_lock(self): cls_wiz = OperationWizardModifTest() url = reverse(cls_wiz.url_name) # first wizard step step = "selec-operation_modification" response = cls_wiz.wizard_post( self.client, url, step, {"pk": self.operation.pk} ) msg = str(_("This item is locked for edition.")).replace("'", "'") self.assertIn( msg, response.content.decode(), msg="wizard lock for edition not effective" ) def test_qa_lock(self): url = reverse("operation-qa-bulk-update", args=[self.operation.pk]) response = self.client.get(url) self.assertRedirects(response, reverse("qa-not-available", args=["locked"])) def test_sheet_lock(self): url = reverse("show-operation", kwargs={"pk": self.operation.pk}) response = self.client.get(url) msg = str(_("This item has been locked. Edition is disabled.")).replace( "'", "'" ) self.assertIn(msg, response.content.decode(), msg="lock not displayed on sheet") class CustomFormTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="view_own_operation") ) self.orgas = self.create_orgas(self.user) self.operations = self.create_operation(self.user, self.orgas[0]) self.operations += self.create_operation(self.alt_user, self.orgas[0]) self.item = self.operations[0] def test_filters(self): c = Client() c.login(username=self.username, password=self.password) cls_wiz = OperationWizardModifTest() url = reverse(cls_wiz.url_name) # first wizard step step = "selec-operation_modification" cls_wiz.wizard_post(c, url, step, {"pk": self.operations[0].pk}) step = "general-operation_modification" data = { "{}{}-current_step".format(cls_wiz.url_name, cls_wiz.wizard_name): [step], } MSG_FOUND = ( " - '{}' field found on the modification " "wizard. It should have been filtered." ) MSG_NOT_FOUND = ( " - '{}' field not found on the modification " "wizard. It shouldn't have been filtered." ) key_in_charge = "in_charge" response = c.post(url, data) content = response.content.decode() res = key_in_charge in content self.assertTrue(res, msg="filter all" + MSG_NOT_FOUND.format(key_in_charge)) f = CustomForm.objects.create( name="Test - all", form="operation-010-general", available=True, apply_to_all=True, ) ExcludedField.objects.create(custom_form=f, field="in_charge") response = c.post(url, data) content = response.content.decode() res = key_in_charge not in content self.assertTrue(res, msg="filter all" + MSG_FOUND.format(key_in_charge)) # user type form prevail on "all" f_scientist = CustomForm.objects.create( name="Test - user type", form="operation-010-general", available=True ) tpe = PersonType.objects.get(txt_idx="head_scientist") key_address = "address" f_scientist.user_types.add(tpe) self.user.ishtaruser.person.person_types.add(tpe) ExcludedField.objects.create(custom_form=f_scientist, field=key_address) response = c.post(url, data) content = response.content.decode() res = key_in_charge in content self.assertTrue( res, msg="filter profile type" + MSG_NOT_FOUND.format(key_in_charge) ) res = key_address not in content self.assertTrue(res, msg="filter profile type" + MSG_FOUND.format(key_address)) # profile type form prevail on "all" and "user types" f_scientist2 = CustomForm.objects.create( name="Test - profile type", form="operation-010-general", available=True ) key_scientific = "scientific_documentation_comment" ExcludedField.objects.create(custom_form=f_scientist2, field=key_scientific) collaborator = ProfileType.objects.get(txt_idx="collaborator") UserProfile.objects.create( profile_type=collaborator, person=self.user.ishtaruser.person, current=True ) f_scientist2.profile_types.add(collaborator) response = c.post(url, data) content = response.content.decode() res = key_in_charge in content self.assertTrue( res, msg="filter profile type" + MSG_NOT_FOUND.format(key_in_charge) ) res = key_address in content self.assertTrue( res, msg="filter profile type" + MSG_NOT_FOUND.format(key_address) ) res = key_scientific not in content self.assertTrue( res, msg="filter profile type" + MSG_FOUND.format(key_scientific) ) # user prevail on "all", "profile_type" and "user_types" f_user = CustomForm.objects.create( name="Test - user", form="operation-010-general", available=True ) f_user.users.add(self.user.ishtaruser) response = c.post(url, data) content = response.content.decode() res = key_in_charge in content self.assertTrue(res, msg="filter user" + MSG_NOT_FOUND.format(key_in_charge)) res = key_scientific in content self.assertTrue(res, msg="filter user" + MSG_NOT_FOUND.format(key_scientific)) res = key_address in content self.assertTrue(res, msg="filter user" + MSG_FOUND.format(key_address)) def test_enabled(self): c = Client() c.login(username=self.username, password=self.password) cls_wiz = OperationWizardModifTest() url = reverse(cls_wiz.url_name) # first wizard step step = "selec-operation_modification" cls_wiz.wizard_post(c, url, step, {"pk": self.operations[0].pk}) step = "collaborators-operation_modification" data = { "{}{}-current_step".format(cls_wiz.url_name, cls_wiz.wizard_name): [step], } response = c.post(url, data) self.assertNotEqual(response.status_code, 404) CustomForm.objects.create( name="Test2", form="operation-020-collaborators", available=True, apply_to_all=True, enabled=False, ) response = c.post(url, data) self.assertEqual(response.status_code, 404) def test_json(self): operation = self.operations[0] operation.data = { "groundhog": { "number": 53444, "awake_state": "réveillée", "with_feather": "Oui", }, "frog_number": 32303, } operation.save() content_type = ContentType.objects.get_for_model(operation) field = JsonDataField.objects.create( name="État d'éveil", key="groundhog__awake_state", content_type=content_type, value_type="C", ) form = CustomForm.objects.create( name="Test", form="operation-010-general", available=True, apply_to_all=True ) CustomFormJsonField.objects.create( custom_form=form, json_field=field, label="Le beau", ) c = Client() c.login(username=self.username, password=self.password) cls_wiz = OperationWizardModifTest() url = reverse(cls_wiz.url_name) # first wizard step step = "selec-operation_modification" cls_wiz.wizard_post(c, url, step, {"pk": self.operations[0].pk}) step = "general-operation_modification" data = { "{}{}-current_step".format(cls_wiz.url_name, cls_wiz.wizard_name): [step], } response = c.post(url, data) self.assertIn( b"Le beau", response.content, msg="json field not displayed on modification wizard", ) cls_wiz.wizard_post(c, url, step, {"pk": self.operations[1].pk}) response = c.post(url, data) self.assertIn( b"Le beau", response.content, msg="json field form: existing value should be presented in select", ) def test_json_search(self): operation = self.operations[0] operation.data = { "groundhog": { "awake_state": "réveillée", "with_feather": True, }, } operation.save() content_type = ContentType.objects.get_for_model(operation) field = JsonDataField.objects.create( name="État d'éveil", key="groundhog__awake_state", content_type=content_type, value_type="C", ) field2 = JsonDataField.objects.create( name="As une plume ?", key="groundhog__with_feather", content_type=content_type, value_type="B", ) form = CustomForm.objects.create( name="Test", form="operation-001-search", available=True, apply_to_all=True ) cf1 = CustomFormJsonField.objects.create( custom_form=form, json_field=field, label="Est-elle éveillée ?", ) cf2 = CustomFormJsonField.objects.create( custom_form=form, json_field=field2, label="Plume", ) c = Client() c.login(username=self.username, password=self.password) response = c.get( reverse("get-operation"), {"search_vector": "{}=endormie".format(slugify(cf1.label))}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 0) response = c.get( reverse("get-operation"), {"search_vector": "{}=réveillée".format(slugify(cf1.label))}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) response = c.get( reverse("get-operation"), {"search_vector": "{}=oui".format(slugify(cf2.label))}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) class OperationSearchTest(TestCase, OperationInitTest, SearchText): fixtures = FILE_FIXTURES SEARCH_URL = "get-operation" def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="view_own_operation") ) self.alt_user.user_permissions.add( Permission.objects.get(codename="change_own_operation") ) self.alt_username2, self.alt_password2, self.alt_user2 = create_user( username="luke", password="iamyourfather" ) profile = UserProfile.objects.create( profile_type=ProfileType.objects.get(txt_idx="collaborator"), person=self.alt_user2.ishtaruser.person, ) town = Town.objects.create(name="Tatouine", numero_insee="66000") area = Area.objects.create(label="Galaxie", txt_idx="galaxie") area.towns.add(town) profile.areas.add(area) self.orgas = self.create_orgas(self.user) self.create_operation(self.user, self.orgas[0]) self.create_operation(self.alt_user, self.orgas[0]) self.operations = self.create_operation(self.alt_user, self.orgas[0]) self.operations[2].year = 2018 self.operations[2].save() self.item = self.operations[0] def test_base_search(self): c = Client() response = c.get(reverse("get-operation"), {"year": "2010"}) # no result when no authentication self.assertTrue(not json.loads(response.content.decode())) c.login(username=self.username, password=self.password) response = c.get(reverse("get-operation"), {"year": "2010"}) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 2) response = c.get( reverse("get-operation"), {pgettext_lazy("key for text search", "operator"): self.orgas[0].name}, ) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 3) def test_base_search_vector(self): c = Client() response = c.get(reverse("get-operation"), {"search_vector": "chaTEAU"}) # no result when no authentication self.assertTrue(not json.loads(response.content.decode())) c.login(username=self.username, password=self.password) # test vector search with accents operation = models.Operation.objects.get(pk=self.operations[0].pk) operation.common_name = "Opération : Château de Fougères" operation.save() response = c.get(reverse("get-operation"), {"search_vector": "chaTEAU"}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) response = c.get(reverse("get-operation"), {"search_vector": "château"}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) # test search with inappropriate minus sign response = c.get(reverse("get-operation"), {"search_vector": "chaTEAU - "}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) def test_complex_search_vector(self): c = Client() c.login(username=self.username, password=self.password) operation_1 = models.Operation.objects.get(pk=self.operations[0].pk) operation_2 = models.Operation.objects.get(pk=self.operations[1].pk) operation_3 = models.Operation.objects.get(pk=self.operations[2].pk) operation_1.common_name = "Opération : Château de Fougères" operation_1.save() operation_2.common_name = "Opération : Fougère filicophyta et " "herbe à chat" operation_2.save() operation_3.common_name = "Opération : Château Filicophyta" operation_3.save() # simple separation response = c.get( reverse("get-operation"), {"search_vector": "chaTEAU fougeres"} ) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) response = c.get(reverse("get-operation"), {"search_vector": "chaTEAU fougere"}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) # explicit AND response = c.get( reverse("get-operation"), {"search_vector": "chaTEAU & fougere"} ) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) # explicit OR response = c.get( reverse("get-operation"), {"search_vector": "chaTEAU | fougere"} ) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 3) # query with parenthesis # response = c.get(reverse('get-operation'), # {'search_vector': '2010 & (fougere | filicophyta)'}) # result = json.loads(response.content.decode()) # self.assertEqual(result['recordsTotal'], 2) # query with mistmatch parenthesis response = c.get( reverse("get-operation"), {"search_vector": ")) 2010 &) ((chaTEAU | fougere)"}, ) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 2) # open search response = c.get(reverse("get-operation"), {"search_vector": "cha*"}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 3) # exclude response = c.get(reverse("get-operation"), {"search_vector": "-fougere"}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) def test_facet_search_vector(self): ope1 = self.operations[0] ope2 = self.operations[1] ope3 = self.operations[2] c = Client() c.login(username=self.username, password=self.password) ope1.year = 2042 ope1.end_date = "2010-01-01" ope1.save() ope2.year = 2020 ope2.save() data = {"numero_insee": "05000", "name": "Champoleon (test)"} town = self.create_towns(datas=data)[-1] ope1.towns.add(town) neo = models.Period.objects.get(txt_idx="neolithic") final_neo = models.Period.objects.get(txt_idx="final-neolithic") gallo = models.Period.objects.get(txt_idx="gallo-roman") ope1.periods.add(final_neo) ope1.periods.add(gallo) ope2.periods.add(neo) ope3.periods.add(gallo) villa = models.RemainType.objects.get(txt_idx="villa") ope1.remains.add(villa) search_period_q = str(pgettext("key for text search", "period")) result = [ ('{}="{}"'.format(search_period_q, str(final_neo)), 1), ] self._test_search(c, result, context="Simple period") search_year_q = str(pgettext("key for text search", "year")) result = [ ('{}="{}"'.format(search_year_q, "2042"), 1), ] self._test_search(c, result, context="Integer") result = [ ('{}="{}"'.format(search_year_q, '2042";"2020'), 2), ] self._test_search(c, result, context="Many integer") search_town_q = str(pgettext("key for text search", "town")) town = Town.objects.get(pk=town.pk) result = [ ('{}="{}"'.format(search_town_q, town.cached_label), 1), ] self._test_search(c, result, context="String search with parenthesis and minus") result = [ ('{}="{}"'.format(search_period_q, str(neo)), 2), ] self._test_search(c, result, context="Hierarchic period") result = [ ('-{}="{}"'.format(search_period_q, str(neo)), 1), ] self._test_search(c, result, context="Period exclude") result = [ ( '{}="{}"'.format( search_period_q, '{}";"{}'.format(neo.label, gallo.label) ), 3, ), ( '{}="{}" {}="{}"'.format( search_period_q, neo.label, search_period_q, gallo.label ), 3, ), ] self._test_search(c, result, context="Period OR") # open search '*' result = [ ('{}="{}*"'.format(search_period_q, neo.label[:3]), 2), ] self._test_search(c, result, context="Period open search") """ self._test_search(c, search_period_q, '{}*'.format(neo.label[:3]), 2, "Open search") """ # non hierarchic search search_remain_q = str(pgettext("key for text search", "remain")) result = [ ('{}="{}"'.format(search_remain_q, str(villa)), 1), ] self._test_search(c, result, context="Non hierarchic remain search") # boolean search search_open_q = str(pgettext("key for text search", "is-open")) result = [ ('{}="{}"'.format(search_open_q, str(_("Yes"))), 2), ] self._test_search(c, result, context="Boolean search") def test_mixed_search_vector(self): operation_1 = models.Operation.objects.get(pk=self.operations[0].pk) operation_1.common_name = "Opération : Château de Fougères" operation_1.year = 2042 operation_1.save() c = Client() c.login(username=self.username, password=self.password) search_year_q = str(pgettext("key for text search", "year")) q = '"chateau fougere" {}="2042"'.format(search_year_q) response = c.get(reverse("get-operation"), {"search_vector": q}) result = json.loads(response.content.decode()) self.assertEqual(result["recordsTotal"], 1) def create_relations(self): rel1 = models.RelationType.objects.create( symmetrical=True, label="Include", txt_idx="include" ) rel2 = models.RelationType.objects.create( symmetrical=False, label="Included", txt_idx="included", inverse_relation=rel1, ) models.RecordRelations.objects.create( left_record=self.operations[0], right_record=self.operations[1], relation_type=rel1, ) return rel1, rel2 def test_related_search(self): c = Client() rel1, rel2 = self.create_relations() self.operations[1].year = 2011 self.operations[1].save() reltype_key = pgettext_lazy("key for text search", "relation-types") search = {"search_vector": 'year=2010 {}="{}"'.format(reltype_key, rel2.name)} response = c.get(reverse("get-operation"), search) # no result when no authentication self.assertTrue(not json.loads(response.content.decode())) c.login(username=self.username, password=self.password) response = c.get(reverse("get-operation"), search) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 2) def test_search_with_problematic_characters(self): c = Client() c.login(username=self.username, password=self.password) ope = self.operations[0] lbl = "aha = take on me | take me on" ope.common_name = lbl ope.save() search_name_q = str(pgettext("key for text search", "name")) result = [ ('{}="{}"'.format(search_name_q, lbl), 1), ] self._test_search(c, result, context="Facet search with = and | characters") def test_search_with_asterisk_inside_names(self): c = Client() c.login(username=self.username, password=self.password) ope = self.operations[0] ope_type = ope.operation_type ope_type.label = "label*with*asterisk" ope_type.save() search_name_q = str(pgettext("key for text search", "type")) nb = models.Operation.objects.filter(operation_type=ope_type).count() result = [ ('{}="{}"'.format(search_name_q, ope_type.label), nb), ] self._test_search(c, result, context="Facet search with * characters") def test_hierarchic_search(self): ope = self.operations[1] c = Client() neo = models.Period.objects.get(txt_idx="neolithic") final_neo = models.Period.objects.get(txt_idx="final-neolithic") recent_neo = models.Period.objects.get(txt_idx="recent-neolithic") ope.periods.add(final_neo) search = {"periods": final_neo.pk} # no result when no authentication response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) self.assertTrue(not json.loads(response.content.decode())) c.login(username=self.username, password=self.password) # one result for exact search response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) res = json.loads(response.content.decode()) self.assertTrue(res["recordsTotal"] == 1) # no result for the brother search = {"periods": recent_neo.pk} response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 0) # one result for the father search = {"periods": neo.pk} response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) # test on text search period_key = str(pgettext_lazy("key for text search", "period")) result = [ ('{}="{}"'.format(period_key, str(final_neo)), 1), ('{}="{}"'.format(period_key, str(recent_neo)), 0), ('{}="{}"'.format(period_key, str(neo)), 1), ] self._test_search(c, result, context="Text period search") def test_town_search(self): c = Client() c.login(username=self.username, password=self.password) data = {"numero_insee": "98989", "name": "base_town"} base_town = self.create_towns(datas=data)[-1] data = {"numero_insee": "56789", "name": "parent_town"} parent_town = self.create_towns(datas=data)[-1] parent_town.children.add(base_town) data = {"numero_insee": "01234", "name": "child_town"} child_town = self.create_towns(datas=data)[-1] base_town.children.add(child_town) ope = self.operations[1] ope.towns.add(base_town) # simple search search = {"towns": base_town.pk} response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) # parent search search = {"towns": parent_town.pk} response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) # child search search = {"towns": child_town.pk} response = c.get(reverse("get-operation"), search) self.assertEqual(response.status_code, 200) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) def test_statistics(self): c = Client() c.login(username=self.username, password=self.password) q = {"stats_modality_1": "year", "stats_modality_2": "operation_type__label"} response = c.get(reverse("get-operation", args=["json-stats"]), q) self.assertEqual(response.status_code, 200) expected_result = [] for ope in models.Operation.objects.all(): years = [y for y, res in expected_result] if ope.year in years: year_idx = years.index(ope.year) else: expected_result.append([ope.year, []]) year_idx = len(expected_result) - 1 current_values = expected_result[year_idx][1] values = [v for v, cnt in current_values] val = ope.operation_type.label if val in values: val_idx = values.index(val) else: current_values.append([val, 0]) val_idx = len(current_values) - 1 current_values[val_idx][1] += 1 values = json.loads(response.content.decode()) self.assertEqual(values["data"], expected_result) class OperationPermissionTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="view_own_operation") ) self.alt_user.user_permissions.add( Permission.objects.get(codename="change_own_operation") ) self.alt_username2, self.alt_password2, self.alt_user2 = create_user( username="luke", password="iamyourfather" ) profile_type = ProfileType.objects.get(txt_idx="collaborator") profile = UserProfile.objects.create( profile_type=profile_type, person=self.alt_user2.ishtaruser.person, current=True, ) town = Town.objects.create(name="Tatouine", numero_insee="66000") area = Area.objects.create(label="Galaxie", txt_idx="galaxie") area.towns.add(town) profile.areas.add(area) self.orgas = self.create_orgas(self.user) self.operations = self.create_operation(self.user, self.orgas[0]) self.operations += self.create_operation(self.alt_user, self.orgas[0]) self.operations[1].towns.add(town) self.item = self.operations[0] def test_own_search(self): # no result when no authentification c = Client() response = c.get(reverse("get-operation"), {"year": "2010"}) self.assertTrue(not json.loads(response.content.decode())) # possession c = Client() c.login(username=self.alt_username, password=self.alt_password) response = c.get(reverse("get-operation"), {"year": "2010"}) # only one "own" operation available self.assertTrue(json.loads(response.content.decode())) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) operator_key = pgettext_lazy("key for text search", "operator") response = c.get(reverse("get-operation"), {operator_key: self.orgas[0].name}) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) response = c.get( reverse("get-operation"), {"search_vector": '{}="{}"'.format(operator_key, self.orgas[0].name)}, ) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) # area filter c = Client() c.login(username=self.alt_username2, password=self.alt_password2) response = c.get(reverse("get-operation"), {"year": "2010"}) # only one "own" operation available self.assertTrue(json.loads(response.content.decode())) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) response = c.get(reverse("get-operation"), {operator_key: self.orgas[0].name}) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) response = c.get( reverse("get-operation"), {"search_vector": '{}="{}"'.format(operator_key, self.orgas[0].name)}, ) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) def test_own_modify(self): operation_pk1 = self.operations[0].pk operation_pk2 = self.operations[1].pk modif_url = "/operation_modification/general-operation_modification" # no result when no authentification c = Client() response = c.get(reverse("operation_modify", args=[operation_pk2])) self.assertRedirects(response, "/") # possession c = Client() c.login(username=self.alt_username, password=self.alt_password) response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True) self.assertRedirects(response, modif_url) response = c.get(modif_url) self.assertEqual(response.status_code, 200) response = c.get(reverse("operation_modify", args=[operation_pk1]), follow=True) self.assertRedirects(response, "/") profile_type = ProfileType.objects.get(txt_idx="collaborator") profile_type.groups.add( Group.objects.get( name="Opérations rattachées : " "modification/suppression" ) ) # area filter c = Client() c.login(username=self.alt_username2, password=self.alt_password2) response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True) self.assertRedirects(response, modif_url) response = c.get(modif_url) self.assertEqual(response.status_code, 200) response = c.get(reverse("operation_modify", args=[operation_pk1]), follow=True) self.assertRedirects(response, "/") class LabelTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.orgas = self.create_orgas(self.user) self.operations = self.create_operation(self.user, self.orgas[0]) def test_label_generation(self): ope = self.operations[0] ope.year = 1789 ope.save() tpl = open( settings.ROOT_PATH + "../archaeological_operations/tests/labels-8.odt", "rb" ) template = SimpleUploadedFile(tpl.name, tpl.read()) model, __ = ImporterModel.objects.get_or_create( klass="archaeological_operations.models.Operation" ) doc = DocumentTemplate.objects.create( name="Labels", slug="labels", associated_model=model, available=True, for_labels=True, label_per_page=8, template=template, ) c = Client() url = reverse("generate-labels", args=[doc.slug]) response = c.get(url) # no result when no authentication self.assertEqual(response.content, b"") c.login(username=self.username, password=self.password) response = c.get(url) content, z, f = None, None, None try: f = BytesIO(response.content) z = zipfile.ZipFile(f) self.assertIsNone(z.testzip()) content = z.open("content.xml") full_content = content.read() self.assertIn(b"1789", full_content, msg="1789 not in generated label") # jpe file are added for missing pictures / must be filtered self.assertNotIn(b'.jpe"', full_content) finally: if content: content.close() if z: z.close() if f: f.close() class DashboardTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.orgas = self.create_orgas(self.user) self.operations = self.create_operation(self.user, self.orgas[0]) def test_dashboard(self): url = "dashboard-operation" c = Client() c.login(username=self.username, password=self.password) response = c.get(reverse(url)) self.assertEqual(response.status_code, 200) def create_administrativact(user, operation): act_type, created = models.ActType.objects.get_or_create( txt_idx="act_type_O", intented_to="O" ) dct = { "history_modifier": user, "act_type": act_type, "operation": operation, "signature_date": datetime.date(2014, 5, 12), "index": 322, } adminact, created = models.AdministrativeAct.objects.get_or_create(**dct) return [act_type], [adminact] class RegisterTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): self.username, self.password, self.user = create_superuser() self.operations = self.create_operation(self.user) self.act_types, self.admin_acts = create_administrativact( self.user, self.operations[0] ) def test_search(self): c = Client() response = c.get(reverse("get-administrativeact"), {"year": "2014"}) # no result when no authentication self.assertTrue(not json.loads(response.content.decode())) c.login(username=self.username, password=self.password) response = c.get(reverse("get-administrativeact"), {"year": "2014"}) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) response = c.get(reverse("get-administrativeact"), {"indexed": "2"}) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) def test_document_generation(self): tpl = open( settings.ROOT_PATH + "../archaeological_operations/tests/document_reference.odt", "rb", ) template = SimpleUploadedFile(tpl.name, tpl.read()) model, __ = ImporterModel.objects.get_or_create( klass="archaeological_operations.models.AdministrativeAct" ) doc = DocumentTemplate.objects.create( name="Test", slug=True, associated_model=model, available=True, template=template, ) self.act_types[0].associated_template.add(doc) c = Client() data = {"pk": self.admin_acts[0].pk, "document_template": doc.pk} response = c.post(reverse("operation-administrativeact-document"), data) # no result when no authentication self.assertEqual(response.content, b"") c.login(username=self.username, password=self.password) response = c.post(reverse("operation-administrativeact-document"), data) content, z, f = None, None, None try: f = BytesIO(response.content) z = zipfile.ZipFile(f) self.assertIsNone(z.testzip()) content = z.open("content.xml") self.assertIn(b"2014-05-12", content.read()) finally: if content: content.close() if z: z.close() if f: f.close() class OperationWizardCreationTest(WizardTest, OperationInitTest, TestCase): fixtures = FILE_FIXTURES url_name = "operation_creation" wizard_name = "operation_wizard" steps = views.wizard_steps redirect_url = ( "/operation_modification/selec-operation_modification" "?open_item={last_id}" ) model = models.Operation form_datas = [ FormData( "Create a preventive diag", form_datas={ "filechoice": {}, "general": { "code_patriarche": "codeope1", "operation_type": None, "year": 2016, }, "townsgeneral": [], "parcelsgeneral": [], }, ignored=( "towns-operation_creation", "parcels-operation_creation", "judiciary-operation_creation", "preventive-operation_creation", ), ), FormData( "Create another preventive diag with same parcel name", form_datas={ "filechoice": {}, "general": { "code_patriarche": "codeope2", "operation_type": None, "year": 2016, }, "townsgeneral": [], "parcelsgeneral": [], }, ignored=( "towns-operation_creation", "parcels-operation_creation", "judiciary-operation_creation", "preventive-operation_creation", ), ), FormData( "Create an operation related to a file", form_datas={ "filechoice": {}, "general": { "code_patriarche": "codeope3", "operation_type": None, "year": 2016, }, "towns": [], "parcels": [], }, ignored=( "townsgeneral-operation_creation", "parcelsgeneral-operation_creation", "judiciary-operation_creation", "preventive-operation_creation", ), ), ] def pre_wizard(self): profile, created = IshtarSiteProfile.objects.get_or_create( slug="default", active=True ) profile.files = True profile.save() if "townsgeneral" not in self.form_datas[0].form_datas: return super(OperationWizardCreationTest, self).pre_wizard() town = self.create_towns()[0] town_data = {"town": town.pk} parcel_data = { "town": town.pk, "year": 2017, "section": "S", "parcel_number": "42", } for idx in range(2): self.form_datas[idx].append("townsgeneral", town_data) self.form_datas[idx].append("parcelsgeneral", parcel_data) FI = FileInit() FI.create_file() file = FI.item file.towns.add(town) parcel = models.Parcel.objects.create( town=town, year=2017, section="G", parcel_number="43" ) file.parcels.add(parcel) self.form_datas[2].set("filechoice", "associated_file", file.pk) self.form_datas[2].append("parcelsgeneral", {"parcel": parcel.pk}) self.form_datas[2].append("towns", town_data) self.form_datas[2].append("parcels", {"parcel": parcel.pk}) # diagnostic ope_type = models.OperationType.objects.get(txt_idx="arch_diagnostic") self.form_datas[0].set("general", "operation_type", ope_type.pk) self.form_datas[1].set("general", "operation_type", ope_type.pk) self.form_datas[2].set("general", "operation_type", ope_type.pk) self.operation_number = models.Operation.objects.count() self.parcel_number = models.Parcel.objects.count() super(OperationWizardCreationTest, self).pre_wizard() def post_wizard(self): self.assertEqual(models.Operation.objects.count(), self.operation_number + 3) operations = models.Operation.objects.order_by("-pk").all()[:3] parcel_ids = [] for operation in operations: for parcel in operation.parcels.all(): parcel_ids.append(parcel.external_id) self.assertEqual( list(sorted(parcel_ids)), ["codeope1-12345-S42", "codeope2-12345-S42", "codeope3-12345-G43"], ) self.assertEqual(models.Parcel.objects.count(), self.parcel_number + 3) class OperationWizardModifTest(WizardTest, OperationInitTest, TestCase): fixtures = FILE_FIXTURES url_name = "operation_modification" wizard_name = url_name + "_wizard" steps = views.operation_modif_wizard_steps redirect_url = "/{url_name}/selec-{url_name}?open_item={current_id}" model = models.Operation base_ignored_steps = ( "archaeologicalsite-operation_modification", "preventive-operation_modification", "preventivediag-operation_modification", "judiciary-operation_modification", "towns-operation_modification", "parcels-operation_modification", "remains-operation_modification", "periods-operation_modification", "relations-operation_modification", "abstract-operation_modification", ) form_datas = [ FormData( "Update an operation", form_datas={ "selec": {}, "general": {"operation_type": 2, "year": 2017}, "townsgeneral": [], "parcelsgeneral": [], }, ignored=base_ignored_steps, ), FormData( "Operation: try to remove a parcel with attached context record", form_datas={ "selec": {}, "general": { "code_patriarche": "codeope42", "operation_type": 2, "year": 2017, }, "townsgeneral": [], "parcelsgeneral": [], }, ignored=base_ignored_steps, ), FormData( "Operation: remove a parcel with no attached context record", form_datas={ "selec": {}, "general": {"operation_type": 2, "year": 2017}, "townsgeneral": [], "parcelsgeneral": [], }, ignored=base_ignored_steps, ), FormData( "Set an operation to an exiting operation code for this year", form_datas={ "selec": {}, "general": {"operation_type": 2, "operation_code": 42, "year": 2017}, "townsgeneral": [], "parcelsgeneral": [], }, ignored=base_ignored_steps, error_expected="general", ), FormData( "Operation: change a parcel", form_datas={ "selec": {}, "general": {"operation_type": 2, "year": 2017}, "townsgeneral": [], "parcelsgeneral": [], }, ignored=base_ignored_steps, ), FormData( "Operation: change a town", form_datas={ "selec": {}, "general": {"operation_type": 2, "year": 2017}, "townsgeneral": [], "parcelsgeneral": [], }, ignored=base_ignored_steps, ), ] def pre_wizard(self): self.create_operation() operation = self.operations[0] init_town = self.create_towns()[0] operation.towns.add(init_town) init_parcel = self.create_parcel()[0] operation.parcels.add(init_parcel) self.create_operation() operation2 = self.operations[1] operation2.year = 2017 operation2.operation_code = 42 operation2.save() from archaeological_context_records.models import ContextRecord cr_data = { "label": "Context record", "operation": operation, "parcel": init_parcel, "history_modifier": self.get_default_user(), } self.cr = ContextRecord.objects.create(**cr_data) # diagnostic self.ope_type = models.OperationType.objects.get(txt_idx="prev_excavation") self.form_datas[0].set("general", "operation_type", self.ope_type.pk) self.form_datas[1].set("general", "operation_type", self.ope_type.pk) self.form_datas[2].set("general", "operation_type", self.ope_type.pk) self.form_datas[3].set("general", "operation_type", self.ope_type.pk) self.form_datas[4].set("general", "operation_type", self.ope_type.pk) self.form_datas[5].set("general", "operation_type", self.ope_type.pk) data = self.form_datas[0].form_datas data2 = self.form_datas[1].form_datas data3 = self.form_datas[2].form_datas data4 = self.form_datas[3].form_datas data5 = self.form_datas[4].form_datas data6 = self.form_datas[5].form_datas data["selec"]["pk"] = operation.pk data2["selec"]["pk"] = operation.pk data3["selec"]["pk"] = operation.pk data4["selec"]["pk"] = operation.pk data5["selec"]["pk"] = operation.pk data6["selec"]["pk"] = operation.pk town = self.create_towns(datas={"numero_insee": "67890", "name": "Twin Peaks"})[ -1 ] town_2 = self.create_towns( datas={"numero_insee": "85000", "name": "South Peaks"} )[-1] towns = [{"town": town.pk}, {"town": init_town.pk}] parcel_data = { "town": town.pk, "year": 2017, "section": "S", "parcel_number": "42", } for idx in range(0, 5): for t in towns: self.form_datas[idx].append("townsgeneral", t) if idx != 4: self.form_datas[idx].append("parcelsgeneral", parcel_data) parcel_data_2 = { "town": init_parcel.town.pk, "year": init_parcel.year or "", "section": init_parcel.section, "pk": init_parcel.pk, "parcel_number": init_parcel.parcel_number, } data["parcelsgeneral"].append(parcel_data_2) p = parcel_data.copy() p["parcel_number"] = "43" self.form_datas[4].form_datas["parcelsgeneral"] = [p] self.form_datas[5].form_datas["townsgeneral"] = [{"town": town_2.pk}] self.operation_number = models.Operation.objects.count() self.parcel_number = models.Parcel.objects.count() def post_first_wizard(test_object, final_step_response): test_object.assertEqual( models.Operation.objects.count(), test_object.operation_number ) operation = models.Operation.objects.get(pk=test_object.operations[0].pk) test_object.assertEqual(operation.operation_type.pk, self.ope_type.pk) test_object.assertEqual(operation.year, 2017) test_object.assertEqual( models.Parcel.objects.count(), test_object.parcel_number + 1 ) test_object.assertEqual( operation.parcels.count(), test_object.parcel_number + 1 ) def pre_second_wizard(test_object): test_object.form_datas[1].form_datas[ "parcelsgeneral-operation_modification" ][0]["pk"] = models.Parcel.objects.get(parcel_number="42").pk def post_second_wizard(test_object, final_step_response): test_object.assertEqual( models.Operation.objects.count(), test_object.operation_number ) operation = models.Operation.objects.get(pk=test_object.operations[0].pk) test_object.assertEqual(operation.operation_type.pk, self.ope_type.pk) test_object.assertEqual(operation.year, 2017) test_object.assertEqual( models.Parcel.objects.count(), test_object.parcel_number + 1 ) # the init parcel is not submited but have a context record # the init parcel is not detached from the operation test_object.assertEqual( operation.parcels.count(), test_object.parcel_number + 1 ) # update the external id on update cr = ContextRecord.objects.get(pk=self.cr.pk) test_object.assertEqual(cr.external_id, "codeope42-12345-A1-Context record") def pre_third_wizard(test_object): parcel_nb = models.Parcel.objects.count() test_object.cr.delete() test_object.assertEqual(parcel_nb, models.Parcel.objects.count()) def post_third_wizard(test_object, final_step_response): test_object.assertEqual( models.Operation.objects.count(), test_object.operation_number ) operation = models.Operation.objects.get(pk=test_object.operations[0].pk) test_object.assertEqual(operation.operation_type.pk, self.ope_type.pk) test_object.assertEqual(operation.year, 2017) # with no attach the parcel is deleted test_object.assertEqual( operation.parcels.count(), test_object.parcel_number ) # the parcel object is no more automatically deleted test_object.assertEqual( models.Parcel.objects.count(), test_object.parcel_number + 1 ) def pre_fifth_wizard(test_object): test_object.parcel_number = models.Parcel.objects.count() operation = models.Operation.objects.get(pk=test_object.operations[0].pk) test_object.operation_parcel_number = operation.parcels.count() test_object.form_datas[4].form_datas[ "parcelsgeneral-operation_modification" ][0]["pk"] = models.Parcel.objects.get(parcel_number="42").pk def post_fifth_wizard(test_object, final_step_response): test_object.assertEqual( models.Operation.objects.count(), test_object.operation_number ) operation = models.Operation.objects.get(pk=test_object.operations[0].pk) test_object.assertEqual( models.Parcel.objects.count(), test_object.parcel_number ) test_object.assertEqual( operation.parcels.count(), test_object.operation_parcel_number ) def pre_sixth_wizard(test_object): operation = models.Operation.objects.get(pk=test_object.operations[0].pk) operation.parcels.clear() operation.towns.clear() def post_sixth_wizard(test_object, final_step_response): operation = models.Operation.objects.get(pk=test_object.operations[0].pk) town_str = town_2.name test_object.assertEqual( operation.complete_identifier[:len(town_str)], town_str ) self.form_datas[0].extra_tests = [post_first_wizard] self.form_datas[1].pre_tests = [pre_second_wizard] self.form_datas[1].extra_tests = [post_second_wizard] self.form_datas[2].pre_tests = [pre_third_wizard] self.form_datas[2].extra_tests = [post_third_wizard] self.form_datas[4].pre_tests = [pre_fifth_wizard] self.form_datas[4].extra_tests = [post_fifth_wizard] self.form_datas[5].pre_tests = [pre_sixth_wizard] self.form_datas[5].extra_tests = [post_sixth_wizard] super(OperationWizardModifTest, self).pre_wizard() class OperationWizardDeleteTest(OperationWizardCreationTest): fixtures = FILE_FIXTURES url_name = "operation_deletion" wizard_name = "operation_deletion_wizard" steps = views.operation_deletion_steps redirect_url = "/{}/selec-{}".format(url_name, url_name) form_datas = [ FormData( "Wizard deletion test", form_datas={ "selec-operation_deletion": {"pks": None}, }, ) ] def pass_test(self): if not settings.TEST_VIEWS: # with no migration the views are not created return True def pre_wizard(self): self.ope = self.get_default_operation(force=True) self.ope.parcels.add(self.create_parcel()[0]) self.parcel_nb = models.Parcel.objects.count() self.form_datas[0].form_datas["selec-operation_deletion"]["pks"] = self.ope.pk self.operation_number = models.Operation.objects.count() super(OperationWizardDeleteTest, self).pre_wizard() def post_wizard(self): self.assertEqual(self.operation_number - 1, models.Operation.objects.count()) # associated parcel is... no more removed self.assertEqual(self.parcel_nb, models.Parcel.objects.count()) class OperationWizardClosingTest(OperationWizardCreationTest): fixtures = FILE_FIXTURES url_name = "operation_closing" wizard_name = "operation_closing_wizard" steps = views.operation_closing_steps redirect_url = "/operation_closing/done" form_datas = [ FormData( "Wizard closing test", form_datas={ "selec-operation_closing": {"pk": None}, "date-operation_closing": {"end_date": "2016-01-01"}, }, ) ] def pre_wizard(self): self.ope = self.get_default_operation() self.form_datas[0].form_datas["selec-operation_closing"]["pk"] = self.ope.pk self.assertTrue(self.ope.is_active()) super(OperationWizardClosingTest, self).pre_wizard() def post_wizard(self): ope = models.Operation.objects.get(pk=self.ope.pk) self.assertFalse(ope.is_active()) self.assertEqual( ope.closing()["date"].strftime("%Y-%d-%m"), self.form_datas[0].form_datas["date-operation_closing"]["end_date"], ) class OperationAdminActWizardCreationTest(WizardTest, OperationInitTest, TestCase): fixtures = FILE_FIXTURES url_name = "operation_administrativeactop" wizard_name = "operation_administrative_act_wizard" steps = views.administrativeactop_steps form_datas = [ FormData( "Admin act creation", form_datas={ "selec-operation_administrativeactop": {}, "administrativeact-operation_administrativeactop": { "signature_date": str(datetime.date.today()) }, }, ) ] def pre_wizard(self): ope = self.get_default_operation() self.number = models.AdministrativeAct.objects.count() data = self.form_datas[0].form_datas data["selec-operation_administrativeactop"]["pk"] = ope.pk act = models.ActType.objects.filter(intented_to="O").all()[0].pk data["administrativeact-operation_administrativeactop"]["act_type"] = act super(OperationAdminActWizardCreationTest, self).pre_wizard() def post_wizard(self): self.assertEqual(models.AdministrativeAct.objects.count(), self.number + 1) class SiteTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="view_own_operation") ) self.orgas = self.create_orgas(self.user) def test_create_or_update_top_operations(self): operation_0 = self.create_operation(self.user, self.orgas[0])[0] operation_1 = self.create_operation(self.alt_user, self.orgas[0])[1] site = models.ArchaeologicalSite.objects.create(reference="ref-site") site.create_or_update_top_operation() q = models.ArchaeologicalSite.objects.filter(reference="ref-site") site = q.all()[0] # creation not forced - no creation self.assertEqual(site.top_operations.count(), 0) site.create_or_update_top_operation(create=True) site = q.all()[0] # a default operation has been created self.assertEqual(site.top_operations.count(), 1) self.assertTrue(site.top_operation.virtual_operation) self.assertEqual(site.top_operation.right_relations.count(), 0) # create with one operation attached site.top_operation.delete() site = q.all()[0] site.operations.add(operation_0) site.create_or_update_top_operation(create=True) site = q.all()[0] self.assertIsNotNone(site.top_operation) self.assertTrue(site.top_operation.virtual_operation) self.assertEqual(site.top_operation.right_relations.count(), 1) self.assertEqual( site.top_operation.right_relations.all()[0].right_record, operation_0 ) # create with two operations attached site.top_operation.delete() site = q.all()[0] site.operations.add(operation_0) site.operations.add(operation_1) site.create_or_update_top_operation(create=True) site = q.all()[0] self.assertIsNotNone(site.top_operation) self.assertTrue(site.top_operation.virtual_operation) self.assertEqual(site.top_operation.right_relations.count(), 2) attached = [ rel.right_record for rel in site.top_operation.right_relations.all() ] self.assertIn(operation_0, attached) self.assertIn(operation_1, attached) # detach one operation site.operations.remove(operation_1) site.create_or_update_top_operation() site = q.all()[0] self.assertIsNotNone(site.top_operation) self.assertTrue(site.top_operation.virtual_operation) self.assertEqual(site.top_operation.right_relations.count(), 1) self.assertEqual( site.top_operation.right_relations.all()[0].right_record, operation_0 ) # reattach it site.operations.add(operation_1) site.create_or_update_top_operation() site = q.all()[0] self.assertIsNotNone(site.top_operation) self.assertTrue(site.top_operation.virtual_operation) self.assertEqual(site.top_operation.right_relations.count(), 2) attached = [ rel.right_record for rel in site.top_operation.right_relations.all() ] self.assertIn(operation_0, attached) self.assertIn(operation_1, attached) def test_search(self): site = models.ArchaeologicalSite.objects.create(reference="reference-site") c = Client() search = {"search_vector": 'reference="reference-site"'} response = c.get(reverse("get-site"), search) # no result when no authentication self.assertTrue(not json.loads(response.content.decode())) c.login(username=self.username, password=self.password) response = c.get(reverse("get-site"), search) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) search = {"search_vector": 'reference="reference"'} response = c.get(reverse("get-site"), search) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 0) search = {"search_vector": 'reference="reference*"'} response = c.get(reverse("get-site"), search) self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) class GenerateQRCode(OperationInitTest, TestCase): fixtures = FILE_FIXTURES def setUp(self): self.username, self.password, self.user = create_superuser() self.operation = self.create_operation(self.user)[0] def test_display(self): if self.operation.qrcode.name: self.operation.qrcode = None self.operation.save() operation = models.Operation.objects.get(pk=self.operation.pk) self.assertIn(operation.qrcode.name, ["", None]) c = Client() url = reverse( "qrcode-item", args=["archaeological-operations", "operation", operation.pk] ) response = c.get(url) self.assertEqual(response.status_code, 302) c.login(username=self.username, password=self.password) response = c.get(url) self.assertEqual(response.status_code, 200) self.assertEqual(response["Content-Type"], "image/png") operation = models.Operation.objects.get(pk=self.operation.pk) self.assertIsNotNone(operation.qrcode.name) def test_generation(self): self.assertIsNone(self.operation.qrcode.name) self.operation.generate_qrcode() self.assertIsNotNone(self.operation.qrcode.name) self.assertTrue( self.operation.qrcode.name.startswith("operation/2010/OA1/qrcode") ) class DocumentTest(OperationInitTest, TestCase): fixtures = FILE_FIXTURES def setUp(self): self.username, self.password, self.user = create_superuser() self.operation = self.create_operation(self.user)[0] def test_create(self): c = Client() url = reverse("create-document") nb_doc = models.Document.objects.count() nb_doc_ope = self.operation.documents.count() response = c.get(url, {"operation": self.operation.pk}) self.assertEqual(response.status_code, 302) c.login(username=self.username, password=self.password) response = c.get(url, {"operation": self.operation.pk}) self.assertEqual(response.status_code, 200) self.assertIn( 'option value="{}" selected'.format(self.operation.pk), response.content.decode(), ) posted = {"authors": []} for related_key in models.Document.RELATED_MODELS: posted[related_key] = [] posted["operations"] = [str(self.operation.pk)] response = c.post(url, posted) # at least a minimum info have to be given self.assertEqual(response.status_code, 200) self.assertIn("errorlist", response.content.decode()) posted["title"] = "hop" response = c.post(url, posted) self.assertEqual(nb_doc + 1, models.Document.objects.count()) self.assertEqual(nb_doc_ope + 1, self.operation.documents.count()) self.assertRedirects( response, "/document/edit/?open_item={}".format( self.operation.documents.order_by("-pk").all()[0].pk ), ) def test_edit(self): doc = models.Document.objects.create(title="hop2") doc.operations.add(self.operation) c = Client() url = reverse("edit-document", args=[doc.pk]) response = c.get(url) self.assertEqual(response.status_code, 302) c.login(username=self.username, password=self.password) response = c.get(url) self.assertEqual(response.status_code, 200) self.assertIn( 'option value="{}" selected'.format(self.operation.pk), response.content.decode(), ) posted = {"authors": [], "title": "hop2-is-back"} for related_key in models.Document.RELATED_MODELS: posted[related_key] = [] posted["operations"] = [str(self.operation.pk)] response = c.post(url, posted) self.assertRedirects(response, "/document/edit/?open_item={}".format(doc.pk)) response = c.get("/show-document/{}/".format(doc.pk)) self.assertIn(posted["title"], response.content.decode()) class DocumentWizardDeleteTest(WizardTest, OperationInitTest, TestCase): fixtures = FILE_FIXTURES url_name = "document_deletion" url_uri = "document/delete" wizard_name = "document_deletion_wizard" redirect_url = "/{}/selec-{}".format(url_uri, url_name) steps = document_deletion_steps form_datas = [ FormData( "Delete document", form_datas={ "selec": {"pks": None}, }, ), ] def pre_wizard(self): ope = self.get_default_operation() document = Document.objects.create(title="testy") document.operations.add(ope) self.ope_id = ope.pk self.form_datas[0].set("selec", "pks", document.pk) self.doc_nb = Document.objects.count() super(DocumentWizardDeleteTest, self).pre_wizard() def post_wizard(self): self.assertEqual(Document.objects.count(), self.doc_nb - 1) # operation not deleted with the document self.assertEqual(models.Operation.objects.filter(pk=self.ope_id).count(), 1) class AutocompleteTest(AutocompleteTestBase, TestCase): fixtures = OPERATION_FIXTURES models = [ AcItem( models.Operation, "autocomplete-operation", prepare_func="create_operation" ), AcItem( models.ArchaeologicalSite, "autocomplete-archaeologicalsite", "reference" ), AcItem( models.Operation, "autocomplete-patriarche", prepare_func="create_operation_patriarche", id_key="code_patriarche", one_word_search=True, ), ] def create_operation(self, base_name): item, __ = models.Operation.objects.get_or_create( common_name=base_name, operation_type=models.OperationType.objects.all()[0] ) return item, None def create_operation_patriarche(self, base_name): item, __ = models.Operation.objects.get_or_create( code_patriarche=base_name, operation_type=models.OperationType.objects.all()[0], ) return item, None class OperationQATest(OperationInitTest, TestCase): fixtures = OPERATION_FIXTURES model = models.Operation def setUp(self): self.username, self.password, self.user = create_superuser() self.orgas = self.create_orgas(self.user) self.create_operation(self.user, self.orgas[0]) self.create_operation(self.user, self.orgas[0]) self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="change_operation") ) def test_lock(self): c = Client() op0, op1 = self.operations[0], self.operations[1] pks = "{}-{}".format(op0.pk, op1.pk) url = reverse("operation-qa-lock", args=[pks]) response = c.get(url) self.assertEqual(response.status_code, 404) c.login(username=self.username, password=self.password) response = c.get(reverse("operation-qa-lock", args=[pks])) self.assertEqual(response.status_code, 200) response = c.post(url, {"action": "lock"}) if response.status_code != 200: self.assertRedirects(response, "/success/") for ope in (op0, op1): ope = models.Operation.objects.get(pk=ope.pk) self.assertEqual(ope.locked, True) self.assertEqual(ope.lock_user, self.user) response = c.post(url, {"action": "unlock"}) if response.status_code != 200: self.assertRedirects(response, "/success/") for ope in (op0, op1): ope = models.Operation.objects.get(pk=ope.pk) self.assertEqual(ope.locked, False) self.assertEqual(ope.lock_user, None) c = Client() c.login(username=self.alt_username, password=self.alt_password) response = c.get(reverse("operation-qa-lock", args=[pks])) self.assertEqual(response.status_code, 200) response = c.post(url, {"action": "lock"}) if response.status_code != 200: self.assertRedirects(response, "/success/") for ope in (op0, op1): ope = models.Operation.objects.get(pk=ope.pk) self.assertEqual(ope.locked, True) self.assertEqual(ope.lock_user, self.alt_user) response = c.post(url, {"action": "unlock"}) if response.status_code != 200: self.assertRedirects(response, "/success/") for ope in (op0, op1): ope = models.Operation.objects.get(pk=ope.pk) self.assertEqual(ope.locked, False) self.assertEqual(ope.lock_user, None) # one item lock by another user op0 = models.Operation.objects.get(pk=op0.pk) op0.locked = True op0.lock_user = self.user op0.save() op1 = models.Operation.objects.get(pk=op1.pk) op1.locked = True op1.lock_user = self.alt_user op1.save() response = c.post(url, {"action": "unlock"}) self.assertRedirects(response, "/qa-not-available/locked-by-others/") op0 = models.Operation.objects.get(pk=op0.pk) self.assertEqual(op0.locked, True) self.assertEqual(op0.lock_user, self.user) op1 = models.Operation.objects.get(pk=op1.pk) self.assertEqual(op1.locked, True) self.assertEqual(op1.lock_user, self.alt_user) def test_bulk_update(self): c = Client() pks = "{}-{}".format(self.operations[0].pk, self.operations[1].pk) response = c.get(reverse("operation-qa-bulk-update", args=[pks])) self.assertRedirects(response, "/") c = Client() c.login(username=self.username, password=self.password) response = c.get(reverse("operation-qa-bulk-update", args=[pks])) self.assertEqual(response.status_code, 200) c = Client() c.login(username=self.alt_username, password=self.alt_password) response = c.get(reverse("operation-qa-bulk-update", args=[pks])) self.assertEqual(response.status_code, 200) operation_0 = self.operations[0] operation_1 = self.operations[1] base_desc_0 = "Base description 1" operation_0.description = base_desc_0 operation_0.save() base_desc_1 = "Base description 2" operation_1.description = base_desc_1 operation_1.save() operation_type = models.OperationType.objects.exclude( txt_idx="arch_diagnostic" ).all()[0] self.assertNotEqual( models.Operation.objects.get(pk=operation_0.pk).operation_type, operation_type, ) self.assertNotEqual( models.Operation.objects.get(pk=operation_1.pk).operation_type, operation_type, ) response = c.post( reverse("operation-qa-bulk-update-confirm", args=[pks]), {"qa_operation_type": operation_type.pk}, ) if response.status_code != 200: self.assertRedirects(response, "/success/") self.assertEqual( models.Operation.objects.get(pk=operation_0.pk).operation_type, operation_type, ) self.assertEqual( models.Operation.objects.get(pk=operation_1.pk).operation_type, operation_type, ) # one item lock by another user op0 = models.Operation.objects.get(pk=self.operations[0].pk) op0.locked = True op0.lock_user = self.user op0.save() response = c.post( reverse("operation-qa-bulk-update-confirm", args=[pks]), {"qa_operation_type": operation_type.pk}, ) self.assertRedirects(response, "/qa-not-available/locked/") class DocumentQATest(OperationInitTest, TestCase): fixtures = OPERATION_FIXTURES model = Document def setUp(self): self.username, self.password, self.user = create_superuser() self.alt_username, self.alt_password, self.alt_user = create_user() self.alt_user.user_permissions.add( Permission.objects.get(codename="change_document") ) self.source_1 = models.Document.objects.create( title="Source title", source_type=models.SourceType.objects.all()[0] ) self.source_2 = models.Document.objects.create( title="Source title2", source_type=models.SourceType.objects.all()[0] ) def test_bulk_update(self): c = Client() pks = "{}-{}".format(self.source_1.pk, self.source_2.pk) response = c.get(reverse("document-qa-bulk-update", args=[pks])) self.assertRedirects(response, "/") c = Client() c.login(username=self.username, password=self.password) response = c.get(reverse("document-qa-bulk-update", args=[pks])) self.assertEqual(response.status_code, 200) c = Client() c.login(username=self.alt_username, password=self.alt_password) response = c.get(reverse("document-qa-bulk-update", args=[pks])) self.assertEqual(response.status_code, 200) document_0 = self.source_1 document_1 = self.source_2 source_type = models.SourceType.objects.exclude( txt_idx=self.source_1.source_type.txt_idx ).all()[0] self.assertNotEqual( models.Document.objects.get(pk=document_0.pk).source_type, source_type ) self.assertNotEqual( models.Document.objects.get(pk=document_1.pk).source_type, source_type ) response = c.post( reverse("document-qa-bulk-update-confirm", args=[pks]), {"qa_source_type": source_type.pk}, ) if response.status_code != 200: self.assertRedirects(response, "/success/") self.assertEqual( models.Document.objects.get(pk=document_0.pk).source_type, source_type ) self.assertEqual( models.Document.objects.get(pk=document_1.pk).source_type, source_type )