diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2023-09-20 12:48:57 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-02-05 10:51:52 +0100 |
commit | 20cb7d66a947f7525fb988fe94a243f9fa4d2819 (patch) | |
tree | 48104e70d1968045028568597bfd52726c3ef5f5 | |
parent | b293908ec15c87ed4f015f6c538678f8f3d67c92 (diff) | |
download | Ishtar-20cb7d66a947f7525fb988fe94a243f9fa4d2819.tar.bz2 Ishtar-20cb7d66a947f7525fb988fe94a243f9fa4d2819.zip |
🧪 Import group archive: failing test
-rw-r--r-- | archaeological_finds/tests.py | 28 | ||||
-rw-r--r-- | archaeological_operations/tests.py | 3 | ||||
-rw-r--r-- | ishtar_common/models_imports.py | 2 | ||||
-rw-r--r-- | ishtar_common/tests.py | 181 |
4 files changed, 161 insertions, 53 deletions
diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py index b0a0af7dc..312d1385f 100644 --- a/archaeological_finds/tests.py +++ b/archaeological_finds/tests.py @@ -696,11 +696,7 @@ class ImportFindTest(ImportTest, FindInit, TestCase): self.assertEqual(base_find.main_geodata_id, new[0].pk) def test_group_import(self): - root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") - importer_filename = os.path.join(root, "importer-group.zip") - self.restore_serialized(importer_filename) - imp_group = ImporterGroup.objects.get(slug="chantier-des-depots") - imp_file = open(os.path.join(root, "importer-group.csv"), "rb") + imp_group, imp_file, imp_media = self.get_group_import() file_dict = { "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()) } @@ -716,33 +712,13 @@ class ImportFindTest(ImportTest, FindInit, TestCase): ) self.assertFalse(form.is_valid()) self.assertIn(str(_("This importer need a document archive.")), form.errors["__all__"]) - imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb") file_dict["imported_images"] = SimpleUploadedFile(imp_media.name, imp_media.read()) form = forms_common.NewImportGroupForm( data=post_dict, files=file_dict, user=self.user ) self.assertTrue(form.is_valid()) impt = form.save(self.ishtar_user) - impt.initialize() - - Operation.objects.get_or_create( - code_patriarche="123456", - operation_type=OperationType.objects.all()[0] - ) - wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT") - w, __ = Warehouse.objects.get_or_create( - external_id="warh", - defaults={"name": "Warehouse test", "warehouse_type": wt}, - ) - div1, __ = ContainerType.objects.get_or_create(label="Div1", txt_idx="DIV1") - div2, __ = ContainerType.objects.get_or_create(label="Div2", txt_idx="DIV2") - WarehouseDivisionLink.objects.get_or_create( - warehouse=w, container_type=div1, order=10 - ) - WarehouseDivisionLink.objects.get_or_create( - warehouse=w, container_type=div2, order=20 - ) - ContainerType.objects.get_or_create(label="CT", txt_idx="CT") + self.init_group_import(impt) nb_base_find = models.BaseFind.objects.count() nb_find = models.Find.objects.count() diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index 423eae635..baffffa26 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -95,6 +95,7 @@ from archaeological_files.models import File, FileType from ishtar_common import forms_common from ishtar_common.tests import ( + BaseImportTest, WizardTest, WizardTestFormData as FormData, create_superuser, @@ -158,7 +159,7 @@ class FileInit(object): self.item.save() -class ImportTest: +class ImportTest(BaseImportTest): def setUp(self): self.username, self.password, self.user = create_superuser() self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk) diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 85b859a5d..9e505392d 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -2036,6 +2036,8 @@ class Import(BaseImport): def initialize(self, user=None, session_key=None): self.state = "AP" self.end_date = datetime.datetime.now() + if user and not self.user: + self.user = user self.save() try: self.get_importer_instance().initialize( diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 8726dc559..952002e89 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -62,7 +62,7 @@ from django.test.runner import DiscoverRunner from django.utils.translation import ugettext_lazy as _ from django.urls import reverse -from ishtar_common import models, models_common +from ishtar_common import models, models_common, forms_common from ishtar_common import views from ishtar_common.apps import admin_site from ishtar_common.serializers import ( @@ -2467,7 +2467,8 @@ class ShortMenuTest(TestCase): self.assertEqual(response.status_code, 200) -class ImportTest(TestCase): +class BaseImportTest(TestCase): + def create_import(self): create_user() imp_model = models.ImporterModel.objects.create( @@ -2479,7 +2480,7 @@ class ImportTest(TestCase): LIB_BASE_PATH + "archaeological_operations/tests/MCC-operations-example.csv", dest, - ) + ) with open(dest, "rb") as f: mcc_operation_file = DjangoFile(f) imprt = models.Import.objects.create( @@ -2489,6 +2490,72 @@ class ImportTest(TestCase): ) return imprt + def init_group_import(self, impt): + impt.initialize() + Town = apps.get_model("ishtar_common", "Town") + Town.objects.get_or_create(numero_insee="59350", name="Nordweiler") + Operation = apps.get_model("archaeological_operations", "Operation") + OperationType = apps.get_model("ishtar_common", "OperationType") + ope_type = OperationType.objects.create( + label="Diag", + ) + Operation.objects.get_or_create( + code_patriarche="123456", + operation_type=ope_type + ) + Warehouse = apps.get_model("archaeological_warehouse", "Warehouse") + WarehouseType = apps.get_model("archaeological_warehouse", "WarehouseType") + ContainerType = apps.get_model("archaeological_warehouse", "ContainerType") + WarehouseDivisionLink = apps.get_model("archaeological_warehouse", "WarehouseDivisionLink") + wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT") + w, __ = Warehouse.objects.get_or_create( + external_id="warh", + defaults={"name": "Warehouse test", "warehouse_type": wt}, + ) + div1, __ = ContainerType.objects.get_or_create(label="Div1", txt_idx="DIV1") + div2, __ = ContainerType.objects.get_or_create(label="Div2", txt_idx="DIV2") + WarehouseDivisionLink.objects.get_or_create( + warehouse=w, container_type=div1, order=10 + ) + WarehouseDivisionLink.objects.get_or_create( + warehouse=w, container_type=div2, order=20 + ) + ContainerType.objects.get_or_create(label="CT", txt_idx="CT") + + def get_group_import(self): + root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") + importer_filename = os.path.join(root, "importer-group.zip") + restore_serialized(importer_filename) + imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots") + imp_file = open(os.path.join(root, "importer-group.csv"), "rb") + imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb") + return imp_group, imp_file, imp_media + + def create_group_import(self, init=True): + imp_group, imp_file, imp_media = self.get_group_import() + create_user() + ishtar_user = models.IshtarUser.objects.all()[0] + file_dict = { + "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()), + "imported_images": SimpleUploadedFile(imp_media.name, imp_media.read()) + } + post_dict = { + "importer_type": imp_group.pk, + "name": "find_group_import", + "encoding": "utf-8", + "skip_lines": 1, + "csv_sep": ",", + } + form = forms_common.NewImportGroupForm( + data=post_dict, files=file_dict, user=ishtar_user + ) + self.assertTrue(form.is_valid()) + impt = form.save(ishtar_user) + if init: + impt.initialize(user=ishtar_user) + self.init_group_import(impt) + return impt + def create_importer_model(self): return models.ImporterModel.objects.create( klass="ishtar_common.models.Parcel", name="Parcel" @@ -2497,6 +2564,9 @@ class ImportTest(TestCase): def create_importer_type(self, imp_model): return models.ImporterType.objects.create(associated_models=imp_model) + +class ImportTest(BaseImportTest): + def test_edit_import(self): username, password, user = create_superuser() imprt = self.create_import() @@ -2613,16 +2683,75 @@ class ImportTest(TestCase): profile.delete_image_zip_on_archive = False profile.save() imprt.archive() - imprt = models.Import.objects.get(pk=imprt.pk) - self.assertEqual(imprt.state, "AC") - self.assertFalse(imprt.error_file) - self.assertFalse(imprt.result_file) - self.assertFalse(imprt.match_file) - self.assertTrue(imprt.imported_images) - self.assertTrue(imprt.archive_file) - self.assertTrue(zipfile.is_zipfile(imprt.archive_file)) + group_import = models.ImportGroup.objects.get(pk=imprt.pk) + self.assertEqual(group_import.state, "AC") + for imprt in group_import.imports.all(): + self.assertEqual(imprt.state, "AC") + self.assertFalse(imprt.error_file) + self.assertFalse(imprt.result_file) + self.assertFalse(imprt.match_file) + self.assertTrue(imprt.imported_images) + self.assertTrue(imprt.archive_file) + self.assertTrue(zipfile.is_zipfile(imprt.archive_file)) + with tempfile.TemporaryDirectory() as tmpdir: + current_zip = zipfile.ZipFile(imprt.archive_file.path, "r") + name_list = current_zip.namelist() + self.assertIn("content.json", name_list) + current_zip.extract("content.json", tmpdir) + content_name = os.path.join(tmpdir, "content.json") + with open(content_name, "r") as content: + files = json.loads(content.read()) + self.assertIn("imported_file", files.keys()) + self.assertIn(files["imported_file"], name_list) + self.assertIn("error_file", files.keys()) + self.assertIn(files["error_file"], name_list) + self.assertIn("result_file", files.keys()) + self.assertIn(files["result_file"], name_list) + self.assertIn("match_file", files.keys()) + self.assertIn(files["match_file"], name_list) + rev_dict = {v: k for k, v in files.items()} + for name in name_list: + current_zip.extract(name, tmpdir) + if name.endswith(".txt"): + with open(os.path.join(tmpdir, name), "r") as f: + self.assertEqual(f.read(), "test" + rev_dict[name]) + elif name.endswith(".csv"): # imported file + with open(os.path.join(tmpdir, name), "r") as f: + self.assertEqual(f.read(), csv_content) + + group_import.unarchive("FE") + group_import = models.ImportGroup.objects.get(pk=imprt.pk) + for imprt in group_import.imports.all(): + self.assertEqual(imprt.state, "FE") + for k in ("error_file", "result_file", "match_file", "imported_images"): + field = getattr(imprt, k) + self.assertTrue(field, "{} is missing in unarchive".format(k)) + with open(field.path, "r") as f: + self.assertEqual(f.read(), "test" + k) + field = getattr(imprt, "imported_file") + self.assertTrue(field, "{} is missing in unarchive".format(k)) + with open(field.path, "r") as f: + self.assertEqual(f.read(), csv_content) + + def test_archive_group_import(self): + group_import = self.create_group_import() + group_import.importation() + profile = models.get_current_profile() + profile.delete_image_zip_on_archive = False + profile.save() + + csv_content = "...." + group_import.archive() + group_import = models.Import.objects.get(pk=group_import.pk) + self.assertEqual(group_import.state, "AC") + self.assertFalse(group_import.error_file) + self.assertFalse(group_import.result_file) + self.assertFalse(group_import.match_file) + self.assertTrue(group_import.imported_images) + self.assertTrue(group_import.archive_file) + self.assertTrue(zipfile.is_zipfile(group_import.archive_file)) with tempfile.TemporaryDirectory() as tmpdir: - current_zip = zipfile.ZipFile(imprt.archive_file.path, "r") + current_zip = zipfile.ZipFile(group_import.archive_file.path, "r") name_list = current_zip.namelist() self.assertIn("content.json", name_list) current_zip.extract("content.json", tmpdir) @@ -2647,15 +2776,15 @@ class ImportTest(TestCase): with open(os.path.join(tmpdir, name), "r") as f: self.assertEqual(f.read(), csv_content) - imprt.unarchive("FE") - imprt = models.Import.objects.get(pk=imprt.pk) - self.assertEqual(imprt.state, "FE") + group_import.unarchive("FE") + group_import = models.Import.objects.get(pk=group_import.pk) + self.assertEqual(group_import.state, "FE") for k in ("error_file", "result_file", "match_file", "imported_images"): - field = getattr(imprt, k) + field = getattr(group_import, k) self.assertTrue(field, "{} is missing in unarchive".format(k)) with open(field.path, "r") as f: self.assertEqual(f.read(), "test" + k) - field = getattr(imprt, "imported_file") + field = getattr(group_import, "imported_file") self.assertTrue(field, "{} is missing in unarchive".format(k)) with open(field.path, "r") as f: self.assertEqual(f.read(), csv_content) @@ -2663,17 +2792,17 @@ class ImportTest(TestCase): profile = models.get_current_profile() profile.delete_image_zip_on_archive = True profile.save() - imprt = models.Import.objects.get(pk=imprt.pk) - image_filename = imprt.imported_images.path + group_import = models.Import.objects.get(pk=group_import.pk) + image_filename = group_import.imported_images.path self.assertTrue(os.path.isfile(image_filename)) - imprt.archive() - imprt = models.Import.objects.get(pk=imprt.pk) - self.assertFalse(imprt.imported_images) + group_import.archive() + group_import = models.Import.objects.get(pk=group_import.pk) + self.assertFalse(group_import.imported_images) self.assertFalse(os.path.isfile(image_filename)) - imprt.unarchive("F") - imprt = models.Import.objects.get(pk=imprt.pk) - self.assertEqual(imprt.state, "FE") # as an error file so state fixed - self.assertFalse(imprt.imported_images) + group_import.unarchive("F") + group_import = models.Import.objects.get(pk=group_import.pk) + self.assertEqual(group_import.state, "FE") # as an error file so state fixed + self.assertFalse(group_import.imported_images) def test_delete_related(self): town = models.Town.objects.create(name="my-test") |