diff options
| -rw-r--r-- | archaeological_finds/tests.py | 28 | ||||
| -rw-r--r-- | archaeological_operations/tests.py | 3 | ||||
| -rw-r--r-- | ishtar_common/models_imports.py | 2 | ||||
| -rw-r--r-- | ishtar_common/tests.py | 181 | 
4 files changed, 161 insertions, 53 deletions
| diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py index b0a0af7dc..312d1385f 100644 --- a/archaeological_finds/tests.py +++ b/archaeological_finds/tests.py @@ -696,11 +696,7 @@ class ImportFindTest(ImportTest, FindInit, TestCase):          self.assertEqual(base_find.main_geodata_id, new[0].pk)      def test_group_import(self): -        root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") -        importer_filename = os.path.join(root, "importer-group.zip") -        self.restore_serialized(importer_filename) -        imp_group = ImporterGroup.objects.get(slug="chantier-des-depots") -        imp_file = open(os.path.join(root, "importer-group.csv"), "rb") +        imp_group, imp_file, imp_media = self.get_group_import()          file_dict = {              "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read())          } @@ -716,33 +712,13 @@ class ImportFindTest(ImportTest, FindInit, TestCase):          )          self.assertFalse(form.is_valid())          self.assertIn(str(_("This importer need a document archive.")), form.errors["__all__"]) -        imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb")          file_dict["imported_images"] = SimpleUploadedFile(imp_media.name, imp_media.read())          form = forms_common.NewImportGroupForm(              data=post_dict, files=file_dict, user=self.user          )          self.assertTrue(form.is_valid())          impt = form.save(self.ishtar_user) -        impt.initialize() - -        Operation.objects.get_or_create( -            code_patriarche="123456", -            operation_type=OperationType.objects.all()[0] -        ) -        wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT") -        w, __ = Warehouse.objects.get_or_create( -            external_id="warh", -            defaults={"name": "Warehouse test", "warehouse_type": wt}, -        ) -        div1, __ = ContainerType.objects.get_or_create(label="Div1", txt_idx="DIV1") -        div2, __ = ContainerType.objects.get_or_create(label="Div2", txt_idx="DIV2") -        WarehouseDivisionLink.objects.get_or_create( -            warehouse=w, container_type=div1, order=10 -        ) -        WarehouseDivisionLink.objects.get_or_create( -            warehouse=w, container_type=div2, order=20 -        ) -        ContainerType.objects.get_or_create(label="CT", txt_idx="CT") +        self.init_group_import(impt)          nb_base_find = models.BaseFind.objects.count()          nb_find = models.Find.objects.count() diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index 423eae635..baffffa26 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -95,6 +95,7 @@ from archaeological_files.models import File, FileType  from ishtar_common import forms_common  from ishtar_common.tests import ( +    BaseImportTest,      WizardTest,      WizardTestFormData as FormData,      create_superuser, @@ -158,7 +159,7 @@ class FileInit(object):          self.item.save() -class ImportTest: +class ImportTest(BaseImportTest):      def setUp(self):          self.username, self.password, self.user = create_superuser()          self.ishtar_user = IshtarUser.objects.get(pk=self.user.pk) diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 85b859a5d..9e505392d 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -2036,6 +2036,8 @@ class Import(BaseImport):      def initialize(self, user=None, session_key=None):          self.state = "AP"          self.end_date = datetime.datetime.now() +        if user and not self.user: +            self.user = user          self.save()          try:              self.get_importer_instance().initialize( diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 8726dc559..952002e89 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -62,7 +62,7 @@ from django.test.runner import DiscoverRunner  from django.utils.translation import ugettext_lazy as _  from django.urls import reverse -from ishtar_common import models, models_common +from ishtar_common import models, models_common, forms_common  from ishtar_common import views  from ishtar_common.apps import admin_site  from ishtar_common.serializers import ( @@ -2467,7 +2467,8 @@ class ShortMenuTest(TestCase):          self.assertEqual(response.status_code, 200) -class ImportTest(TestCase): +class BaseImportTest(TestCase): +      def create_import(self):          create_user()          imp_model = models.ImporterModel.objects.create( @@ -2479,7 +2480,7 @@ class ImportTest(TestCase):              LIB_BASE_PATH              + "archaeological_operations/tests/MCC-operations-example.csv",              dest, -        ) +            )          with open(dest, "rb") as f:              mcc_operation_file = DjangoFile(f)              imprt = models.Import.objects.create( @@ -2489,6 +2490,72 @@ class ImportTest(TestCase):              )          return imprt +    def init_group_import(self, impt): +        impt.initialize() +        Town = apps.get_model("ishtar_common", "Town") +        Town.objects.get_or_create(numero_insee="59350", name="Nordweiler") +        Operation = apps.get_model("archaeological_operations", "Operation") +        OperationType = apps.get_model("ishtar_common", "OperationType") +        ope_type = OperationType.objects.create( +            label="Diag", +        ) +        Operation.objects.get_or_create( +            code_patriarche="123456", +            operation_type=ope_type +        ) +        Warehouse = apps.get_model("archaeological_warehouse", "Warehouse") +        WarehouseType = apps.get_model("archaeological_warehouse", "WarehouseType") +        ContainerType = apps.get_model("archaeological_warehouse", "ContainerType") +        WarehouseDivisionLink = apps.get_model("archaeological_warehouse", "WarehouseDivisionLink") +        wt, __ = WarehouseType.objects.get_or_create(label="WT", txt_idx="WT") +        w, __ = Warehouse.objects.get_or_create( +            external_id="warh", +            defaults={"name": "Warehouse test", "warehouse_type": wt}, +        ) +        div1, __ = ContainerType.objects.get_or_create(label="Div1", txt_idx="DIV1") +        div2, __ = ContainerType.objects.get_or_create(label="Div2", txt_idx="DIV2") +        WarehouseDivisionLink.objects.get_or_create( +            warehouse=w, container_type=div1, order=10 +        ) +        WarehouseDivisionLink.objects.get_or_create( +            warehouse=w, container_type=div2, order=20 +        ) +        ContainerType.objects.get_or_create(label="CT", txt_idx="CT") + +    def get_group_import(self): +        root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") +        importer_filename = os.path.join(root, "importer-group.zip") +        restore_serialized(importer_filename) +        imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots") +        imp_file = open(os.path.join(root, "importer-group.csv"), "rb") +        imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb") +        return imp_group, imp_file, imp_media + +    def create_group_import(self, init=True): +        imp_group, imp_file, imp_media = self.get_group_import() +        create_user() +        ishtar_user = models.IshtarUser.objects.all()[0] +        file_dict = { +            "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()), +            "imported_images": SimpleUploadedFile(imp_media.name, imp_media.read()) +        } +        post_dict = { +            "importer_type": imp_group.pk, +            "name": "find_group_import", +            "encoding": "utf-8", +            "skip_lines": 1, +            "csv_sep": ",", +        } +        form = forms_common.NewImportGroupForm( +            data=post_dict, files=file_dict, user=ishtar_user +        ) +        self.assertTrue(form.is_valid()) +        impt = form.save(ishtar_user) +        if init: +            impt.initialize(user=ishtar_user) +            self.init_group_import(impt) +        return impt +      def create_importer_model(self):          return models.ImporterModel.objects.create(              klass="ishtar_common.models.Parcel", name="Parcel" @@ -2497,6 +2564,9 @@ class ImportTest(TestCase):      def create_importer_type(self, imp_model):          return models.ImporterType.objects.create(associated_models=imp_model) + +class ImportTest(BaseImportTest): +      def test_edit_import(self):          username, password, user = create_superuser()          imprt = self.create_import() @@ -2613,16 +2683,75 @@ class ImportTest(TestCase):          profile.delete_image_zip_on_archive = False          profile.save()          imprt.archive() -        imprt = models.Import.objects.get(pk=imprt.pk) -        self.assertEqual(imprt.state, "AC") -        self.assertFalse(imprt.error_file) -        self.assertFalse(imprt.result_file) -        self.assertFalse(imprt.match_file) -        self.assertTrue(imprt.imported_images) -        self.assertTrue(imprt.archive_file) -        self.assertTrue(zipfile.is_zipfile(imprt.archive_file)) +        group_import = models.ImportGroup.objects.get(pk=imprt.pk) +        self.assertEqual(group_import.state, "AC") +        for imprt in group_import.imports.all(): +            self.assertEqual(imprt.state, "AC") +            self.assertFalse(imprt.error_file) +            self.assertFalse(imprt.result_file) +            self.assertFalse(imprt.match_file) +            self.assertTrue(imprt.imported_images) +            self.assertTrue(imprt.archive_file) +            self.assertTrue(zipfile.is_zipfile(imprt.archive_file)) +            with tempfile.TemporaryDirectory() as tmpdir: +                current_zip = zipfile.ZipFile(imprt.archive_file.path, "r") +                name_list = current_zip.namelist() +                self.assertIn("content.json", name_list) +                current_zip.extract("content.json", tmpdir) +                content_name = os.path.join(tmpdir, "content.json") +                with open(content_name, "r") as content: +                    files = json.loads(content.read()) +                self.assertIn("imported_file", files.keys()) +                self.assertIn(files["imported_file"], name_list) +                self.assertIn("error_file", files.keys()) +                self.assertIn(files["error_file"], name_list) +                self.assertIn("result_file", files.keys()) +                self.assertIn(files["result_file"], name_list) +                self.assertIn("match_file", files.keys()) +                self.assertIn(files["match_file"], name_list) +                rev_dict = {v: k for k, v in files.items()} +                for name in name_list: +                    current_zip.extract(name, tmpdir) +                    if name.endswith(".txt"): +                        with open(os.path.join(tmpdir, name), "r") as f: +                            self.assertEqual(f.read(), "test" + rev_dict[name]) +                    elif name.endswith(".csv"):  # imported file +                        with open(os.path.join(tmpdir, name), "r") as f: +                            self.assertEqual(f.read(), csv_content) + +        group_import.unarchive("FE") +        group_import = models.ImportGroup.objects.get(pk=imprt.pk) +        for imprt in group_import.imports.all(): +            self.assertEqual(imprt.state, "FE") +            for k in ("error_file", "result_file", "match_file", "imported_images"): +                field = getattr(imprt, k) +                self.assertTrue(field, "{} is missing in unarchive".format(k)) +                with open(field.path, "r") as f: +                    self.assertEqual(f.read(), "test" + k) +            field = getattr(imprt, "imported_file") +            self.assertTrue(field, "{} is missing in unarchive".format(k)) +            with open(field.path, "r") as f: +                self.assertEqual(f.read(), csv_content) + +    def test_archive_group_import(self): +        group_import = self.create_group_import() +        group_import.importation() +        profile = models.get_current_profile() +        profile.delete_image_zip_on_archive = False +        profile.save() + +        csv_content = "...." +        group_import.archive() +        group_import = models.Import.objects.get(pk=group_import.pk) +        self.assertEqual(group_import.state, "AC") +        self.assertFalse(group_import.error_file) +        self.assertFalse(group_import.result_file) +        self.assertFalse(group_import.match_file) +        self.assertTrue(group_import.imported_images) +        self.assertTrue(group_import.archive_file) +        self.assertTrue(zipfile.is_zipfile(group_import.archive_file))          with tempfile.TemporaryDirectory() as tmpdir: -            current_zip = zipfile.ZipFile(imprt.archive_file.path, "r") +            current_zip = zipfile.ZipFile(group_import.archive_file.path, "r")              name_list = current_zip.namelist()              self.assertIn("content.json", name_list)              current_zip.extract("content.json", tmpdir) @@ -2647,15 +2776,15 @@ class ImportTest(TestCase):                      with open(os.path.join(tmpdir, name), "r") as f:                          self.assertEqual(f.read(), csv_content) -        imprt.unarchive("FE") -        imprt = models.Import.objects.get(pk=imprt.pk) -        self.assertEqual(imprt.state, "FE") +        group_import.unarchive("FE") +        group_import = models.Import.objects.get(pk=group_import.pk) +        self.assertEqual(group_import.state, "FE")          for k in ("error_file", "result_file", "match_file", "imported_images"): -            field = getattr(imprt, k) +            field = getattr(group_import, k)              self.assertTrue(field, "{} is missing in unarchive".format(k))              with open(field.path, "r") as f:                  self.assertEqual(f.read(), "test" + k) -        field = getattr(imprt, "imported_file") +        field = getattr(group_import, "imported_file")          self.assertTrue(field, "{} is missing in unarchive".format(k))          with open(field.path, "r") as f:              self.assertEqual(f.read(), csv_content) @@ -2663,17 +2792,17 @@ class ImportTest(TestCase):          profile = models.get_current_profile()          profile.delete_image_zip_on_archive = True          profile.save() -        imprt = models.Import.objects.get(pk=imprt.pk) -        image_filename = imprt.imported_images.path +        group_import = models.Import.objects.get(pk=group_import.pk) +        image_filename = group_import.imported_images.path          self.assertTrue(os.path.isfile(image_filename)) -        imprt.archive() -        imprt = models.Import.objects.get(pk=imprt.pk) -        self.assertFalse(imprt.imported_images) +        group_import.archive() +        group_import = models.Import.objects.get(pk=group_import.pk) +        self.assertFalse(group_import.imported_images)          self.assertFalse(os.path.isfile(image_filename)) -        imprt.unarchive("F") -        imprt = models.Import.objects.get(pk=imprt.pk) -        self.assertEqual(imprt.state, "FE")  # as an error file so state fixed -        self.assertFalse(imprt.imported_images) +        group_import.unarchive("F") +        group_import = models.Import.objects.get(pk=group_import.pk) +        self.assertEqual(group_import.state, "FE")  # as an error file so state fixed +        self.assertFalse(group_import.imported_images)      def test_delete_related(self):          town = models.Town.objects.create(name="my-test") | 
