#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2025 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

from copy import deepcopy
import csv
from io import StringIO
import json
import os
import shutil
import tempfile
from zipfile import ZipFile
import subprocess
from osgeo import ogr, osr

from rest_framework.test import APITestCase
from rest_framework.authtoken.models import Token

from django.conf import settings
from django.contrib.auth.models import User, Permission, ContentType, Group
from django.core.files import File
from django.core.files.uploadedfile import SimpleUploadedFile
from django.db.utils import IntegrityError
from django.test import LiveServerTestCase
from django.test.client import Client
from django.urls import reverse
from ishtar_common.models import (
    ImporterType,
    IshtarUser,
    ImporterColumn,
    FormaterType,
    ImportTarget,
    IshtarSiteProfile,
    ProfileType,
    ImporterModel,
    DocumentTemplate,
    GeoVectorData,
    ImporterGroup
)
from django.utils.text import slugify
from django.utils.translation import pgettext_lazy, gettext_lazy as _

from ishtar_common.models import (
    Person,
    get_current_profile,
    UserProfile,
    Town,
    Organization,
    OrganizationType,
    Area,
    Document,
)
from archaeological_operations.models import AdministrativeAct
from archaeological_context_records.models import (
    Period,
    Dating,
    ContextRecord,
    DatingType,
    DatingQuality,
)
from archaeological_finds import models, views from archaeological_warehouse.models import ( Warehouse, WarehouseType, ContainerType, Container, WarehouseDivisionLink, ) from archaeological_operations.models import Operation, OperationType from ishtar_common import forms_common from ishtar_common.serializers import restore_serialized from ishtar_common.tests import ( WizardTest, WizardTestFormData as FormData, TestCase, create_user, create_superuser, AutocompleteTestBase, AcItem, FIND_FIXTURES, FIND_TOWNS_FIXTURES, WAREHOUSE_FIXTURES, COMMON_FIXTURES, GenericSerializationTest, SearchText, ) from archaeological_operations.tests import ImportTest, create_operation, \ create_administrativact, TestPermissionQuery from archaeological_context_records.tests import ContextRecordInit from archaeological_operations.serializers import operation_serialization from archaeological_context_records.serializers import cr_serialization from archaeological_finds import serializers class FindInit(ContextRecordInit): test_context_records = False def create_finds(self, data_base=None, data=None, user=None, force=False): if not data_base: data_base = {} if not data: data = {} if not getattr(self, "finds", None): self.finds = [] if not getattr(self, "base_finds", None): self.base_finds = [] default = {"label": "Base find"} if user: data_base["history_modifier"] = user elif ( not data_base.get("history_modifier") or not data_base["history_modifier"].pk ): user = self.get_default_user() user.save() data_base["history_modifier"] = user if force or not data_base.get("context_record"): data_base["context_record"] = self.get_default_context_record( force=force, user=user ) default.update(data_base) base_find = models.BaseFind.objects.create(**default) self.base_finds.append(base_find) data["history_modifier"] = data_base["history_modifier"] find = models.Find.objects.create(**data) find.base_finds.add(base_find) self.finds.append(find) return self.finds, self.base_finds def 
get_default_find(self, force=False): finds, base_finds = self.create_finds(force=force) if force: return finds[-1], base_finds[-1] return finds[0], base_finds[0] def tearDown(self): super(FindInit, self).tearDown() # nosec: quick and dirty cleanup do not care to catch exceptions if hasattr(self, "finds"): for f in self.finds: try: f.delete() except: # nosec pass self.finds = [] if hasattr(self, "base_finds"): for f in self.base_finds: try: f.delete() except: # nosec pass self.base_find = [] class SerializationTest(GenericSerializationTest, FindInit, TestCase): fixtures = COMMON_FIXTURES + WAREHOUSE_FIXTURES def _fixture_teardown(self): try: super()._fixture_teardown() except IntegrityError: print("Strange error patch...") # TODO: remove pass def setUp(self): ope1 = self.create_operation()[0] ope2 = self.create_operation()[1] cr = self.create_context_record(data={"label": "CR 1", "operation": ope1})[0] cr2 = self.create_context_record(data={"label": "CR 2", "operation": ope2})[1] self.create_finds(data_base={"context_record": cr})[0] self.create_finds(data_base={"context_record": cr2})[1] # basket = models.FindBasket.objects.create(label="Hophop") # basket.items.add(self.finds[0]) # basket.items.add(self.finds[1]) def test_serialization(self): res = self.generic_serialization_test(serializers.find_serialization) find_json = json.loads(res[("finds", "archaeological_finds__Find")]) self.assertEqual(len(find_json), 2) bfind_json = json.loads(res[("finds", "archaeological_finds__BaseFind")]) self.assertEqual(len(bfind_json), 2) result_queryset = Operation.objects.filter(uuid=self.operations[0].uuid) res = self.generic_serialization_test( serializers.find_serialization, no_test=True, kwargs={"operation_queryset": result_queryset}, ) find_json = json.loads(res[("finds", "archaeological_finds__Find")]) self.assertEqual(len(find_json), 1) bfind_json = json.loads(res[("finds", "archaeological_finds__BaseFind")]) self.assertEqual(len(bfind_json), 1) result_queryset = 
ContextRecord.objects.filter( uuid=self.context_records[0].uuid ) res = self.generic_serialization_test( serializers.find_serialization, no_test=True, kwargs={"cr_queryset": result_queryset}, ) find_json = json.loads(res[("finds", "archaeological_finds__Find")]) self.assertEqual(len(find_json), 1) bfind_json = json.loads(res[("finds", "archaeological_finds__BaseFind")]) self.assertEqual(len(bfind_json), 1) result_queryset = models.Find.objects.filter(uuid=self.finds[0].uuid) res = self.generic_serialization_test( serializers.find_serialization, no_test=True, kwargs={"find_queryset": result_queryset}, ) find_json = json.loads(res[("finds", "archaeological_finds__Find")]) self.assertEqual(len(find_json), 1) bfind_json = json.loads(res[("finds", "archaeological_finds__BaseFind")]) self.assertEqual(len(bfind_json), 1) def test_ope_serialization_with_find_filter(self): res = self.generic_serialization_test( operation_serialization, no_test=True, ) ope_json = json.loads( res[("operations", "archaeological_operations__Operation")] ) self.assertEqual(len(ope_json), 2) result_queryset = models.Find.objects.filter(uuid=self.finds[0].uuid) res = self.generic_serialization_test( operation_serialization, no_test=True, kwargs={"find_queryset": result_queryset}, ) ope_json = json.loads( res[("operations", "archaeological_operations__Operation")] ) self.assertEqual(len(ope_json), 1) def test_cr_serialization_with_find_filter(self): res = self.generic_serialization_test( cr_serialization, no_test=True, ) cr_json = json.loads( res[("context_records", "archaeological_context_records__ContextRecord")] ) self.assertEqual(len(cr_json), 2) result_queryset = models.Find.objects.filter(uuid=self.finds[0].uuid) res = self.generic_serialization_test( cr_serialization, no_test=True, kwargs={"find_queryset": result_queryset} ) cr_json = json.loads( res[("context_records", "archaeological_context_records__ContextRecord")] ) self.assertEqual(len(cr_json), 1) def test_restore(self): 
current_number, zip_filename = self.generic_restore_test_genzip( serializers.FIND_MODEL_LIST, serializers.find_serialization ) self.generic_restore_test( zip_filename, current_number, serializers.FIND_MODEL_LIST ) class FindWizardCreationTest(WizardTest, FindInit, TestCase): fixtures = WAREHOUSE_FIXTURES url_name = "find_creation" wizard_name = "find_wizard" steps = views.find_creation_steps redirect_url = [ "/find_modification/selec-find_modification?open_item={last_id}", "/find_modification/selecw-find_modification?open_item={last_id}", ] model = models.Find form_datas = [ FormData( "Find creation", form_datas={ "selecrecord-find_creation": {"pk": 1}, "find-find_creation": { "label": "hop", "checked": "NC", "check_date": "2016-01-01", }, "dating-find_creation": [ { "period": None, "start_date": "0", "end_date": "200", }, { "period": None, "start_date": "0", "end_date": "200", }, ], }, ignored=["preservation-find_creation"], ) ] def pre_wizard(self): cr = self.create_context_record( data={"parcel": self.create_parcel()[-1]}, force=True )[-1] self.form_datas[0].form_datas["selecrecord-find_creation"]["pk"] = cr.pk period = Period.objects.all()[0].pk self.form_datas[0].form_datas["dating-find_creation"][0]["period"] = period self.form_datas[0].form_datas["dating-find_creation"][1]["period"] = period self.find_number = models.Find.objects.count() self.basefind_number = models.BaseFind.objects.count() super(FindWizardCreationTest, self).pre_wizard() def post_wizard(self): self.assertEqual(models.BaseFind.objects.count(), self.basefind_number + 1) self.assertEqual(models.Find.objects.count(), self.find_number + 1) # identical datings, only one should be finaly save f = models.Find.objects.order_by("-pk").all()[0] self.assertEqual(f.datings.count(), 1) class FindWizardModificationTest(WizardTest, FindInit, TestCase): fixtures = WAREHOUSE_FIXTURES url_name = "find_modification" wizard_name = url_name + "_wizard" steps = views.find_modification_steps redirect_url = 
"/find_modification/selec-find_modification?open_item=" "{last_id}" model = models.Find form_datas = [ FormData( "Find modification", form_datas={ "selec-find_modification": {"pk": ""}, "selecrecord-find_modification": { "get_first_base_find__context_record": "" }, "find-find_modification": { "label": "hop", "checked": "NC", "check_date": "2016-01-01", }, "dating-find_modification": [ { "period": None, "start_date": "", "end_date": "", }, ], }, ignored=[ "preservation-find_modification", "selecw-find_modification", "simplefind-find_modification", ], ) ] def pre_wizard(self): profile, created = IshtarSiteProfile.objects.get_or_create( slug="default", active=True ) profile.warehouse = False profile.save() find, __ = self.get_default_find(force=True) self.find = find find.label = "base-hop" find.check_date = "2020-07-01" find.save() data = self.form_datas[0].form_datas data["selec-find_modification"]["pk"] = find.pk data["selecrecord-find_modification"][ "get_first_base_find__context_record" ] = find.base_finds.all()[0].context_record.pk self.period = Period.objects.all()[0] self.period2 = Period.objects.all()[1] find.datings.add( Dating.objects.create(period=self.period, start_date="0", end_date="200") ) find.datings.add(Dating.objects.create(period=self.period2)) data["dating-find_modification"][0]["period"] = self.period.pk self.find_number = models.Find.objects.count() self.basefind_number = models.BaseFind.objects.count() super(FindWizardModificationTest, self).pre_wizard() def post_wizard(self): # no creation self.assertEqual(models.BaseFind.objects.count(), self.basefind_number) self.assertEqual(models.Find.objects.count(), self.find_number) f = models.Find.objects.get(pk=self.find.pk) self.assertEqual(f.datings.count(), 1) dating = f.datings.all()[0] self.assertEqual(dating.period, self.period) self.assertEqual(dating.end_date, None) self.assertEqual(dating.start_date, None) class FindWizardDeletionWithWarehouseModTest(WizardTest, FindInit, TestCase): fixtures 
= WAREHOUSE_FIXTURES url_name = "find_deletion" wizard_name = "find_deletion_wizard" steps = views.find_deletion_steps redirect_url = "/{}/selecw-{}".format(url_name, url_name) form_datas = [ FormData( "Find deletion", form_datas={ "selecw": {}, }, ignored=["selec-find_deletion"], ) ] def pre_wizard(self): profile, created = IshtarSiteProfile.objects.get_or_create( slug="default", active=True ) profile.warehouse = True profile.save() find, bf = self.get_default_find(force=True) self.form_datas[0].set("selecw", "pks", find.pk) self.find_number = models.Find.objects.count() super(FindWizardDeletionWithWarehouseModTest, self).pre_wizard() def post_wizard(self): self.assertEqual(models.Find.objects.count(), self.find_number - 1) class TreatmentWizardCreationTest(WizardTest, FindInit, TestCase): fixtures = WAREHOUSE_FIXTURES url_name = "treatment_creation" wizard_name = "treatment_wizard" steps = views.treatment_wizard_steps redirect_url = "/treatment_search/general-treatment_search?" "open_item={last_id}" model = models.Treatment form_datas = [ FormData( "Move treatment (planned)", form_datas={ "file": {}, "basetreatment": { "treatment_type": None, "person": 1, # doer "location": 1, # associated warehouse "year": 2016, "container": None, }, "selecfind": {"pk": 1, "resulting_pk": 1}, }, ignored=( "resultfind-treatment_creation", "selecbasket-treatment_creation", "resultfinds-treatment_creation", ), ), FormData( "Move treatment (done)", form_datas={ "file": {}, "basetreatment": { "treatment_type": None, "person": 1, # doer "location": 1, # associated warehouse "year": 2016, "container": None, }, "selecfind": {"pk": 1, "resulting_pk": 1}, }, ignored=( "resultfind-treatment_creation", "selecbasket-treatment_creation", "resultfinds-treatment_creation", ), ), FormData( "Loan treatment (done)", form_datas={ "file": {}, "basetreatment": { "treatment_type": None, "person": 1, # doer "location": 1, # associated warehouse "year": 2016, "container": None, }, "selecfind": {"pk": 1, 
"resulting_pk": 1}, }, ignored=( "resultfind-treatment_creation", "selecbasket-treatment_creation", "resultfinds-treatment_creation", ), ), ] def pre_wizard(self): q = Warehouse.objects.filter(pk=1) if not q.count(): warehouse = Warehouse.objects.create( name="default", warehouse_type=WarehouseType.objects.all()[0] ) warehouse.id = 1 warehouse.save() else: warehouse = q.all()[0] q = Person.objects.filter(pk=1) if not q.count(): person = Person.objects.create(name="default") person.id = 1 person.save() self.container_ref = Container.objects.create( location=warehouse, responsible=warehouse, container_type=ContainerType.objects.all()[0], ) self.container1 = Container.objects.create( location=warehouse, responsible=warehouse, container_type=ContainerType.objects.all()[0], ) self.container2 = Container.objects.create( location=warehouse, responsible=warehouse, container_type=ContainerType.objects.all()[0], ) self.find, base_find = self.get_default_find(force=True) self.find.container_ref = self.container_ref self.find.container = self.container_ref self.find.save() for idx in (0, 1): self.form_datas[idx].form_datas["selecfind"]["pk"] = self.find.pk self.form_datas[idx].form_datas["selecfind"]["resulting_pk"] = self.find.pk self.form_datas[idx].set("basetreatment", "container", self.container1.pk) self.find_2, base_find = self.get_default_find(force=True) self.find_2.container_ref = self.container_ref self.find_2.container = self.container_ref self.find_2.save() self.form_datas[2].form_datas["selecfind"]["pk"] = self.find_2.pk self.form_datas[2].form_datas["selecfind"]["resulting_pk"] = self.find_2.pk self.form_datas[2].set("basetreatment", "container", self.container2.pk) moving = models.TreatmentType.objects.get(txt_idx="moving") moving.change_reference_location = True moving.change_current_location = True moving.save() self.form_datas[0].set("basetreatment", "treatment_type", moving.pk) self.form_datas[1].set("basetreatment", "treatment_type", moving.pk) loan = 
models.TreatmentType.objects.get(txt_idx="loan") loan.change_reference_location = False loan.change_current_location = True loan.save() self.form_datas[2].set("basetreatment", "treatment_type", loan.pk) planned, __ = models.TreatmentInputStatus.objects.get_or_create( txt_idx="draft", defaults={"executed": False, "label": "Draft"} ) planned.executed = False planned.save() self.form_datas[0].set("basetreatment", "input_status", planned.pk) completed, created = models.TreatmentInputStatus.objects.get_or_create( txt_idx="validated", defaults={"executed": True, "label": "Validated"} ) completed.executed = True completed.save() self.form_datas[1].set("basetreatment", "input_status", completed.pk) self.form_datas[2].set("basetreatment", "input_status", completed.pk) status = models.TreatmentStatus.objects.all()[0] for form_data in self.form_datas: form_data.set("basetreatment", "treatment_status", status.pk) self.treatment_number = models.Treatment.objects.count() def post_first_wizard(test_object, final_step_response): # treatment planned - no changes find = models.Find.objects.get(pk=test_object.find.pk) test_object.assertEqual(find.container, test_object.container_ref) test_object.assertEqual(find.container_ref, test_object.container_ref) self.form_datas[0].extra_tests = [post_first_wizard] def post_second_wizard(test_object, final_step_response): # moving done find = models.Find.objects.get(pk=test_object.find.pk) test_object.assertEqual(find.container, test_object.container1) test_object.assertEqual(find.container_ref, test_object.container1) self.form_datas[1].extra_tests = [post_second_wizard] def post_third_wizard(test_object, final_step_response): # loan done find = models.Find.objects.get(pk=test_object.find_2.pk) test_object.assertEqual(find.container_ref, test_object.container_ref) test_object.assertEqual(find.container, test_object.container2) self.form_datas[2].extra_tests = [post_third_wizard] super(TreatmentWizardCreationTest, self).pre_wizard() def 
post_wizard(self): self.assertEqual(models.Treatment.objects.count(), self.treatment_number + 3) self.find = models.Find.objects.get(pk=self.find.pk) self.assertEqual(self.find.treatments.count(), 2) self.find_2 = models.Find.objects.get(pk=self.find_2.pk) self.assertEqual(self.find_2.treatments.count(), 1) # TODO: test treatment with new find creation # treat = models.Treatment.objects.order_by('-pk').all()[0] # self.assertEqual(self.find.downstream_treatment, # treat) class BaseImportFindTest(ImportTest, FindInit, TestCase): fixtures = FIND_TOWNS_FIXTURES + [ settings.LIB_BASE_PATH + "archaeological_finds/tests/import_loca_test.json", ] def setUp(self): super().setUp() self.tmpdir = tempfile.TemporaryDirectory() def tearDown(self): self.tmpdir.cleanup() class ImportFindTest(BaseImportFindTest): def test_geo_import_csv(self): self._test_geo_import("importer-GIS-find", 1) def _test_geo_import(self, data_name, new_nb): root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" filename = root + data_name + ".zip" self.restore_serialized(filename) imp_type = ImporterType.objects.get(slug="topographie-mobilier") with open(root + data_name + ".csv", "rb") as imp_file: file_dict = { "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()) } post_dict = { "importer_type": imp_type.pk, "name": "find_geo_import", "encoding": "utf-8", "skip_lines": 1, "csv_sep": ",", } form = forms_common.NewImportGISForm( data=post_dict, files=file_dict, user=self.user ) self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() nb = GeoVectorData.objects.count() ope, __ = Operation.objects.get_or_create( code_patriarche="GOA", operation_type=OperationType.objects.all()[0]) cr, __ = ContextRecord.objects.get_or_create( operation=ope, label="CR" ) base_find, __ = models.BaseFind.objects.get_or_create( context_record=cr, label="GOA-528", external_id="GOA-528", auto_external_id=False ) #base_find.external_id = "GOA-528" #base_find.auto_external_id = 
False #base_find.save() base_find = models.BaseFind.objects.get(pk=base_find.pk) impt.importation() if impt.error_file: self.assertIsNone( impt.error_file, msg="Error on import. Content of error file: " + impt.error_file.read().decode("utf-8")) self.assertEqual(GeoVectorData.objects.count() - nb, new_nb) new = GeoVectorData.objects.order_by("-pk").all()[:new_nb] for geo in new: self.assertTrue(geo.x) self.assertTrue(geo.y) self.assertTrue(geo.z) self.assertEqual(new[0].x, 352107.689) base_find = models.BaseFind.objects.get(pk=base_find.pk) self.assertEqual(base_find.main_geodata_id, new[0].pk) def test_group_import(self): imp_group, imp_file, imp_media = self.get_group_import() file_dict = { "imported_file": imp_file } post_dict = { "importer_type": imp_group.pk, "name": "find_group_import", "encoding": "utf-8", "skip_lines": 1, "csv_sep": ",", } form = forms_common.NewImportGroupForm( data=post_dict, files=file_dict, user=self.user ) self.assertFalse(form.is_valid()) self.assertIn(str(_("This importer need a document archive.")), form.errors["__all__"]) file_dict["imported_images"] = imp_media form = forms_common.NewImportGroupForm( data=post_dict, files=file_dict, user=self.user ) self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) self.init_group_import(impt) nb_base_find = models.BaseFind.objects.count() nb_find = models.Find.objects.count() nb_container = Container.objects.count() nb_docs = Document.objects.count() impt.importation() self.assertEqual(models.BaseFind.objects.count(), nb_base_find + 1) self.assertEqual(models.Find.objects.count(), nb_find + 1) self.assertEqual(Container.objects.count(), nb_container + 3) # 1 new container + 2 divisions self.assertEqual(Document.objects.count(), nb_docs + 1) self.assertFalse(any(imp.error_file for imp in impt.imports.all()), msg="Error on group import") def test_mcc_import_finds(self): self.init_context_record() old_nb = models.BaseFind.objects.count() old_nb_find = models.Find.objects.count() MCC 
= ImporterType.objects.get(name="MCC - Mobilier") col = ImporterColumn.objects.create(col_number=25, importer_type_id=MCC.pk) formater = FormaterType.objects.filter(formater_type="FileFormater").all()[0] ImportTarget.objects.create( target="documents__image", formater_type_id=formater.pk, column_id=col.pk ) file_dict = {} with open( settings.LIB_BASE_PATH + "archaeological_finds/tests/MCC-finds-example.csv", "rb") as mcc_file: file_dict["imported_file"] = SimpleUploadedFile( mcc_file.name, mcc_file.read() ) with open( settings.LIB_BASE_PATH + "archaeological_finds/tests/images.zip", "rb") as mcc_file: file_dict["imported_images"] = SimpleUploadedFile( mcc_file.name, mcc_file.read() ) post_dict = { "importer_type": MCC.pk, "skip_lines": 1, "encoding": "utf-8", "name": "init_find_import", "csv_sep": ",", } form = forms_common.BaseImportForm( data=post_dict, files=file_dict, user=self.user ) form.is_valid() self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() # doing manual connections ceram = models.MaterialType.objects.get(txt_idx="ceramic").pk glass = models.MaterialType.objects.get(txt_idx="glass").pk self.set_target_key("material_types", "terre-cuite", ceram) self.set_target_key("material_types", "verre", glass) impt.importation() # new finds has now been imported current_nb = models.BaseFind.objects.count() self.assertEqual(current_nb, (old_nb + 4)) current_nb = models.Find.objects.count() self.assertEqual(current_nb, (old_nb_find + 4)) self.assertEqual( models.Find.objects.filter(material_types__pk=ceram).count(), 4 ) self.assertEqual( models.Find.objects.filter(material_types__pk=glass).count(), 1 ) images = [] for find in models.Find.objects.all(): images += [1 for im in find.images.all() if im.image.name] self.assertEqual(len(images), 1) # check index bfs = list(models.BaseFind.objects.order_by("-pk").all()) for idx in range(4): bf = bfs[idx] expected_index = 4 - idx self.assertEqual( bf.index, expected_index, "Bad index for 
imported base find: {} where {} is " "expected".format(bf.index, expected_index), ) f = bf.find.all()[0] self.assertEqual( f.index, expected_index, "Bad index for imported find: {} where {} is expected".format( f.index, expected_index ), ) def test_import_locations(self): self.create_finds() self.create_finds() self.create_finds() self.create_finds() external_ids = [] for idx, f in enumerate(self.finds): f.label = "Find {}".format(idx) f.save() external_ids.append(f.external_id) wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT") warehouse, __ = Warehouse.objects.get_or_create( external_id="warehouse-test", defaults={"name": "Warehouse test", "warehouse_type": wt}, ) container_types = [] levels = ["Building", "Area", "Shelf", "Box"] for level in levels: container_type, __ = ContainerType.objects.get_or_create( label=level, txt_idx=slugify(level) ) container_types.append(container_type) for idx in range(len(levels[:-1])): WarehouseDivisionLink.objects.get_or_create( warehouse=warehouse, container_type=container_types[idx], order=(idx + 1) * 10, ) old_nb = models.BaseFind.objects.count() old_nb_find = models.Find.objects.count() old_nb_container = Container.objects.count() importer_type = ImporterType.objects.get(slug="importeur-test") imp_filename = os.path.join(self.tmpdir.name, "imp_locations.csv") with open(imp_filename, "w") as impfile: w = csv.writer(impfile) w.writerow( [ "External ID", "Warehouse", "Ref.", "Container type", "Localisation 1", "Localisation 2", "Localisation 3", ] ) for idx, ext_id in enumerate(external_ids): if idx < 2: w.writerow( [ ext_id, "warehouse-test", "Réf. {}".format((idx + 1) * 10), container_types[-1].name, "A", "42", idx + 1, ] ) else: w.writerow( [ ext_id, "warehouse-test", "Réf. 
{}".format((idx + 1) * 10), container_types[-1].name, "A", 43, ] ) with open(imp_filename, "rb") as imp_file: file_dict = { "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()) } post_dict = { "importer_type": importer_type.pk, "skip_lines": 1, "encoding": "utf-8", "name": "init_find_import", "csv_sep": ",", } form = forms_common.BaseImportForm( data=post_dict, files=file_dict, user=self.user ) form.is_valid() self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) impt.initialize() impt.importation() # no new finds has now been imported current_nb = models.BaseFind.objects.count() self.assertEqual(current_nb, old_nb) current_nb = models.Find.objects.count() self.assertEqual(current_nb, old_nb_find) current_nb = Container.objects.count() self.assertEqual(current_nb, old_nb_container + 4 + 2 + 2 + 1) containers = list(Container.objects.all()) for container in containers: self.assertEqual(container.location, warehouse) q = Container.objects.filter(container_type=container_types[0]) self.assertEqual(q.count(), 1) building = q.all()[0] self.assertIsNone(building.parent) q = Container.objects.filter(container_type=container_types[1]) self.assertEqual(q.count(), 2) areas = list(q.all()) area = q.all()[0] self.assertEqual(area.parent, building) q = Container.objects.filter(container_type=container_types[2]) self.assertEqual(q.count(), 2) shelves = list(q.all()) for shelf in shelves: self.assertEqual(shelf.parent, area) q = Container.objects.filter(container_type=container_types[3]) self.assertEqual(q.count(), 4) boxes = list(q.all().order_by("id")) previous_shelf = None for box in boxes[:2]: if not previous_shelf: previous_shelf = box.parent else: # on a different shelf self.assertNotEqual(previous_shelf, box.parent) self.assertIn(box.parent_id, [s.pk for s in shelves]) previous_area = None for box in boxes[2:]: if not previous_area: previous_area = box.parent else: self.assertEqual(previous_area, box.parent) # on the same area 
self.assertIn(box.parent_id, [s.pk for s in areas]) importer_type = ImporterType.objects.get(slug="importeur-test") cols = list( ImporterColumn.objects.filter( importer_type=importer_type, col_number__gte=5 ).values_list("id", flat=True) ) for target in ImportTarget.objects.filter(column_id__in=cols): target.target = target.target.replace( "set_localisation", "set_static_localisation" ) target.save() # delete area 43 and all boxes Container.objects.filter(reference="43").delete() Container.objects.filter(container_type=container_types[-1]).delete() # reimport impt.initialize() impt.importation() # check errors self.assertEqual(len(impt.errors), 2) error_msg = str( _("The division {} {} do not exist for the location {}.") ).format("Area", "43", "Warehouse test") for error in impt.errors: self.assertEqual(error["error"], error_msg) # test ignore errors impt.importer_type.ignore_errors = error_msg[:-5] impt.importer_type.save() impt.importation() # check errors self.assertEqual(len(impt.errors), 0) def test_verify_qfield_zip(self): """ :function: Test if all the files of the QField zipped folder are correct """ # Definition of the path to test importer data for GIS data root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" # Opening of the .zip with ZipFile(os.path.join(root, "qfield-prospection.zip"), 'r') as zip_file: # Verification of the number of files in the .zip self.assertEqual(len(zip_file.namelist()),2) # Verification of the names of the files in the .zip list_files = ["Qfield_prospection.qgs","Qfield_prospection_attachments.zip"] self.assertEqual(zip_file.namelist(), list_files) # Closing of the .zip zip_file.close() def test_add_file_qfield_zip(self): """ :function: Try the addition of a file in the zip for QField that will be dowloaded """ # Definition of the path to test importer data for GIS data root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" filename = os.path.join(root, "qfield-prospection.zip") # Opening of the .zip with 
def test_qfield_import_finds(self):
    """
    :function: Try the importation of finds linked to QField

    Restores the serialized "qfield-mobilier-test" importer, submits
    ``qfield-importeur-data.csv`` through ``NewImportGISForm`` and checks
    that one base find, one find and one document are created, then
    verifies the imported values.
    """
    # Path to the test importer data for GIS data. os.path.join is used
    # so the result does not depend on a trailing separator.
    root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests")
    self.restore_serialized(os.path.join(root, "qfield-mobilier-test.zip"))
    # The importer is retrieved through the slug of the restored archive
    imp_type = ImporterType.objects.get(slug="qfield-mobilier-test")

    # The CSV content is read eagerly, the file can be closed right after
    with open(os.path.join(root, "qfield-importeur-data.csv"), "rb") as imp_file:
        file_dict = {
            "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read())
        }
    post_dict = {
        "importer_type": imp_type.pk,
        "name": "find_geo_import",
        "encoding": "utf-8",
        "skip_lines": 1,
        "csv_sep": ",",
    }

    # Preparation of the data import
    form = forms_common.NewImportGISForm(
        data=post_dict, files=file_dict, user=self.user
    )
    self.assertTrue(form.is_valid())
    impt = form.save(self.ishtar_user)
    impt.initialize()

    # Operation and context record expected by the imported data
    ope, __ = Operation.objects.get_or_create(
        code_patriarche="GOA", operation_type=OperationType.objects.all()[0]
    )
    ContextRecord.objects.get_or_create(operation=ope, label="CR")

    # Reference counters taken before the import
    nb_base_find = models.BaseFind.objects.count()
    nb_find = models.Find.objects.count()
    nb_docs = Document.objects.count()

    impt.importation()

    # Exactly one item of each kind must have been created
    self.assertEqual(models.BaseFind.objects.count(), nb_base_find + 1)
    self.assertEqual(models.Find.objects.count(), nb_find + 1)
    self.assertEqual(Document.objects.count(), nb_docs + 1)

    # Verification of the imported values. Each object is fetched
    # explicitly and checked for existence so that the value assertions
    # cannot be silently skipped by an empty queryset.
    base_find = models.BaseFind.objects.order_by("-pk").first()
    self.assertIsNotNone(base_find)
    self.assertEqual(base_find.label, "123")
    self.assertEqual(str(base_find.discovery_date), "2025-04-07")
    self.assertEqual(
        base_find.point_2d,
        "SRID=4326;POINT (-2.26868001391598 47.3849390721505)",
    )

    find = models.Find.objects.order_by("-pk").first()
    self.assertIsNotNone(find)
    self.assertEqual(find.label, "123")
    # NOTE(review): this compares the string form of the related manager,
    # not the material types attached to the find - confirm the intent.
    self.assertEqual(
        str(find.material_types), "archaeological_finds.MaterialType.None"
    )
    self.assertEqual(find.description, "Test")

    context_record = ContextRecord.objects.order_by("-pk").first()
    self.assertIsNotNone(context_record)
    self.assertEqual(context_record.label, "CR")

    geo = GeoVectorData.objects.order_by("-pk").first()
    self.assertIsNotNone(geo)
    self.assertEqual(geo.x, 14)
    self.assertEqual(geo.y, 3)
    self.assertEqual(geo.z, 2000)
def test_csv_to_gpkg(self):
    """
    :function: Creation of a .gpkg file from the data of an imported .csv

    Step 1 imports ``qfield-importeur-data.csv`` with the
    "qfield-mobilier-test" importer. Step 2 writes the most recent base
    find, find, context record and geographic item as a single point
    feature of the ``Finds`` layer of ``Finds.gpkg``, then reopens the
    file to check that the feature has been written.
    """
    # Step 1 : Importation of data
    # Definition of the path to test importer data for GIS data
    root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests")
    self.root = root
    self.restore_serialized(os.path.join(root, "qfield-mobilier-test.zip"))
    # The importer is retrieved through the slug of the restored archive
    imp_type = ImporterType.objects.get(slug="qfield-mobilier-test")

    # The CSV content is read eagerly, the file can be closed right after
    with open(os.path.join(root, "qfield-importeur-data.csv"), "rb") as imp_file:
        file_dict = {
            "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read())
        }
    post_dict = {
        "importer_type": imp_type.pk,
        "name": "find_geo_import",
        "encoding": "utf-8",
        "skip_lines": 1,
        "csv_sep": ",",
    }

    # Preparation of the data import
    form = forms_common.NewImportGISForm(
        data=post_dict, files=file_dict, user=self.user
    )
    self.assertTrue(form.is_valid())
    impt = form.save(self.ishtar_user)
    impt.initialize()

    # Operation and context record expected by the imported data
    ope, __ = Operation.objects.get_or_create(
        code_patriarche="GOA", operation_type=OperationType.objects.all()[0]
    )
    ContextRecord.objects.get_or_create(operation=ope, label="CR")

    impt.importation()

    # Items to export: fail early and explicitly if the import produced
    # nothing instead of hitting an undefined variable later on.
    base_find = models.BaseFind.objects.order_by("-pk").first()
    find = models.Find.objects.order_by("-pk").first()
    context_record = ContextRecord.objects.order_by("-pk").first()
    geo = GeoVectorData.objects.order_by("-pk").first()
    for item in (base_find, find, context_record, geo):
        self.assertIsNotNone(item)

    # Step 2 : Conversion to .gpkg
    # NOTE(review): the file is written in the tests directory and is not
    # removed afterwards - confirm that keeping it is intended.
    gpkg = os.path.join(root, "Finds.gpkg")
    layer_name = "Finds"
    # Deletion of the .gpkg if already existing
    if os.path.exists(gpkg):
        os.remove(gpkg)

    # Getting necessary information from OsGeo to create the .gpkg
    driver = ogr.GetDriverByName("GPKG")
    datasource = driver.CreateDataSource(gpkg)
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(4326)

    # Layer and attributes creation
    layer = datasource.CreateLayer(layer_name, srs, ogr.wkbPoint)
    field_definitions = (
        ("identifiant", ogr.OFTString),
        ("operation", ogr.OFTString),
        ("date", ogr.OFTDate),
        ("x", ogr.OFTReal),
        ("y", ogr.OFTReal),
        ("z", ogr.OFTReal),
        ("materiau(x)", ogr.OFTString),
        ("description", ogr.OFTString),
        ("wkt", ogr.OFTString),
    )
    for field_name, field_type in field_definitions:
        layer.CreateField(ogr.FieldDefn(field_name, field_type))

    # Importation of the data
    feature = ogr.Feature(layer.GetLayerDefn())
    discovery_date = base_find.discovery_date
    feature.SetField("identifiant", base_find.label)
    feature.SetField(
        "date",
        int(discovery_date.year),
        int(discovery_date.month),
        int(discovery_date.day),
        0, 0, 0, 0,
    )
    feature.SetField("wkt", str(base_find.point_2d))
    feature.SetField("materiau(x)", str(find.material_types))
    feature.SetField("description", str(find.description))
    feature.SetField("operation", context_record.label)
    feature.SetField("x", geo.x)
    feature.SetField("y", geo.y)
    feature.SetField("z", geo.z)

    # Geometry creation
    point = ogr.Geometry(ogr.wkbPoint)
    point.AddPoint(geo.x, geo.y)
    feature.SetGeometry(point)
    self.assertEqual(layer.CreateFeature(feature), ogr.OGRERR_NONE)

    # Dropping the references closes the data source and flushes the file
    feature = None
    layer = None
    datasource = None

    # The written file must contain the layer with its single feature
    written = ogr.Open(gpkg)
    self.assertIsNotNone(written)
    self.assertEqual(written.GetLayerByName(layer_name).GetFeatureCount(), 1)
    written = None
class FindTest(FindInit, TestCase):
    """
    Model level tests for finds: cascade deletion, external id generation,
    index attribution, sheet display and material type helpers.
    """
    fixtures = FIND_FIXTURES
    model = models.Find

    def setUp(self):
        """Create one find and a logged in superuser client."""
        self.create_finds(force=True)
        self.password = "mypassword"
        self.username = "myuser"
        self.user = User.objects.create_superuser(
            self.username, "myemail@test.com", self.password
        )
        self.client = Client()
        self.client.login(username=self.username, password=self.password)

    def _assert_new_find_index(self, context_record, expected_index):
        """
        Create a find on `context_record` and check that the new find and
        its base find both get `expected_index`.
        """
        self.create_finds(data_base={"context_record": context_record})
        find = self.finds[-1]
        base_find = models.BaseFind.objects.get(pk=self.base_finds[-1].pk)
        self.assertEqual(find.index, expected_index)
        self.assertEqual(base_find.index, expected_index)

    def test_cascade_delete_operation(self):
        """Deleting an operation deletes everything attached to it."""
        find = self.finds[0]
        find_id = find.id
        nb_f = models.Find.objects.count()

        bf = find.base_finds.all()[0]
        bf_id = bf.id
        nb_bf = models.BaseFind.objects.count()

        cr = bf.context_record
        cr_id = cr.id
        nb_cr = ContextRecord.objects.count()

        operation = cr.operation
        operation_id = operation.id
        nb_ope = Operation.objects.count()

        # attach an admin act
        __, admin_acts = create_administrativact(self.user, operation)
        admin_act_id = admin_acts[0].id
        nb_admin_act = AdministrativeAct.objects.count()

        operation.delete()

        # each item is gone and each counter has decreased by one
        self.assertEqual(Operation.objects.filter(pk=operation_id).count(), 0)
        self.assertEqual(Operation.objects.count(), nb_ope - 1)
        self.assertEqual(
            AdministrativeAct.objects.filter(pk=admin_act_id).count(), 0
        )
        self.assertEqual(AdministrativeAct.objects.count(), nb_admin_act - 1)
        self.assertEqual(ContextRecord.objects.filter(pk=cr_id).count(), 0)
        self.assertEqual(ContextRecord.objects.count(), nb_cr - 1)
        self.assertEqual(models.BaseFind.objects.filter(pk=bf_id).count(), 0)
        self.assertEqual(models.BaseFind.objects.count(), nb_bf - 1)
        self.assertEqual(models.Find.objects.filter(pk=find_id).count(), 0)
        self.assertEqual(models.Find.objects.count(), nb_f - 1)

    def test_cascade_delete_find(self):
        """Deleting a find deletes its associated base find."""
        find = self.finds[0]
        find_id = find.id
        nb_f = models.Find.objects.count()

        bf = find.base_finds.all()[0]
        bf_id = bf.id
        nb_bf = models.BaseFind.objects.count()

        find.delete()

        self.assertEqual(models.BaseFind.objects.filter(pk=bf_id).count(), 0)
        self.assertEqual(models.BaseFind.objects.count(), nb_bf - 1)
        self.assertEqual(models.Find.objects.filter(pk=find_id).count(), 0)
        self.assertEqual(models.Find.objects.count(), nb_f - 1)

    def test_external_id(self):
        """
        External ids follow label changes of the base find, the context
        record and the operation, and honor the profile template.
        """
        find = self.finds[0]
        base_find = find.base_finds.all()[0]
        first_cr_external_id = (
            find.get_first_base_find().context_record.external_id
        )
        self.assertEqual(
            find.external_id, f"{first_cr_external_id}-{find.label}"
        )
        self.assertEqual(
            base_find.external_id,
            f"{base_find.context_record.external_id}-{base_find.label}",
        )

        # change of the base find label
        base_find.label = "New label"
        base_find.save()
        base_find = models.BaseFind.objects.get(pk=base_find.pk)
        self.assertEqual(
            base_find.external_id,
            f"{base_find.context_record.external_id}-New label",
        )

        # change of the context record label
        cr = ContextRecord.objects.get(pk=base_find.context_record.pk)
        cr.label = "new-label-too"
        cr.skip_history_when_saving = True
        cr._no_down_model_update = False
        cr.save()
        base_find = models.BaseFind.objects.get(pk=base_find.pk)
        find = models.Find.objects.get(pk=find.pk)
        cr = ContextRecord.objects.get(pk=cr.pk)
        self.assertIn("new-label-too", find.external_id)
        self.assertIn("new-label-too", base_find.external_id)

        # change of the operation code
        cr.operation.code_patriarche = "PAT"
        cr.operation.skip_history_when_saving = True
        cr.operation._no_down_model_update = False
        cr.operation.save()
        base_find = models.BaseFind.objects.get(pk=base_find.pk)
        find = models.Find.objects.get(pk=find.pk)
        # result unused: .get() raises if the context record is gone
        ContextRecord.objects.get(pk=cr.pk)
        self.assertIn("PAT", find.external_id)
        self.assertIn("PAT", base_find.external_id)

        # change of the find label
        find.label = "hop"
        find.save()
        find = models.Find.objects.get(pk=find.pk)
        # default: {get_first_base_find__context_record__external_id}-{label}
        self.assertEqual(find.external_id, "PAT-12345-A1-new-label-too-hop")

        # custom templates set on the profile, with the expected result
        profile = get_current_profile(force=True)
        templates = (
            # simple format
            (
                "{get_first_base_find__context_record__external_id}-{label}-"
                "{label}",
                "PAT-12345-A1-new-label-too-hop-hop",
            ),
            # simple format + custom filters
            (
                "{get_first_base_find__context_record__external_id}-{label}-"
                "{label}||lower||deduplicate",
                "pat-12345-a1-new-label-too-hop",
            ),
            # jinja 2
            (
                "{{get_first_base_find__context_record__external_id}}-"
                "{{label}}-{{label}}",
                "PAT-12345-A1-new-label-too-hop-hop",
            ),
            # jinja 2 + custom filters
            (
                "{{get_first_base_find__context_record__external_id}}-"
                "{{label}}-{{label}}||lower||deduplicate",
                "pat-12345-a1-new-label-too-hop",
            ),
        )
        for template, expected in templates:
            profile.find_external_id = template
            profile.save()
            find.save()
            find = models.Find.objects.get(pk=find.pk)
            self.assertEqual(find.external_id, expected)

    def testIndex(self):
        """
        Find indexes are counted by operation with the "O" setting and by
        context record with the "CR" setting.
        """
        profile = get_current_profile()
        profile.find_index = "O"
        profile.save()
        get_current_profile(force=True)

        op1 = self.create_operation()[-1]
        op2 = self.create_operation()[-1]
        op1_cr1 = self.create_context_record(
            data={"label": "CR1", "operation": op1}
        )[-1]
        op1_cr2 = self.create_context_record(
            data={"label": "CR2", "operation": op1}
        )[-1]
        op2_cr1 = self.create_context_record(
            data={"label": "CR3", "operation": op2}
        )[-1]

        self._assert_new_find_index(op1_cr1, 1)
        # index is based on operations
        self._assert_new_find_index(op1_cr2, 2)
        self._assert_new_find_index(op2_cr1, 1)

        profile = get_current_profile(force=True)
        profile.find_index = "CR"
        profile.save()
        get_current_profile(force=True)

        op3 = self.create_operation()[-1]
        op3_cr1 = self.create_context_record(
            data={"label": "CR1", "operation": op3}
        )[-1]
        op3_cr2 = self.create_context_record(
            data={"label": "CR2", "operation": op3}
        )[-1]

        self._assert_new_find_index(op3_cr1, 1)
        # index now based on context records
        self._assert_new_find_index(op3_cr2, 1)
        self._assert_new_find_index(op3_cr2, 2)

    def test_show(self):
        """The find sheet is displayed only to an authenticated user."""
        obj = self.finds[0]
        response = self.client.get(reverse("display-find", args=[obj.pk]))
        self.assertEqual(response.status_code, 200)
        self.assertIn(
            'load_window("/show-find/{}/");'.format(obj.pk),
            response.content.decode(),
        )

        c = Client()
        response = c.get(reverse("show-find", kwargs={"pk": obj.pk}))
        # redirected when not allowed
        self.assertRedirects(response, "/")

        c.login(username=self.username, password=self.password)
        # the freshly logged in client is the one to check here (the
        # request used to be sent with self.client, leaving `c` untested)
        response = c.get(reverse("show-find", kwargs={"pk": obj.pk}))
        self.assertEqual(response.status_code, 200)
        self.assertIn(b'class="card sheet"', response.content)

    def test_delete(self):
        """A shared base find is deleted only with its last find."""
        self.create_finds(force=True)
        first_bf = self.base_finds[0]
        self.finds[1].base_finds.add(first_bf)

        self.finds[0].delete()
        # on delete the selected base find is not deleted if another find
        # is related to it
        self.assertEqual(
            models.BaseFind.objects.filter(pk=self.base_finds[0].pk).count(), 1
        )
        self.finds[1].delete()
        self.assertEqual(
            models.BaseFind.objects.filter(pk=self.base_finds[0].pk).count(), 0
        )

    def test_get_material_types(self):
        """Material type labels and codes are returned sorted and joined."""
        mat0 = models.MaterialType.objects.all()[0]
        mat1 = models.MaterialType.objects.all()[1]
        self.create_finds()
        find0 = self.finds[0]
        # nothing attached: empty strings
        self.assertEqual(find0.get_material_types(), "")
        self.assertEqual(find0.get_material_types_code(), "")

        find0.material_types.add(mat0)
        find0.material_types.add(mat1)
        self.assertEqual(
            find0.get_material_types(),
            ", ".join(sorted([mat0.label, mat1.label])),
        )
        self.assertEqual(
            find0.get_material_types_code(),
            "|".join(sorted([mat0.code, mat1.code])),
        )
class FindSearchTest(FindInit, TestCase, SearchText):
    """
    Search tests on finds, both with explicit criteria sent to the
    "get-find" view and with text queries run through `_test_search`
    (not defined in this class, presumably provided by `SearchText`).
    """
    fixtures = WAREHOUSE_FIXTURES
    model = models.Find
    SEARCH_URL = "get-find"

    def setUp(self):
        """Create two finds and a superuser."""
        self.create_finds(force=True)
        self.create_finds(force=True)
        self.username = "myuser"
        self.password = "mypassword"
        self.user = User.objects.create_superuser(
            self.username, "myemail@test.com", self.password
        )
        self.client = Client()

    def _logged_client(self):
        """Return a new client authenticated as the test superuser."""
        client = Client()
        client.login(username=self.username, password=self.password)
        return client

    def _assert_anonymous_redirected(self, client, search):
        """Check that `client` is redirected to "/" by the search view."""
        response = client.get(reverse(self.SEARCH_URL), search)
        self.assertRedirects(response, "/")

    def _find_search(self, client, search):
        """
        Run `search` on the search view, check the HTTP 200 status and
        return the decoded JSON content.
        """
        response = client.get(reverse(self.SEARCH_URL), search)
        self.assertEqual(response.status_code, 200)
        return json.loads(response.content.decode())

    def test_item_count(self):
        """A find linked to two context records is listed twice."""
        find_1 = self.finds[0]
        ope2 = self.create_operation(self.get_default_user())[-1]
        context_record2 = self.create_context_record({"operation": ope2})[-1]
        base_find_2 = self.create_finds(
            data_base={"context_record": context_record2}
        )[1][-1]

        c = Client()
        # no result when no authentication
        self._assert_anonymous_redirected(c, {})
        c.login(username=self.username, password=self.password)

        # classic search
        res = self._find_search(c, {})
        self.assertEqual(res["recordsTotal"], len(self.finds))

        # add the base find of find_2 to find_1 - grouping or other like
        # treatment
        find_1.base_finds.add(base_find_2)

        # on find search there is column with context record that
        # duplicate the line
        res = self._find_search(c, {})
        self.assertEqual(res["recordsTotal"], len(self.finds) + 1)

    def test_material_type_hierarchic_search(self):
        """A material type search also matches children of the type."""
        find = self.finds[0]
        c = Client()
        metal = models.MaterialType.objects.get(txt_idx="metal")
        iron_metal = models.MaterialType.objects.get(txt_idx="iron_metal")
        not_iron_metal = models.MaterialType.objects.get(txt_idx="not_iron_metal")
        mineral_pk = models.MaterialType.objects.get(txt_idx="mineral").pk
        find.material_types.add(iron_metal)
        find = models.Find.objects.get(pk=find.pk)
        find.save()

        search = {"material_types": iron_metal.pk}
        # no result when no authentication
        self._assert_anonymous_redirected(c, search)
        c.login(username=self.username, password=self.password)

        # one result for exact search
        res = self._find_search(c, search)
        self.assertEqual(res["recordsTotal"], 1)
        self.assertEqual(res["rows"][0]["cached_materials"], str(iron_metal))

        # no result for the brother
        res = self._find_search(c, {"material_types": not_iron_metal.pk})
        self.assertEqual(res["recordsTotal"], 0)

        # one result for the father
        res = self._find_search(c, {"material_types": metal.pk})
        self.assertEqual(res["recordsTotal"], 1)

        find.material_types.add(mineral_pk)
        find = models.Find.objects.get(pk=find.pk)
        find.save()

        # test on text search
        material_key = str(pgettext_lazy("key for text search", "material"))
        result = [
            (f'{material_key}="{iron_metal}"', 1),
            (f'{material_key}="{not_iron_metal}"', 0),
            (f'{material_key}="{metal}"', 1),
            ('minéral', 1),
        ]
        self._test_search(c, result, context="Text material search")

    def test_pinned_search(self):
        """An empty search is restricted to the pinned operation."""
        c = self._logged_client()

        # operation with no associated find
        operation = create_operation(self.user, values={"year": 2017})
        c.get(reverse("pin", args=["operation", operation.pk]))
        # empty search -> check pined
        res = self._find_search(c, {})
        self.assertEqual(res["recordsTotal"], 0)

        # pinned operation with current find
        find = self.finds[0]
        operation_pk = find.get_first_base_find().context_record.operation.pk
        c.get(reverse("pin", args=["operation", operation_pk]))
        # empty search -> check pined
        res = self._find_search(c, {})
        self.assertEqual(res["recordsTotal"], 1)

    def test_period_hierarchic_search(self):
        """A period search also matches children of the period."""
        find = self.finds[0]
        c = Client()

        neo = Period.objects.get(txt_idx="neolithic")
        final_neo = Period.objects.get(txt_idx="final-neolithic")
        recent_neo = Period.objects.get(txt_idx="recent-neolithic")
        dating = Dating.objects.create(period=final_neo)
        find.datings.add(dating)
        find = models.Find.objects.get(pk=find.pk)
        find.save()

        search = {"datings__period": final_neo.pk}
        # no result when no authentication
        self._assert_anonymous_redirected(c, search)

        # one result for exact search
        c.login(username=self.username, password=self.password)
        res = self._find_search(c, search)
        self.assertEqual(res["recordsTotal"], 1)
        self.assertEqual(res["rows"][0]["cached_periods"], str(final_neo))

        # no result for the brother
        res = self._find_search(c, {"datings__period": recent_neo.pk})
        self.assertEqual(res["recordsTotal"], 0)

        # one result for the father
        res = self._find_search(c, {"datings__period": neo.pk})
        self.assertEqual(res["recordsTotal"], 1)

        # test on text search
        period_key = str(pgettext_lazy("key for text search", "datings-period"))
        result = [
            (f'{period_key}="{final_neo}"', 1),
            (f'{period_key}="{recent_neo}"', 0),
            (f'{period_key}="{neo}"', 1),
        ]
        self._test_search(c, result, context="Text period search")

    def test_operation_search(self):
        """Text search on the operation code, including exclusion."""
        values = [str(f.operation.code_patriarche) for f in self.finds]
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "code-patriarche"))
        result = [
            (f'{key}="{values[0]}"', 1),
            (f'-{key}="{values[0]}"', 1),
            (f'{key}="{values[0]}" {key}="{values[1]}"', 2),
            (f'-{key}="{values[0]}" -{key}="{values[1]}"', 0),
        ]
        self._test_search(c, result, context="Operation")

    def test_operator_search(self):
        """Text search on the operator of the operation."""
        operation = self.operations[0]
        operator = Organization.objects.create(
            name="My Orga", organization_type=OrganizationType.objects.all()[0]
        )
        operation.operator = operator
        operation.save()
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "operator"))
        result = [(f'{key}="My Orga"', 1)]
        self._test_search(c, result, context="Text operator search")

    def test_common_name_operation_search(self):
        """Text search on the common name of the operation."""
        operation = self.operations[0]
        operation.common_name = "Operation Common Name"
        operation.save()
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "operation-name"))
        result = [(f'{key}="Operation Common Name"', 1)]
        self._test_search(c, result, context="Text Operation Common Name Search")

    def test_description_search(self):
        """Text search on the description: exact, "*" and excluded "*"."""
        find = self.finds[0]
        find.description = "A simple description."
        find.save()
        find2 = self.finds[1]
        find2.description = ""
        find2.save()
        self.create_finds()
        self.create_finds()
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "description"))

        result = [(f'{key}="A simple description."', 1)]
        self._test_search(c, result, context="Text description Search")

        result = [(f'{key}="*"', 1)]
        self._test_search(c, result, context="* description Search")

        result = [(f'-{key}="*"', models.Find.objects.count() - 1)]
        self._test_search(c, result, context="-* description Search")

    def test_number_search(self):
        """Year search with exact, "=>" and "=<" notations."""
        ope1 = self.operations[0]
        ope1.year = 2000
        ope1.save()
        ope2 = self.operations[1]
        ope2.year = 2020
        ope2.save()
        self.create_finds()
        self.create_finds()
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "year"))

        searches = (
            (f'{key}="2000"', 3, "Exact year search"),
            (f'{key}="2020"', 1, "Exact year search"),
            (f'{key}=>"2019"', 1, "=> year search"),
            (f'{key}=>"1999"', 4, "=> year search"),
            (f'{key}=<"2001"', 3, "=< year search"),
            (f'{key}=>"2001" {key}=<"2022"', 1, "=> and =< year search"),
        )
        for query, expected, context in searches:
            self._test_search(c, [(query, expected)], context=context)

    def test_address_operation_search(self):
        """Text search on the address of the operation."""
        operation = self.operations[0]
        operation.address = "Street somewhere 29478 NOWHERE"
        operation.save()
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "operation-address"))
        result = [(f'{key}="Street somewhere 29478 NOWHERE"', 1)]
        self._test_search(c, result, context="Text Operation Address Search")

    def test_person_in_charge_search(self):
        """Text search on the person in charge of the operation."""
        operation = self.operations[0]
        operation.in_charge = Person.objects.create(
            name="HISNAME", surname="Michel"
        )
        operation.save()
        c = self._logged_client()
        key = str(pgettext_lazy("key for text search", "in-charge"))
        result = [(f'{key}="HISNAME Michel"', 1)]
        self._test_search(c, result, context="Text Person In Charge Search")

    def test_conservatory_state_hierarchic_search(self):
        """A conservatory state search also matches children states."""
        find = self.finds[0]
        c = Client()
        # cs1 is the parent of cs2 and cs3
        cs1 = models.ConservatoryState.objects.all()[0]
        cs1.parent = None
        cs1.save()
        cs2 = models.ConservatoryState.objects.all()[1]
        cs2.parent = cs1
        cs2.save()
        cs3 = models.ConservatoryState.objects.all()[2]
        cs3.parent = cs1
        cs3.save()
        find.conservatory_states.add(cs2)
        find.save()

        search = {"search_vector": f'conservatory="{cs2.name}"'}
        # no result when no authentication
        self._assert_anonymous_redirected(c, search)
        c.login(username=self.username, password=self.password)

        # one result for exact search
        res = self._find_search(c, search)
        self.assertEqual(res["recordsTotal"], 1)

        # no result for the brother
        res = self._find_search(
            c, {"search_vector": f'conservatory="{cs3.name}"'}
        )
        self.assertEqual(res["recordsTotal"], 0)

        # one result for the father
        res = self._find_search(
            c, {"search_vector": f'conservatory="{cs1.name}"'}
        )
        self.assertEqual(res["recordsTotal"], 1)

        # test on text search
        key = str(pgettext_lazy("key for text search", "conservatory"))
        result = [
            (f'{key}="{cs2}"', 1),
            (f'{key}="{cs3}"', 0),
            (f'{key}="{cs1}"', 1),
        ]
        # the context label used to be a leftover of the period test
        self._test_search(c, result, context="Text conservatory search")

    def test_image_search(self):
        """Search on finds with an image attached."""
        c = self._logged_client()
        search = {"documents__image__isnull": 2}  # 2 for nullboolfield is None
        res = self._find_search(c, search)
        self.assertEqual(res["recordsTotal"], 0)

        # add an image to the first find
        document = Document.objects.create(title="Image!")
        image_path = (
            settings.LIB_BASE_PATH + "ishtar_common/static/media/images/ishtar-bg.jpg"
        )
        with open(image_path, "rb") as content:
            document.image = SimpleUploadedFile(
                name="ishtar-bg.jpg",
                content=content.read(),
                content_type="image/jpeg",
            )
        document.save()
        self.finds[0].documents.add(document)
        self.finds[0].save()

        res = self._find_search(c, search)
        self.assertEqual(res["recordsTotal"], 1)

    def test_mixed_search(self):
        """Text search mixing operation, period and material criteria."""
        ope_values = [str(f.operation.code_patriarche) for f in self.finds]
        neo = Period.objects.get(txt_idx="neolithic")
        final_neo = Period.objects.get(txt_idx="final-neolithic")
        metal = models.MaterialType.objects.get(txt_idx="metal")
        iron_metal = models.MaterialType.objects.get(txt_idx="iron_metal")

        find = self.finds[0]
        find2 = self.finds[1]
        dating = Dating.objects.create(period=final_neo)
        find.datings.add(dating)
        find.material_types.add(iron_metal)
        find2.material_types.add(iron_metal)
        find = models.Find.objects.get(pk=find.pk)
        find.save()
        find2 = models.Find.objects.get(pk=find2.pk)
        find2.save()

        material_key = str(pgettext_lazy("key for text search", "material"))
        period_key = str(pgettext_lazy("key for text search", "datings-period"))
        ope_key = str(pgettext_lazy("key for text search", "code-patriarche"))

        result = [
            (f'{ope_key}="{ope_values[0]}" {period_key}="{neo}" '
             f'{material_key}={iron_metal}', 1),
            (f'-{ope_key}="{ope_values[0]}" {period_key}="{neo}"', 0),
            (f'-{ope_key}="{ope_values[0]}" -{period_key}="{neo}"', 1),
            (f'-{ope_key}="{ope_values[0]}" -{period_key}="{neo}" '
             f'-{material_key}={metal}', 0),
        ]
        c = self._logged_client()
        # the context label used to be a leftover of the operation test
        self._test_search(c, result, context="Mixed search")

    def test_search_with_callable(self):
        """
        Text search on the "loan" key, depending on the container and the
        reference container of the finds.
        """
        find = self.finds[0]
        find2 = self.finds[1]
        c = self._logged_client()

        loan_key = str(pgettext_lazy("key for text search", "loan"))
        yes_query = '{}="{}"'.format(loan_key, str(_("Yes")))
        no_query = '{}="{}"'.format(loan_key, str(_("No")))

        result = [(yes_query, 0), (no_query, 0)]
        self._test_search(c, result, context="No container defined")

        warehouse = Warehouse.objects.create(
            name="Lambda warehouse", warehouse_type=WarehouseType.objects.all()[0]
        )
        container = Container.objects.create(
            location=warehouse,
            responsible=warehouse,
            container_type=ContainerType.objects.all()[0],
        )
        find.container_ref = container
        find.container = container
        find.save()
        container2 = Container.objects.create(
            location=warehouse,
            responsible=warehouse,
            container_type=ContainerType.objects.all()[0],
        )
        find2.container_ref = container2
        find2.container = container2
        find2.save()

        result = [(yes_query, 0), (no_query, 2)]
        self._test_search(
            c, result, context="All container in their reference location"
        )

        # second find moved out of its reference container
        find2.container = container
        find2.save()

        result = [(yes_query, 1), (no_query, 1)]
        self._test_search(
            c, result, context="One container in his reference location"
        )
"selec-find_basket_modification": {"pk": ""}, "basket-find_basket_modification": { "label": "Super OK", "slug": "super-ok", "comment": "No", "shared_with": [], "shared_write_with": [], }, }, ) ] def pre_wizard(self): self.basket = models.FindBasket.objects.create( label="OK", user=IshtarUser.objects.get(pk=self.user.pk), ) self.basket_number = models.FindBasket.objects.count() user1 = User.objects.create_user( "user1", "nomail@nomail.com", "user1" ) self.ishtar_user1 = IshtarUser.objects.get(user_ptr=user1) user2 = User.objects.create_user( "user2", "nomail@nomail.com", "user2" ) self.ishtar_user2 = IshtarUser.objects.get(user_ptr=user2) data = self.form_datas[0].form_datas data["selec-find_basket_modification"]["pk"] = self.basket.pk data["basket-find_basket_modification"]["shared_with"] = [self.ishtar_user1.pk] data["basket-find_basket_modification"]["shared_write_with"] = [ self.ishtar_user2.pk, self.ishtar_user1.pk ] super().pre_wizard() def post_wizard(self): # no creation self.assertEqual(models.FindBasket.objects.count(), self.basket_number) basket = models.FindBasket.objects.get(pk=self.basket.pk) self.assertEqual( list(basket.shared_with.values_list("pk", flat=True)), [self.ishtar_user1.pk] ) self.assertEqual( list(basket.shared_write_with.values_list("pk", flat=True)), [self.ishtar_user1.pk, self.ishtar_user2.pk] ) class FindAutocompleteTest(FindInit, TestCase): fixtures = WAREHOUSE_FIXTURES model = models.Find def setUp(self): self.create_finds(force=True) self.create_finds(force=True) self.create_finds(force=True) self.create_finds(force=True) self.username = "myuser" self.password = "mypassword" User.objects.create_superuser(self.username, "myemail@test.com", self.password) self.client = Client() def test_autocomplete(self): find = self.finds[0] find.label = "test 012" find.save() find2 = self.finds[1] find2.label = "test 12" find2.save() find3 = self.finds[2] find3.label = "test 1" find3.save() find4 = self.finds[3] find4.label = "test 0120" 
class FindOldPermissionTest(FindInit, TestCase):
    """Check "own" find filtering with a hand-built profile type and group."""

    fixtures = FIND_FIXTURES
    model = models.Find

    def setUp(self):
        """Build the profile type, three users and one find per operation."""
        print("Theses tests should fail on v5")
        collaborator = ProfileType.objects.create(
            txt_idx="xxcollaborator",
            label="xxCollaborateur",
        )
        own_find_group = Group.objects.create(
            name="xxMobilier rattachées : voir et modification"
        )
        find_content_type = ContentType.objects.get(
            model="find", app_label="archaeological_finds"
        )
        for codename in ("view_own_find", "change_own_find"):
            permission = Permission.objects.get(
                content_type=find_content_type, codename=codename
            )
            own_find_group.permissions.add(permission)
        collaborator.groups.add(own_find_group)

        self.username, self.password, self.user = create_superuser()
        self.alt_username, self.alt_password, self.alt_user = create_user()
        UserProfile.objects.create(
            person=self.alt_user.ishtaruser.person,
            profile_type=collaborator,
            current=True,
        )

        # nosec: hard coded password for test purposes
        credentials = create_user(  # nosec
            username="luke", password="iamyourfather"
        )
        self.alt_username2, self.alt_password2, self.alt_user2 = credentials
        area_profile = UserProfile.objects.create(
            person=self.alt_user2.ishtaruser.person,
            profile_type=collaborator,
            current=True,
        )
        # the second alternative user only gets access through this area
        tatouine = Town.objects.create(name="Tatouine", numero_insee="66000")
        galaxy = Area.objects.create(label="Galaxie", txt_idx="galaxie")
        galaxy.towns.add(tatouine)
        area_profile.areas.add(galaxy)

        self.orgas = self.create_orgas(self.user)
        owners = (self.user, self.alt_user)
        for owner in owners:
            self.create_operation(owner, self.orgas[0])
        for position, owner in enumerate(owners):
            self.create_context_record(
                user=owner,
                data={
                    "label": "CR {}".format(position + 1),
                    "operation": self.operations[position],
                },
            )
        self.cr_1 = self.context_records[-2]
        self.cr_2 = self.context_records[-1]
        for context_record, owner in zip((self.cr_1, self.cr_2), owners):
            self.create_finds(
                data_base={"context_record": context_record},
                user=owner,
                force=True,
            )
        self.find_1 = self.finds[-2]
        self.find_2 = self.finds[-1]
        self.operations[-1].towns.add(tatouine)
        for account in (self.alt_user, self.alt_user2):
            account.ishtaruser.generate_permission()

    def _own_find_listing(self, username, password):
        """Log in with the given credentials and return the decoded listing."""
        client = Client()
        client.login(username=username, password=password)
        return client.get(reverse("get-find")).content.decode()

    def test_own_search(self):
        """Anonymous users get nothing; each restricted user sees one find."""
        # no result when not authenticated
        anonymous = Client().get(reverse("get-find"))
        self.assertFalse(anonymous.content and json.loads(anonymous.content))

        # possession: only one "own" context record available
        listing = self._own_find_listing(self.alt_username, self.alt_password)
        self.assertTrue(listing)
        self.assertEqual(json.loads(listing)["recordsTotal"], 1)

        # area filter: only one "own" operation available
        listing = self._own_find_listing(
            self.alt_username2, self.alt_password2
        )
        self.assertTrue(listing)
        self.assertTrue(json.loads(listing))
        self.assertEqual(json.loads(listing)["recordsTotal"], 1)
person=upstream_user.ishtaruser.person, current=True, ) self.users["upstream"] = (upstream_username, upstream_password, upstream_user) # nosec: hard coded password for test purposes areas_username, areas_password, areas_user = create_user( # nosec username="luke", password="iamyourfather" ) profile = UserProfile.objects.create( profile_type=self.profile_types["find_areas"], person=areas_user.ishtaruser.person, current=True, ) self.users["areas"] = ( areas_username, areas_password, areas_user ) town = Town.objects.create(name="Tatouine", numero_insee="66000") area = Area.objects.create(label="Galaxie", txt_idx="galaxie") area.towns.add(town) profile.areas.add(area) self.orgas = self.create_orgas(user) self.create_operation(user, self.orgas[0]) self.create_operation(areas_user, self.orgas[0]) self.create_context_record( user=user, data={"label": "CR 1", "operation": self.operations[0]} ) self.create_context_record( user=areas_user, data={"label": "CR 2", "operation": self.operations[1]} ) self.cr_1 = self.context_records[-2] self.cr_2 = self.context_records[-1] self.create_finds( data_base={"context_record": self.cr_1}, user=user, force=True ) self.create_finds( data_base={"context_record": self.cr_2}, user=areas_user, force=True ) self.find_1 = self.finds[-2] self.find_2 = self.finds[-1] self.operations[-1].towns.add(town) self.operations[-1].context_record.all()[0].ishtar_users.add( upstream_user.ishtaruser ) associated_username, associated_password, associated_user = create_user( username="as", password="as" ) UserProfile.objects.create( profile_type=self.profile_types["find_associated_items"], person=associated_user.ishtaruser.person, current=True, ) self.users["associated"] = ( associated_username, associated_password, associated_user ) # read permission self.basket = models.FindBasket.objects.create( label="My basket", user=IshtarUser.objects.get(pk=user.pk), ) self.basket.items.add(self.find_1) self.basket.shared_with.add(associated_user.ishtaruser) 
def test_own_search(self):
    """Check the find listing for anonymous and restricted users.

    The per-user checks are delegated to ``self._test_search`` with an
    expected count of 1 for each of the three restricted accounts.
    """
    url = reverse("get-find")

    # no result when not authenticated
    anonymous = Client().get(url)
    self.assertFalse(anonymous.content and json.loads(anonymous.content))

    scenarios = (
        # possession of associated operation:
        # only one "own" context record available
        ('possession', "upstream"),
        # area filter: only one "own" operation available
        ('areas filter', "areas"),
        # filter associated by basket
        ('associated basket filter', "associated"),
    )
    for context, user_key in scenarios:
        self._test_search(url, context, self.users[user_key], 1)
class FindQATest(FindInit, TestCase):
    """Tests of the find "quick action" views.

    Covers duplicate, bulk update, basket and packaging.
    """

    fixtures = WAREHOUSE_FIXTURES
    model = models.Find

    def setUp(self):
        """Create two finds and a second account holding ``change_find``."""
        self.create_finds(data_base={"label": "Find 1"}, force=True)
        self.create_finds(data_base={"label": "Find 2"}, force=True)
        self.alt_username, self.alt_password, self.alt_user = create_user()
        self.alt_user.user_permissions.add(
            Permission.objects.get(codename="change_find")
        )

    def get_default_user(self):
        """Create a superuser, store its credentials on self and return it."""
        self.username, self.password, self.user = create_superuser()
        return self.user

    def test_duplicate(self):
        """Duplicating keeps the description, not the documents/treatments."""
        t1, __ = models.Treatment.objects.get_or_create(
            label="Treatment 1",
            input_status=models.TreatmentInputStatus.objects.all()[0]
        )
        t2, __ = models.Treatment.objects.get_or_create(
            label="Treatment 2",
            input_status=models.TreatmentInputStatus.objects.all()[0]
        )
        find = self.finds[0]
        default_desc = "Description for duplicate"
        find.description = default_desc
        find.upstream_treatment = t1
        find.save()
        d = Document.objects.create()
        d.finds.add(find)
        find.treatments.add(t2)

        # anonymous access is redirected
        c = Client()
        url = reverse("find-qa-duplicate", args=[find.pk])
        response = c.get(url)
        self.assertRedirects(response, "/")

        c = Client()
        c.login(username=self.username, password=self.password)
        response = c.get(url)
        self.assertEqual(response.status_code, 200)

        c = Client()
        c.login(username=self.alt_username, password=self.alt_password)
        response = c.get(url)
        self.assertEqual(response.status_code, 200)

        data = {"denomination": "clone", "label": "clone - free id"}
        nb_find = models.Find.objects.count()
        nb_bf = models.BaseFind.objects.count()
        response = c.post(url, data)
        self.assertEqual(response.status_code, 302)  # success redirect
        self.assertEqual(models.Find.objects.count(), nb_find + 1)
        self.assertEqual(models.BaseFind.objects.count(), nb_bf + 1)
        new = models.Find.objects.order_by("-pk").all()[0]
        # the clone belongs to the user who posted, not the original creator
        self.assertEqual(find.history_creator, self.user)
        self.assertEqual(new.history_creator, self.alt_user)
        self.assertEqual(new.base_finds.all()[0].history_creator, self.alt_user)
        self.assertEqual(new.description, default_desc)
        new_bf = models.BaseFind.objects.order_by("-pk").all()[0]
        base_bf = find.get_first_base_find()
        self.assertEqual(new_bf.context_record, base_bf.context_record)
        self.assertNotIn(d, list(new.documents.all()))
        # associated treatments are removed #4850
        self.assertIsNone(new.upstream_treatment)
        self.assertIsNone(new.downstream_treatment)
        self.assertNotIn(t2, list(new.treatments.all()))

    def test_bulk_update(self):
        """Bulk update adds a period and appends to each description."""
        c = Client()
        pks = "{}-{}".format(self.finds[0].pk, self.finds[1].pk)
        response = c.get(reverse("find-qa-bulk-update", args=[pks]))
        self.assertRedirects(response, "/")

        c = Client()
        c.login(username=self.username, password=self.password)
        response = c.get(reverse("find-qa-bulk-update", args=[pks]))
        self.assertEqual(response.status_code, 200)

        c = Client()
        c.login(username=self.alt_username, password=self.alt_password)
        response = c.get(reverse("find-qa-bulk-update", args=[pks]))
        self.assertEqual(response.status_code, 200)

        find_0 = self.finds[0]
        find_1 = self.finds[1]
        base_desc_0 = "Base description 1"
        find_0.description = base_desc_0
        find_0.save()
        base_desc_1 = "Base description 2"
        find_1.description = base_desc_1
        find_1.save()

        period = Period.objects.all()[0].pk
        self.assertNotIn(
            period, [dating.period.pk for dating in find_0.datings.all()]
        )
        self.assertNotIn(
            period, [dating.period.pk for dating in find_1.datings.all()]
        )
        extra_desc = "Extra description"
        response = c.post(
            reverse("find-qa-bulk-update-confirm", args=[pks]),
            {"qa_period": period, "qa_description": extra_desc},
        )
        if response.status_code != 200:
            self.assertRedirects(response, "/success/")
        self.assertIn(
            period, [dating.period.pk for dating in find_0.datings.all()]
        )
        self.assertIn(
            period, [dating.period.pk for dating in find_1.datings.all()]
        )
        self.assertEqual(
            models.Find.objects.get(pk=find_0.pk).description,
            base_desc_0 + "\n" + extra_desc,
        )
        self.assertEqual(
            models.Find.objects.get(pk=find_1.pk).description,
            base_desc_1 + "\n" + extra_desc,
        )

    def test_basket(self):
        """Create then update a basket; updates follow basket permissions."""
        find_0 = self.finds[0]
        find_1 = self.finds[1]
        nb_basket = models.FindBasket.objects.count()
        url = reverse("find-qa-basket", args=[find_0.pk])
        c = Client()
        response = c.get(url)
        self.assertRedirects(response, "/")

        c.login(username=self.username, password=self.password)
        response = c.get(url)
        self.assertEqual(response.status_code, 200)

        label = "Test - basket"
        data = {"qa_bf_create_or_update": "create", "qa_bf_label": label}
        response = c.post(url, data)
        if response.status_code != 200:
            self.assertRedirects(response, "/success/")
        self.assertEqual(nb_basket + 1, models.FindBasket.objects.count())
        q = models.FindBasket.objects.filter(
            user=self.user.ishtaruser, label=label
        )
        self.assertEqual(q.count(), 1)
        # no duplicate with same user, same name
        c.post(url, data)
        self.assertEqual(q.count(), 1)
        basket = q.all()[0]
        self.assertEqual(basket.items.count(), 1)

        # update
        url = reverse("find-qa-basket", args=[find_1.pk])
        self.assertEqual(basket.items.count(), 1)
        data = {"qa_bf_create_or_update": "update", "qa_bf_basket": basket.pk}
        c.post(url, data)
        self.assertEqual(basket.items.count(), 2)
        self.assertIn(find_1, list(basket.items.all()))

        # no permission
        basket.user = self.alt_user.ishtaruser
        basket.items.clear()
        basket.save()
        self.assertEqual(basket.items.count(), 0)
        c.post(url, data)
        # no update
        self.assertEqual(basket.items.count(), 0)

        # write permission
        basket.shared_write_with.add(self.user.ishtaruser)
        c.post(url, data)
        self.assertEqual(basket.items.count(), 1)

    def test_packaging(self):
        """Post packaging data; check containers and created treatments.

        Each entry of ``data_check_lst`` is ``(posted data, expected
        containers, number of treatments expected to be created)``.
        """
        c = Client()
        find_0 = self.finds[0]
        find_1 = self.finds[1]
        pks = "{}-{}".format(find_0.pk, find_1.pk)
        url = reverse("find-qa-packaging", args=[pks])
        response = c.get(url)
        self.assertRedirects(response, "/")
        profile, created = IshtarSiteProfile.objects.get_or_create(
            slug="default", active=True
        )
        profile.warehouse = False
        profile.save()

        c = Client()
        c.login(username=self.username, password=self.password)
        response = c.get(url)
        # warehouse profile must be activated
        self.assertEqual(response.status_code, 404)

        profile.warehouse = True
        profile.save()
        response = c.get(url)
        self.assertEqual(response.status_code, 200)

        c = Client()
        c.login(username=self.alt_username, password=self.alt_password)
        response = c.get(url)
        self.assertEqual(response.status_code, 200)

        main_warehouse = Warehouse.objects.create(
            name="Main", warehouse_type=WarehouseType.objects.all()[0]
        )
        container = Container.objects.create(
            reference="Test",
            responsible=main_warehouse,
            location=main_warehouse,
            container_type=ContainerType.objects.all()[0],
        )
        container2 = Container.objects.create(
            reference="Test2",
            responsible=main_warehouse,
            location=main_warehouse,
            container_type=ContainerType.objects.all()[0],
        )
        container2_base = Container.objects.create(
            reference="Test2 base",
            responsible=main_warehouse,
            location=main_warehouse,
            container_type=ContainerType.objects.all()[0],
        )
        # three treatment types, one per location flag combination
        packaging = models.TreatmentType.objects.get(txt_idx="packaging")
        packaging.change_reference_location = True
        packaging.change_current_location = False
        packaging.save()
        packaging2, __ = models.TreatmentType.objects.get_or_create(
            txt_idx="packaging-2", defaults={"virtual": False}
        )
        packaging2.change_reference_location = True
        packaging2.change_current_location = True
        packaging2.save()
        packaging3, __ = models.TreatmentType.objects.get_or_create(
            txt_idx="packaging-3", defaults={"virtual": False}
        )
        packaging3.change_reference_location = False
        packaging3.change_current_location = True
        packaging3.save()

        data_check_lst = [
            (
                {
                    "qa-packaging-container": container.pk,
                    "qa-packaging-container_to_change": "reference",
                },
                {"container_ref": container, "container": None},
                0,
            ),
            (
                {
                    "qa-packaging-container": container2.pk,
                    "qa-packaging-container_to_change": "current",
                },
                {"container_ref": None, "container": container2},
                0,
            ),
            (
                {
                    "qa-packaging-container": container.pk,
                    "qa-packaging-container_to_change": "current-and-reference",
                },
                {"container_ref": container, "container": container},
                0,
            ),
            (
                {
                    "qa-packaging-container": container2.pk,
                    "qa-packaging-container_to_change": "reference",
                    "qa-packaging-create_treatment": True,
                    "qa-packaging-year": 2019,
                    "qa-packaging-treatment_type": packaging.pk,
                },
                {"container_ref": container2, "container": None},
                1,
            ),
            (
                {
                    "qa-packaging-container": container2.pk,
                    "qa-packaging-container_to_change": "current-and-reference",
                    "qa-packaging-create_treatment": True,
                    "qa-packaging-year": 2019,
                    "qa-packaging-treatment_type": packaging2.pk,
                },
                {"container_ref": container2, "container": container2},
                1,
            ),
            (
                {
                    "qa-packaging-container": container2.pk,
                    "qa-packaging-container_to_change": "current",
                    "qa-packaging-create_treatment": True,
                    "qa-packaging-year": 2019,
                    "qa-packaging-treatment_type": packaging3.pk,
                },
                {"container_ref": None, "container": container2},
                1,
            ),
        ]

        for data, check, nb_treat in data_check_lst:
            # reinit
            find_0.container, find_0.container_ref = None, None
            find_0.skip_history_when_saving = True
            find_0.save()
            find_1.container, find_1.container_ref = (
                container2_base, container2_base
            )
            find_1.skip_history_when_saving = True
            find_1.save()

            init_nb_treat = models.Treatment.objects.count()

            response = c.post(url, data)
            self.assertRedirects(response, "/success/")

            to_change = data["qa-packaging-container_to_change"]
            # nothing changes between the per-key checks: reload once
            updated_0 = models.Find.objects.get(pk=find_0.pk)
            updated_1 = models.Find.objects.get(pk=find_1.pk)
            for k, expected in check.items():
                value = value2 = expected
                if k == "container" and to_change == "reference":
                    # if current not set -> auto set container to container_ref
                    value = check["container_ref"]
                    # current set for find_1 -> keep it
                    value2 = container2_base
                if k == "container_ref" and to_change == "current":
                    value = None
                    # ref set for find_1 -> keep it
                    value2 = container2_base
                self.assertEqual(
                    getattr(updated_0, k), value,
                    f"find-qa-packaging: find_0 {k} not set for data {data}")
                self.assertEqual(
                    getattr(updated_1, k), value2,
                    f"find-qa-packaging: find_1 {k} not set for data {data}")

            final_nb_treat = models.Treatment.objects.count()
            self.assertEqual(init_nb_treat + nb_treat, final_nb_treat)

            # NOTE(review): this tests the global Treatment count, not this
            # case's ``nb_treat``; the branch only runs while no treatment
            # exists at all - confirm this is intended.
            if not final_nb_treat:
                self.assertEqual(
                    models.FindTreatment.objects.filter(find=find_0).count(), 0
                )
                continue

            treatment = models.Treatment.objects.order_by("-pk")[0]
            q = models.FindTreatment.objects.filter(treatment=treatment)
            self.assertEqual(q.filter(find=find_0).count(), 1)
            ft = q.all()[0]
            self.assertTrue(ft.full_location)
            self.assertTrue(ft.location_type)
def _init_m2m(self, find, user):
    """Save two forced history versions of ``find`` with differing M2M data.

    After the first save, ceramic and glass material types and the datings
    from ``_add_datings`` are attached (the dating dicts are stored on
    ``self.d1_dct`` / ``self.d2_dct``). After the second save, the datings
    are cleared and ceramic is removed.
    """
    ceramic_pk = models.MaterialType.objects.get(txt_idx="ceramic").pk
    glass_pk = models.MaterialType.objects.get(txt_idx="glass").pk
    find_pk = find.pk

    def save_forced_version(label):
        # reload from the database, then save with a forced history entry
        fresh = models.Find.objects.get(pk=find_pk)
        fresh.history_modifier = user
        fresh.label = label
        fresh._force_history = True
        if hasattr(fresh, "skip_history_when_saving"):
            del fresh.skip_history_when_saving
        fresh.save()
        return fresh

    first_version = save_forced_version("hop hop hop1")
    for material_pk in (ceramic_pk, glass_pk):
        first_version.material_types.add(material_pk)
    self.d1_dct, self.d2_dct = self._add_datings(first_version)

    second_version = save_forced_version("hop hop hop2")
    second_version.datings.clear()
    second_version.material_types.remove(ceramic_pk)
class TreatmentTest(FindInit, TestCase):
    """Check find creation and deletion around a "packaging" treatment."""

    fixtures = FIND_FIXTURES
    model = models.Find

    def setUp(self):
        """Create two finds (first one with an image) in two baskets."""
        base_img = (
            settings.LIB_BASE_PATH
            + "ishtar_common/static/media/images/ishtar-bg.jpg"
        )
        temp_dir = tempfile.gettempdir()
        img = os.path.join(temp_dir, "ishtar-bg.jpg")
        shutil.copy2(base_img, img)

        self.create_finds(data_base={"label": "Find 1"}, force=True)
        self.create_finds(data_base={"label": "Find 2"}, force=True)
        image = Document.objects.create(title="Image!")
        with open(img, "rb") as cimg:
            image.image.save("ishtar-bg.jpg", File(cimg))
        self.finds[0].documents.add(image)
        self.finds[0].save()

        self.basket = models.FindBasket.objects.create(
            label="My basket",
            user=IshtarUser.objects.get(pk=self.get_default_user().pk),
        )
        self.other_basket = models.FindBasket.objects.create(
            label="My other basket",
            user=IshtarUser.objects.get(pk=self.get_default_user().pk),
        )
        for find in self.finds:
            self.basket.items.add(find)
            self.other_basket.items.add(find)

    def _apply_packaging(self, create_new_find):
        """Save an executed packaging treatment on ``self.basket``.

        :param create_new_find: value set on the packaging treatment type's
            ``create_new_find`` flag before the treatment is saved
        :return: the saved treatment
        """
        treatment_type = models.TreatmentType.objects.get(txt_idx="packaging")
        treatment_type.create_new_find = create_new_find
        treatment_type.save()

        completed, __ = models.TreatmentInputStatus.objects.get_or_create(
            txt_idx="validated",
            defaults={"executed": True, "label": "Validated"}
        )
        # the status may pre-exist with another value: force it
        completed.executed = True
        completed.save()

        treatment = models.Treatment()
        treatment.input_status = completed
        treatment.save(
            user=self.get_default_user(),
            items=self.basket,
            treatment_type_list=[treatment_type],
        )
        treatment.treatment_types.add(treatment_type)
        return treatment

    def _assert_treatment_removed(self, treatment_pk, initial_find):
        """Check the treatment is gone and the initial find is detached."""
        self.assertEqual(
            models.Treatment.objects.filter(pk=treatment_pk).count(), 0
        )
        q = models.Find.objects.filter(pk=initial_find.pk)
        # initial find not deleted
        self.assertEqual(q.count(), 1)
        self.assertIsNone(q.all()[0].upstream_treatment)

    def test_packaging_with_new_find_creation(self):
        """Packaging with ``create_new_find`` makes a new find per item."""
        items_nb = models.Find.objects.count()
        first_find = self.finds[0]

        # make packaging a treatment with a new version of the find created
        self._apply_packaging(create_new_find=True)

        self.assertEqual(
            items_nb + self.basket.items.count(),
            models.Find.objects.count(),
            msg="Packaging doesn't generate enough new finds",
        )

        resulting_find = models.Find.objects.get(
            upstream_treatment__upstream=first_find,
            base_finds__pk=first_find.base_finds.all()[0].pk,
        )

        # image names used to be altered on save: check for this bug
        self.assertEqual(
            resulting_find.documents.all()[0].title,
            models.Find.objects.get(pk=first_find.pk).documents.all()[0].title,
        )

        # new version of the find is in the basket
        for item in self.basket.items.all():
            self.assertNotIn(
                item,
                self.finds,
                msg="Original basket have not been upgraded after packaging",
            )
        for item in self.other_basket.items.all():
            self.assertNotIn(
                item,
                self.finds,
                msg="Other basket have not been upgraded after packaging",
            )

    def test_simple_delete(self):
        """Deleting a treatment that created no find keeps the finds."""
        nb_find = models.Find.objects.count()
        initial_find = self.finds[0]

        treatment = self._apply_packaging(create_new_find=False)
        self.assertEqual(nb_find, models.Find.objects.count())

        # Model.delete() resets the instance pk to None: keep it beforehand,
        # otherwise the "treatment is gone" check filters on pk=None
        treatment_pk = treatment.pk
        treatment.delete()
        self._assert_treatment_removed(treatment_pk, initial_find)

    def test_upstream_find_delete(self):
        """Deleting a resulting find removes the treatment that made it."""
        nb_find = models.Find.objects.count()
        initial_find = self.finds[0]

        # make packaging a treatment with a new version of the find created
        treatment = self._apply_packaging(create_new_find=True)

        nb_b = self.basket.items.count()
        self.assertEqual(nb_find + nb_b, models.Find.objects.count())

        resulting_find = models.Find.objects.get(
            upstream_treatment__upstream=initial_find,
            base_finds__pk=initial_find.base_finds.all()[0].pk,
        )
        treatment_pk = treatment.pk
        resulting_find.delete()
        self._assert_treatment_removed(treatment_pk, initial_find)

    def test_treatment_delete(self):
        """Deleting the treatment removes the finds it created."""
        nb_find = models.Find.objects.count()
        initial_find = self.finds[0]

        treatment = self._apply_packaging(create_new_find=True)

        nb_b = self.basket.items.count()
        self.assertEqual(nb_find + nb_b, models.Find.objects.count())

        # keep the pk: delete() resets it to None on the instance
        treatment_pk = treatment.pk
        treatment.delete()
        self.assertEqual(nb_find, models.Find.objects.count())
        self._assert_treatment_removed(treatment_pk, initial_find)
common_name="Test", operation_type=OperationType.objects.all()[0] ) cr, __ = ContextRecord.objects.get_or_create(operation=ope, label=base_name) base_find = models.BaseFind.objects.create(context_record=cr) find = models.Find.objects.create( label=base_name, ) find.base_finds.add(base_find) return find, None class PublicAPITest(FindInit, APITestCase): fixtures = FIND_FIXTURES model = models.Find def setUp(self): self.create_finds(data_base={"label": "Find 1"}, force=True) self.basket = models.FindBasket.objects.create( slug="my-basket", label="My basket", user=IshtarUser.objects.get(pk=self.get_default_user().pk), ) for find in self.finds: self.basket.items.add(find) self.username = "myuser" self.password = "mypassword" user = User.objects.create_user( self.username, "myemail@test.com", self.password ) self.auth_token = "Token " + Token.objects.create(user=user).key def test_authentication(self): url = reverse("api-public-find") + "?basket=my-basket" response = self.client.get(url, format="json") self.assertEqual(response.status_code, 401) response = self.client.get( url, format="json", HTTP_AUTHORIZATION=self.auth_token ) self.assertEqual(response.status_code, 200) def test_basket(self): # unknown basket url = reverse("api-public-find") + "?basket=non-exist-basket" response = self.client.get( url, format="json", HTTP_AUTHORIZATION=self.auth_token ) self.assertEqual(response.status_code, 200) content = response.content.decode() res = json.loads(content) self.assertEqual(res, []) # basket is not public url = reverse("api-public-find") + "?basket=my-basket" response = self.client.get( url, format="json", HTTP_AUTHORIZATION=self.auth_token ) self.assertEqual(response.status_code, 200) content = response.content.decode() res = json.loads(content) self.assertEqual(res, []) # ok self.basket.public = True self.basket.save() response = self.client.get( url, format="json", HTTP_AUTHORIZATION=self.auth_token ) self.assertEqual(response.status_code, 200) content = 
response.content.decode() res = json.loads(content) self.assertEqual(len(res), 1) def test_basket_content(self): find = self.finds[0] find.denomination = "denomination" ceram = models.MaterialType.objects.get(txt_idx="ceramic") find.material_types.add(ceram) find.save() url = reverse("api-public-find") + "?basket=my-basket" self.basket.public = True self.basket.save() response = self.client.get( url, format="json", HTTP_AUTHORIZATION=self.auth_token ) self.assertEqual(response.status_code, 200) content = response.content.decode() res = json.loads(content) expected_results = [ ( ( 0, "denomination", ), "denomination", ), ((0, "materials", 0), "Céramique"), ((0, "base-finds", 0, "context-record", "label"), "Context record"), ((0, "base-finds", 0, "context-record", "town"), "default_town (12345)"), ((0, "base-finds", 0, "context-record", "operation", "year"), 2010), ] for path, result in expected_results: path = list(reversed(path)) value = deepcopy(res) while path: key = path.pop() if isinstance(key, int): self.assertTrue(len(value) > key) else: self.assertIn(key, value) value = value[key] self.assertEqual(value, result) class LabelTest(FindInit, TestCase): fixtures = FIND_FIXTURES model = models.Find def setUp(self): templates = [ settings.LIB_BASE_PATH + "archaeological_finds/tests/" + t for t in ( "etiquettes-mobilier.odt", "etiquettes-mobilier", "etiquettes-mobilier.txt", "bad_lo.zip", "truncated_xml.zip", ) ] self.templates = [] for template in templates: filename = template.split("/")[-1] shutil.copy( template, os.path.join(settings.MEDIA_ROOT, filename), follow_symlinks=True, ) self.templates.append(os.path.join(settings.MEDIA_ROOT, filename)) def tearDown(self): for tpl in self.templates: if os.path.exists(tpl): os.remove(tpl) def test_label(self): base_targets = ";".join("Cadre{}".format(idx) for idx in range(1, 25)) base_tpl, missing_ext, text_file, bad_lo, trunc_xml = self.templates dataset = ( (base_tpl, base_targets, True, "OK"), (base_tpl, "", False, 
"no target"), (base_tpl, "-;Cadre2;Cadre3", False, "bad first target"), ( base_tpl, "Cadre1;Frame2;Frame3", True, "first target OK, silently failed other targets", ), (missing_ext, base_targets, True, "missing extension"), (text_file, base_targets, False, "text file"), (bad_lo, base_targets, False, "missing content.xml"), (trunc_xml, base_targets, False, "truncated content.xml"), ) for tpl_file, targets, is_ok, msg in dataset: with open(tpl_file, "rb") as tpl: template = SimpleUploadedFile("etiquettes-mobilier.odt", tpl.read()) model, __ = ImporterModel.objects.get_or_create( klass="archaeological_finds.models.Find" ) q = DocumentTemplate.objects.filter(slug="test") if q.count(): q.all()[0].delete() doc = DocumentTemplate.objects.create( name="Test", slug="test", associated_model=model, available=True, label_targets=targets, label_template=template, ) self.templates.append(doc.label_template.path) doc = DocumentTemplate.objects.get(pk=doc.pk) msg = "Fail on dataset: " + msg if is_ok: self.assertTrue(doc.template.name, msg=msg) self.templates.append(doc.template.path) else: self.assertFalse(doc.template.name, msg=msg) class TemplateTest(FindInit, TestCase): fixtures = FIND_FIXTURES model = models.Find def setUp(self): template = ( settings.LIB_BASE_PATH + "archaeological_finds/tests/notices-panier.odt" ) filename = template.split("/")[-1] shutil.copy( template, os.path.join(settings.MEDIA_ROOT, filename), follow_symlinks=True ) self.template = os.path.join(settings.MEDIA_ROOT, filename) self.templates = [self.template] ope1 = self.create_operation()[0] ope2 = self.create_operation()[1] cr = self.create_context_record(data={"label": "CR 1", "operation": ope1})[0] cr2 = self.create_context_record(data={"label": "CR 2", "operation": ope2})[1] self.create_finds(data_base={"context_record": cr})[0] self.create_finds(data_base={"context_record": cr2})[1] self.basket = models.FindBasket.objects.create(label="Hophop") self.basket.items.add(self.finds[0]) 
self.basket.items.add(self.finds[1]) wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT") location = Warehouse.objects.create(name="Warehouse", warehouse_type=wt) ct, __ = ContainerType.objects.get_or_create(label="CT", txt_idx="CT") self.container = Container.objects.create( location=location, reference="Reférence", container_type=ct ) def test_label(self): with open(self.template, "rb") as tpl: template = SimpleUploadedFile("template.odt", tpl.read()) model, __ = ImporterModel.objects.get_or_create( klass="archaeological_finds.models.Find" ) q = DocumentTemplate.objects.filter(slug="test") if q.count(): q.all()[0].delete() doc = DocumentTemplate.objects.create( name="Test", slug="test", associated_model=model, available=True, template=template, ) self.templates.append(doc.template.path) doc = DocumentTemplate.objects.get(pk=doc.pk) result = doc.publish(self.basket) if result: self.templates.append(result) def tearDown(self): for tpl in self.templates: if os.path.exists(tpl): os.remove(tpl)