diff options
Diffstat (limited to 'archaeological_finds/tests.py')
-rw-r--r-- | archaeological_finds/tests.py | 214 |
1 files changed, 157 insertions, 57 deletions
diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py index 8d3f3607d..46f11e3e6 100644 --- a/archaeological_finds/tests.py +++ b/archaeological_finds/tests.py @@ -24,6 +24,8 @@ import os import shutil import tempfile from zipfile import ZipFile +import subprocess +from osgeo import ogr, osr from rest_framework.test import APITestCase from rest_framework.authtoken.models import Token @@ -1017,62 +1019,60 @@ class ImportFindTest(BaseImportFindTest): :function: Test if all the files of the QField zipped folder are correct """ # Definition of the path to test importer data for GIS data - root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" # Ne pas ch - filename = os.path.join(root, "qfield-prospection.zip") + root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" # Opening of the .zip - zip_file = ZipFile(filename, 'r') - # Verification of the number of files in the .zip - self.assertEqual(len(zip_file.namelist()),2) - # Verification of the names of the files in the .zip - list_files=["Qfield_prospection.qgs","Qfield_prospection_attachments.zip"] - self.assertEqual(zip_file.namelist(), list_files) - # Closing of the .zip - zip_file.close() + with ZipFile(os.path.join(root, "qfield-prospection.zip"), 'r') as zip_file: + # Verification of the number of files in the .zip + self.assertEqual(len(zip_file.namelist()),2) + # Verification of the names of the files in the .zip + list_files = ["Qfield_prospection.qgs","Qfield_prospection_attachments.zip"] + self.assertEqual(zip_file.namelist(), list_files) + # Closing of the .zip + zip_file.close() def test_add_file_qfield_zip(self): """ :function: Try the addition of a file in the zip for QField that will be dowloaded """ # Definition of the path to test importer data for GIS data - root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" # Ne pas ch + root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" filename = os.path.join(root, "qfield-prospection.zip") # Opening of the .zip - zip_file = ZipFile(filename, 'a') - # Verification of the number of files in the .zip before adding a new one - self.assertEqual(len(zip_file.namelist()), 2) - # Recovery of the .csv to add for the test - csv=os.path.join(root, "Finds.csv") - # Adding the .csv to the .zip - zip_file.write(csv, os.path.basename(csv)) - # Verification of the number of files in the .zip after adding the .csv - self.assertEqual(len(zip_file.namelist()), 3) - # Verification of the names of the files in the .zip - list_files = ["Qfield_prospection.qgs", "Qfield_prospection_attachments.zip","Finds.csv"] - self.assertEqual(zip_file.namelist(), list_files) - # Cloning and deletion of the .zip to have 2 files once again - zip_temp=filename+".temp" - with ZipFile(filename, 'r') as zip_orig: + with ZipFile(os.path.join(root, "qfield-prospection.zip"), 'a') as zip_file: + # Verification of the number of files in the .zip before adding a new one + self.assertEqual(len(zip_file.namelist()), 2) + # Recovery of the .csv to add for the test + data = os.path.join(root, "Finds.csv") + # Adding the .csv to the .zip + zip_file.write(data, os.path.basename(data)) + # Verification of the number of files in the .zip after adding the .csv + self.assertEqual(len(zip_file.namelist()), 3) + # Verification of the names of the files in the .zip + list_files = ["Qfield_prospection.qgs", "Qfield_prospection_attachments.zip","Finds.csv"] + self.assertEqual(zip_file.namelist(), list_files) + # Cloning and deletion of the .zip to have 2 files once again + zip_temp = filename+".temp" with ZipFile(zip_temp, 'w') as zip_new: - for item in zip_orig.infolist(): - if item.filename!= "Finds.csv" : - zip_new.writestr(item,zip_orig.read(item.filename)) - # Closing of the old .zip - zip_file.close() + for item in zip_file.infolist(): + if item.filename != "Finds.csv" : + zip_new.writestr(item, zip_file.read(item.filename)) + # Closing of the old .zip + zip_file.close() # Squashing the old .zip with the new one os.replace(zip_temp,filename) # Opening of the new .zip - zip_file = ZipFile(filename, 'r') - # Verification of the number of files in the .zip after deleting the .csv - self.assertEqual(len(zip_file.namelist()), 2) - # Closing of the new .zip - zip_file.close() + with ZipFile(os.path.join(root, "qfield-prospection.zip"), 'r') as zip_file: + # Verification of the number of files in the .zip after deleting the .csv + self.assertEqual(len(zip_file.namelist()), 2) + # Closing of the new .zip + zip_file.close() def test_qfield_import_finds(self): """ - :function: Try the importation of finds link to QField + :function: Try the importation of finds linked to QField """ # Definition of the path to test importer data for GIS data - root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" # Ne pas ch + root = settings.LIB_BASE_PATH + "archaeological_finds/tests/" filename = os.path.join(root, "qfield-mobilier-test.zip") self.restore_serialized(filename) # Uses of a class in ishtar_commons.model_import to retrieve a model via its slug (?) @@ -1116,24 +1116,36 @@ class ImportFindTest(BaseImportFindTest): self.assertEqual(models.Find.objects.count(), nb_find + 1) self.assertEqual(Document.objects.count(), nb_docs + 1) # Verification of the imported values + new = models.BaseFind.objects.order_by("-pk").all()[:1] + for data in new: + self.assertEqual(data.label, "123") + self.assertEqual(str(data.discovery_date), "2025-04-07") + self.assertEqual(data.point_2d, "SRID=4326;POINT (-2.26868001391598 47.3849390721505)") + new = models.Find.objects.order_by("-pk").all()[:1] + for data in new: + self.assertEqual(data.label, "123") + self.assertEqual(str(data.material_types), "archaeological_finds.MaterialType.None") + self.assertEqual(data.description, "Test") + new = ContextRecord.objects.order_by("-pk").all()[:1] + for cr in new: + self.assertEqual(cr.label, "CR") new = GeoVectorData.objects.order_by("-pk").all()[:1] for geo in new: self.assertTrue(geo.x) + self.assertEqual(geo.x, 14) self.assertTrue(geo.y) + self.assertEqual(geo.y, 3) self.assertTrue(geo.z) - self.assertEqual(new[0].x, 14) - self.assertEqual(new[0].y, 3) - self.assertEqual(new[0].z, 2000) + self.assertEqual(geo.z, 2000) def test_qfield_import_group(self): """ :function: Try the importation of datas from a QField prodject (context record, finds and documents) - CURRENTLY BUGGED """ # Definition of the path to test importer data for GIS data root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") self.root = root - importer_filename = os.path.join(root, "qfield-importeur-test.zip") + importer_filename = os.path.join(root, "qfield-csv-test.zip") restore_serialized(importer_filename) # Uses of a class in ishtar_commons.model_import to retrieve a model via its slug (?) imp_group = ImporterGroup.objects.get(slug="qfield-csv-test") @@ -1152,14 +1164,8 @@ class ImportFindTest(BaseImportFindTest): "skip_lines": 1, "csv_sep": ",", } - # Initialization of error values - form = forms_common.NewImportGroupForm( - data=post_dict, files=file_dict, user=self.user - ) - self.assertFalse(form.is_valid()) - self.assertIn(str(_("This importer need a document archive.")), - form.errors["__all__"]) file_dict["imported_images"] = imp_media + # Initialization of error values form = forms_common.NewImportGroupForm( data=post_dict, files=file_dict, user=self.user ) @@ -1172,25 +1178,119 @@ class ImportFindTest(BaseImportFindTest): ope, __ = Operation.objects.get_or_create( code_patriarche="OP", operation_type=OperationType.objects.all()[0]) - cr, __ = ContextRecord.objects.get_or_create( - operation=ope, - label="CR" - ) - # Getting referential values (nb objects, containers,docs, etc.) nb_base_find = models.BaseFind.objects.count() nb_find = models.Find.objects.count() nb_docs = Document.objects.count() - # Beggining of importation impt.importation() - # Getting values after modifications self.assertEqual(models.BaseFind.objects.count(), nb_base_find + 1) self.assertEqual(models.Find.objects.count(), nb_find + 1) self.assertEqual(Document.objects.count(), nb_docs + 1) self.assertFalse(any(imp.error_file for imp in impt.imports.all()), msg="Error on group import") + def test_csv_to_gpkg(self): + """ + :function: Creation of a .gpkg file from the data of an imported .csv + """ + # Step 1 : Importation of data + # Definition of the path to test importer data for GIS data + root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") + self.root = root + filename = os.path.join(root, "qfield-mobilier-test.zip") + self.restore_serialized(filename) + # Uses of a class in ishtar_commons.model_import to retrieve a model via its slug (?) + imp_type = ImporterType.objects.get( + slug="qfield-mobilier-test") # Change the name with the slug of the importeur !!! + # Opening of the CSV + with open(os.path.join(root, "qfield-importeur-data.csv"), "rb") as imp_file: + file_dict = { + "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()) + } + post_dict = { + "importer_type": imp_type.pk, + "name": "find_geo_import", + "encoding": "utf-8", + "skip_lines": 1, + "csv_sep": ",", + } + # Preparation of the data import + form = forms_common.NewImportGISForm( + data=post_dict, files=file_dict, user=self.user + ) + self.assertTrue(form.is_valid()) + impt = form.save(self.ishtar_user) + # Import initialization + impt.initialize() + # Creation of an operation and a context record for the importation + ope, __ = Operation.objects.get_or_create( + code_patriarche="GOA", + operation_type=OperationType.objects.all()[0]) + cr, __ = ContextRecord.objects.get_or_create( + operation=ope, + label="CR" + ) + # Getting referential values (nb objects, containers,docs, etc.) + nb_base_find = models.BaseFind.objects.count() + nb_find = models.Find.objects.count() + nb_docs = Document.objects.count() + # Beggining of importation + impt.importation() + # Step 2 : Convertion to .gpkg + gpkg = os.path.join(root, "Finds.gpkg") + layer_name = "Finds" + # Deletion of the .gpkg if already existing + if os.path.exists(gpkg): + os.remove(gpkg) + # Getting necessary information from OsGeo to create the .gpkg + driver = ogr.GetDriverByName("GPKG") + datasource = driver.CreateDataSource(gpkg) + srs = osr.SpatialReference() + srs.ImportFromEPSG(4326) + # Layer creation + layer = datasource.CreateLayer("Finds", srs, ogr.wkbPoint) + # Attributes creation + layer.CreateField(ogr.FieldDefn("identifiant", ogr.OFTString)) + layer.CreateField(ogr.FieldDefn("operation", ogr.OFTString)) + layer.CreateField(ogr.FieldDefn("date", ogr.OFTDate)) + layer.CreateField(ogr.FieldDefn("x", ogr.OFTReal)) + layer.CreateField(ogr.FieldDefn("y", ogr.OFTReal)) + layer.CreateField(ogr.FieldDefn("z", ogr.OFTReal)) + layer.CreateField(ogr.FieldDefn("materiau(x)", ogr.OFTString)) + layer.CreateField(ogr.FieldDefn("description", ogr.OFTString)) + layer.CreateField(ogr.FieldDefn("wkt", ogr.OFTString)) + # Importation of the data + feature = ogr.Feature(layer.GetLayerDefn()) + new = models.BaseFind.objects.order_by("-pk").all()[:1] + for find in new : + feature.SetField("identifiant", find.label) + feature.SetField("date", + int(find.discovery_date.year), + int(find.discovery_date.month), + int(find.discovery_date.day), + 0, 0, 0, 0) + feature.SetField("wkt", str(find.point_2d)) + new = models.Find.objects.order_by("-pk").all()[:1] + for find in new: + feature.SetField("materiau(x)", str(find.material_types)) + feature.SetField("description", str(find.description)) + new = ContextRecord.objects.order_by("-pk").all()[:1] + for cr in new: + feature.SetField("operation", cr.label) + new = GeoVectorData.objects.order_by("-pk").all()[:1] + for geo in new: + feature.SetField("x", geo.x) + feature.SetField("y", geo.y) + feature.SetField("z", geo.z) + # Geometry creation + point = ogr.Geometry(ogr.wkbPoint) + point.AddPoint(geo.x, geo.y) + feature.SetGeometry(point) + layer.CreateFeature(feature) + feature = None + datasource = None + class ExportTest(FindInit, TestCase): fixtures = FIND_TOWNS_FIXTURES |