summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2023-09-20 12:48:57 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2024-04-16 16:38:32 +0200
commit72babdcfefa3899d70b7da3a978fddf90ad9ee9d (patch)
treee65cd260dd35ae481f30fc2a3b69dd65ced5082e
parent5e4b5ff9ff801aacbfc173873036437b2ab54fa7 (diff)
downloadIshtar-72babdcfefa3899d70b7da3a978fddf90ad9ee9d.tar.bz2
Ishtar-72babdcfefa3899d70b7da3a978fddf90ad9ee9d.zip
🧪 Import group archive: failing test
-rw-r--r--archaeological_finds/tests.py28
-rw-r--r--archaeological_operations/tests.py3
-rw-r--r--ishtar_common/models_imports.py2
-rw-r--r--ishtar_common/tests.py181
4 files changed, 161 insertions, 53 deletions
diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py
index b0a0af7dc..312d1385f 100644
--- a/archaeological_finds/tests.py
+++ b/archaeological_finds/tests.py
@@ -696,11 +696,7 @@ class ImportFindTest(ImportTest, FindInit, TestCase):
self.assertEqual(base_find.main_geodata_id, new[0].pk)
def test_group_import(self):
- root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests")
- importer_filename = os.path.join(root, "importer-group.zip")
- self.restore_serialized(importer_filename)
- imp_group = ImporterGroup.objects.get(slug="chantier-des-depots")
- imp_file = open(os.path.join(root, "importer-group.csv"), "rb")
+ imp_group, imp_file, imp_media = self.get_group_import()
file_dict = {
"imported_file": SimpleUploadedFile(imp_file.name, imp_file.read())
}
@@ -716,33 +712,13 @@ class ImportFindTest(ImportTest, FindInit, TestCase):
)
self.assertFalse(form.is_valid())
self.assertIn(str(_("This importer need a document archive.")), form.errors["__all__"])
- imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb")
file_dict["imported_images"] = SimpleUploadedFile(imp_media.name, imp_media.read())
form = forms_common.NewImportGroupForm(
data=post_dict, files=file_dict, user=self.user
)
self.assertTrue(form.is_valid())
impt = form.save(self.ishtar_user)
- impt.initialize()
-
- Operation.objects.get_or_create(
- code_patriarche="123456",
- operation_type=OperationType.objects.all()[0]
- )
- wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT")
- w, __ = Warehouse.objects.get_or_create(
- external_id="warh",
- defaults={"name": "Warehouse test", "warehouse_type": wt},
- )
- div1, __ = ContainerType.objects.get_or_create(label="Div1", txt_idx="DIV1")
- div2, __ = ContainerType.objects.get_or_create(label="Div2", txt_idx="DIV2")
- WarehouseDivisionLink.objects.get_or_create(
- warehouse=w, container_type=div1, order=10
- )
- WarehouseDivisionLink.objects.get_or_create(
- warehouse=w, container_type=div2, order=20
- )
- ContainerType.objects.get_or_create(label="CT", txt_idx="CT")
+ self.init_group_import(impt)
nb_base_find = models.BaseFind.objects.count()
nb_find = models.Find.objects.count()
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py
index 423eae635..baffffa26 100644
--- a/archaeological_operations/tests.py
+++ b/archaeological_operations/tests.py
@@ -95,6 +95,7 @@ from archaeological_files.models import File, FileType
from ishtar_common import forms_common
from ishtar_common.tests import (
+ BaseImportTest,
WizardTest,
WizardTestFormData as FormData,
create_superuser,
@@ -158,7 +159,7 @@ class FileInit(object):
self.item.save()
-class ImportTest:
+class ImportTest(BaseImportTest):
def setUp(self):
self.username, self.password, self.user = create_superuser()
self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk)
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
index 85b859a5d..9e505392d 100644
--- a/ishtar_common/models_imports.py
+++ b/ishtar_common/models_imports.py
@@ -2036,6 +2036,8 @@ class Import(BaseImport):
def initialize(self, user=None, session_key=None):
self.state = "AP"
self.end_date = datetime.datetime.now()
+ if user and not self.user:
+ self.user = user
self.save()
try:
self.get_importer_instance().initialize(
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 8726dc559..952002e89 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -62,7 +62,7 @@ from django.test.runner import DiscoverRunner
from django.utils.translation import ugettext_lazy as _
from django.urls import reverse
-from ishtar_common import models, models_common
+from ishtar_common import models, models_common, forms_common
from ishtar_common import views
from ishtar_common.apps import admin_site
from ishtar_common.serializers import (
@@ -2467,7 +2467,8 @@ class ShortMenuTest(TestCase):
self.assertEqual(response.status_code, 200)
-class ImportTest(TestCase):
+class BaseImportTest(TestCase):
+
def create_import(self):
create_user()
imp_model = models.ImporterModel.objects.create(
@@ -2479,7 +2480,7 @@ class ImportTest(TestCase):
LIB_BASE_PATH
+ "archaeological_operations/tests/MCC-operations-example.csv",
dest,
- )
+ )
with open(dest, "rb") as f:
mcc_operation_file = DjangoFile(f)
imprt = models.Import.objects.create(
@@ -2489,6 +2490,72 @@ class ImportTest(TestCase):
)
return imprt
+ def init_group_import(self, impt):
+ impt.initialize()
+ Town = apps.get_model("ishtar_common", "Town")
+ Town.objects.get_or_create(numero_insee="59350", name="Nordweiler")
+ Operation = apps.get_model("archaeological_operations", "Operation")
+ OperationType = apps.get_model("ishtar_common", "OperationType")
+ ope_type = OperationType.objects.create(
+ label="Diag",
+ )
+ Operation.objects.get_or_create(
+ code_patriarche="123456",
+ operation_type=ope_type
+ )
+ Warehouse = apps.get_model("archaeological_warehouse", "Warehouse")
+ WarehouseType = apps.get_model("archaeological_warehouse", "WarehouseType")
+ ContainerType = apps.get_model("archaeological_warehouse", "ContainerType")
+ WarehouseDivisionLink = apps.get_model("archaeological_warehouse", "WarehouseDivisionLink")
+ wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT")
+ w, __ = Warehouse.objects.get_or_create(
+ external_id="warh",
+ defaults={"name": "Warehouse test", "warehouse_type": wt},
+ )
+ div1, __ = ContainerType.objects.get_or_create(label="Div1", txt_idx="DIV1")
+ div2, __ = ContainerType.objects.get_or_create(label="Div2", txt_idx="DIV2")
+ WarehouseDivisionLink.objects.get_or_create(
+ warehouse=w, container_type=div1, order=10
+ )
+ WarehouseDivisionLink.objects.get_or_create(
+ warehouse=w, container_type=div2, order=20
+ )
+ ContainerType.objects.get_or_create(label="CT", txt_idx="CT")
+
+ def get_group_import(self):
+ root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests")
+ importer_filename = os.path.join(root, "importer-group.zip")
+ restore_serialized(importer_filename)
+ imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots")
+ imp_file = open(os.path.join(root, "importer-group.csv"), "rb")
+ imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb")
+ return imp_group, imp_file, imp_media
+
+ def create_group_import(self, init=True):
+ imp_group, imp_file, imp_media = self.get_group_import()
+ create_user()
+ ishtar_user = models.IshtarUser.objects.all()[0]
+ file_dict = {
+ "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()),
+ "imported_images": SimpleUploadedFile(imp_media.name, imp_media.read())
+ }
+ post_dict = {
+ "importer_type": imp_group.pk,
+ "name": "find_group_import",
+ "encoding": "utf-8",
+ "skip_lines": 1,
+ "csv_sep": ",",
+ }
+ form = forms_common.NewImportGroupForm(
+ data=post_dict, files=file_dict, user=ishtar_user
+ )
+ self.assertTrue(form.is_valid())
+ impt = form.save(ishtar_user)
+ if init:
+ impt.initialize(user=ishtar_user)
+ self.init_group_import(impt)
+ return impt
+
def create_importer_model(self):
return models.ImporterModel.objects.create(
klass="ishtar_common.models.Parcel", name="Parcel"
@@ -2497,6 +2564,9 @@ class ImportTest(TestCase):
def create_importer_type(self, imp_model):
return models.ImporterType.objects.create(associated_models=imp_model)
+
+class ImportTest(BaseImportTest):
+
def test_edit_import(self):
username, password, user = create_superuser()
imprt = self.create_import()
@@ -2613,16 +2683,75 @@ class ImportTest(TestCase):
profile.delete_image_zip_on_archive = False
profile.save()
imprt.archive()
- imprt = models.Import.objects.get(pk=imprt.pk)
- self.assertEqual(imprt.state, "AC")
- self.assertFalse(imprt.error_file)
- self.assertFalse(imprt.result_file)
- self.assertFalse(imprt.match_file)
- self.assertTrue(imprt.imported_images)
- self.assertTrue(imprt.archive_file)
- self.assertTrue(zipfile.is_zipfile(imprt.archive_file))
+ group_import = models.ImportGroup.objects.get(pk=imprt.pk)
+ self.assertEqual(group_import.state, "AC")
+ for imprt in group_import.imports.all():
+ self.assertEqual(imprt.state, "AC")
+ self.assertFalse(imprt.error_file)
+ self.assertFalse(imprt.result_file)
+ self.assertFalse(imprt.match_file)
+ self.assertTrue(imprt.imported_images)
+ self.assertTrue(imprt.archive_file)
+ self.assertTrue(zipfile.is_zipfile(imprt.archive_file))
+ with tempfile.TemporaryDirectory() as tmpdir:
+ current_zip = zipfile.ZipFile(imprt.archive_file.path, "r")
+ name_list = current_zip.namelist()
+ self.assertIn("content.json", name_list)
+ current_zip.extract("content.json", tmpdir)
+ content_name = os.path.join(tmpdir, "content.json")
+ with open(content_name, "r") as content:
+ files = json.loads(content.read())
+ self.assertIn("imported_file", files.keys())
+ self.assertIn(files["imported_file"], name_list)
+ self.assertIn("error_file", files.keys())
+ self.assertIn(files["error_file"], name_list)
+ self.assertIn("result_file", files.keys())
+ self.assertIn(files["result_file"], name_list)
+ self.assertIn("match_file", files.keys())
+ self.assertIn(files["match_file"], name_list)
+ rev_dict = {v: k for k, v in files.items()}
+ for name in name_list:
+ current_zip.extract(name, tmpdir)
+ if name.endswith(".txt"):
+ with open(os.path.join(tmpdir, name), "r") as f:
+ self.assertEqual(f.read(), "test" + rev_dict[name])
+ elif name.endswith(".csv"): # imported file
+ with open(os.path.join(tmpdir, name), "r") as f:
+ self.assertEqual(f.read(), csv_content)
+
+ group_import.unarchive("FE")
+ group_import = models.ImportGroup.objects.get(pk=imprt.pk)
+ for imprt in group_import.imports.all():
+ self.assertEqual(imprt.state, "FE")
+ for k in ("error_file", "result_file", "match_file", "imported_images"):
+ field = getattr(imprt, k)
+ self.assertTrue(field, "{} is missing in unarchive".format(k))
+ with open(field.path, "r") as f:
+ self.assertEqual(f.read(), "test" + k)
+ field = getattr(imprt, "imported_file")
+ self.assertTrue(field, "{} is missing in unarchive".format(k))
+ with open(field.path, "r") as f:
+ self.assertEqual(f.read(), csv_content)
+
+ def test_archive_group_import(self):
+ group_import = self.create_group_import()
+ group_import.importation()
+ profile = models.get_current_profile()
+ profile.delete_image_zip_on_archive = False
+ profile.save()
+
+ csv_content = "...."
+ group_import.archive()
+ group_import = models.Import.objects.get(pk=group_import.pk)
+ self.assertEqual(group_import.state, "AC")
+ self.assertFalse(group_import.error_file)
+ self.assertFalse(group_import.result_file)
+ self.assertFalse(group_import.match_file)
+ self.assertTrue(group_import.imported_images)
+ self.assertTrue(group_import.archive_file)
+ self.assertTrue(zipfile.is_zipfile(group_import.archive_file))
with tempfile.TemporaryDirectory() as tmpdir:
- current_zip = zipfile.ZipFile(imprt.archive_file.path, "r")
+ current_zip = zipfile.ZipFile(group_import.archive_file.path, "r")
name_list = current_zip.namelist()
self.assertIn("content.json", name_list)
current_zip.extract("content.json", tmpdir)
@@ -2647,15 +2776,15 @@ class ImportTest(TestCase):
with open(os.path.join(tmpdir, name), "r") as f:
self.assertEqual(f.read(), csv_content)
- imprt.unarchive("FE")
- imprt = models.Import.objects.get(pk=imprt.pk)
- self.assertEqual(imprt.state, "FE")
+ group_import.unarchive("FE")
+ group_import = models.Import.objects.get(pk=group_import.pk)
+ self.assertEqual(group_import.state, "FE")
for k in ("error_file", "result_file", "match_file", "imported_images"):
- field = getattr(imprt, k)
+ field = getattr(group_import, k)
self.assertTrue(field, "{} is missing in unarchive".format(k))
with open(field.path, "r") as f:
self.assertEqual(f.read(), "test" + k)
- field = getattr(imprt, "imported_file")
+ field = getattr(group_import, "imported_file")
self.assertTrue(field, "{} is missing in unarchive".format(k))
with open(field.path, "r") as f:
self.assertEqual(f.read(), csv_content)
@@ -2663,17 +2792,17 @@ class ImportTest(TestCase):
profile = models.get_current_profile()
profile.delete_image_zip_on_archive = True
profile.save()
- imprt = models.Import.objects.get(pk=imprt.pk)
- image_filename = imprt.imported_images.path
+ group_import = models.Import.objects.get(pk=group_import.pk)
+ image_filename = group_import.imported_images.path
self.assertTrue(os.path.isfile(image_filename))
- imprt.archive()
- imprt = models.Import.objects.get(pk=imprt.pk)
- self.assertFalse(imprt.imported_images)
+ group_import.archive()
+ group_import = models.Import.objects.get(pk=group_import.pk)
+ self.assertFalse(group_import.imported_images)
self.assertFalse(os.path.isfile(image_filename))
- imprt.unarchive("F")
- imprt = models.Import.objects.get(pk=imprt.pk)
- self.assertEqual(imprt.state, "FE") # as an error file so state fixed
- self.assertFalse(imprt.imported_images)
+ group_import.unarchive("F")
+ group_import = models.Import.objects.get(pk=group_import.pk)
+ self.assertEqual(group_import.state, "FE") # as an error file so state fixed
+ self.assertFalse(group_import.imported_images)
def test_delete_related(self):
town = models.Town.objects.create(name="my-test")