summaryrefslogtreecommitdiff
path: root/ishtar_common/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/tests.py')
-rw-r--r--ishtar_common/tests.py466
1 files changed, 389 insertions, 77 deletions
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 979b38395..f14872295 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -37,7 +37,7 @@ from unittest.runner import TextTestRunner, TextTestResult
from django.apps import apps
from django.conf import settings
-from django.contrib.auth.models import User, Permission
+from django.contrib.auth.models import User, Permission, Group
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.geos import (
GEOSGeometry,
@@ -2474,10 +2474,16 @@ class ShortMenuTest(TestCase):
class BaseImportTest(TestCase):
-
- def create_import(self):
- create_user()
+ def setUp(self):
imp_model = models.ImporterModel.objects.create(
+ klass="ishtar_common.models.Parcel", name="Parcel"
+ )
+ self.importer_type = models.ImporterType.objects.create(associated_models=imp_model)
+
+ def create_import(self, name="My Import", need_user=True):
+ if need_user:
+ create_user()
+ imp_model, __ = models.ImporterModel.objects.get_or_create(
klass="ishtar_common.models.Person", name="Person"
)
importer_type = models.ImporterType.objects.create(associated_models=imp_model)
@@ -2490,6 +2496,7 @@ class BaseImportTest(TestCase):
with open(dest, "rb") as f:
mcc_operation_file = DjangoFile(f)
imprt = models.Import.objects.create(
+ name=name,
user=models.IshtarUser.objects.all()[0],
importer_type=importer_type,
imported_file=mcc_operation_file,
@@ -2554,7 +2561,7 @@ class BaseImportTest(TestCase):
"csv_sep": ",",
}
form = forms_common.NewImportGroupForm(
- data=post_dict, files=file_dict, user=ishtar_user
+ data=post_dict, files=file_dict, user=ishtar_user.user_ptr
)
self.assertTrue(form.is_valid())
impt = form.save(ishtar_user)
@@ -2563,102 +2570,303 @@ class BaseImportTest(TestCase):
self.init_group_import(impt)
return impt
- def create_importer_model(self):
- return models.ImporterModel.objects.create(
- klass="ishtar_common.models.Parcel", name="Parcel"
+
+class ImportTestInterface(BaseImportTest):
+
+ def setUp(self):
+ super().setUp()
+ self.super_username, self.super_password, __ = create_superuser()
+ self.simple_username, self.simple_password, __ = create_user()
+ self.import_username, self.import_password, self.import_user = create_user(
+ "import-user", "password"
)
+ self.import_group = Group.objects.create(name="Import")
+ self.import_user.groups.add(self.import_group)
- def create_importer_type(self, imp_model):
- return models.ImporterType.objects.create(associated_models=imp_model)
+ def superuser_login(self):
+ client = Client()
+ client.login(username=self.super_username, password=self.super_password)
+ return client
+ def simple_login(self):
+ client = Client()
+ client.login(username=self.simple_username, password=self.simple_password)
+ return client
-class ImportTest(BaseImportTest):
+ def import_login(self):
+ client = Client()
+ client.login(username=self.import_username, password=self.import_password)
+ return client
+
+ def set_global_permission(self, imports, codenames):
+ self.import_group.permissions.clear()
+ for imprt in imports:
+ imprt.importer_type.users.clear()
+ if not isinstance(codenames, (list, tuple)):
+ codenames = (codenames,)
+ for codename in codenames:
+ self.import_group.permissions.add(Permission.objects.get(codename=codename))
+
+ def remove_global_permission(self, imports, codenames):
+ self.import_group.permissions.clear()
+ for imprt in imports:
+ imprt.importer_type.users.clear()
+ if not isinstance(codenames, (list, tuple)):
+ codenames = (codenames,)
+ for codename in codenames:
+ perm = Permission.objects.get(codename=codename)
+ if perm in list(self.import_group.permissions.all()):
+ self.import_group.permissions.remove()
+
+ def set_own_permission(self, imports, codenames):
+ self.set_global_permission(imports, codenames)
+ user = models.IshtarUser.objects.get(user_ptr=self.import_user)
+ for imprt in imports:
+ imprt.importer_type.users.add(user)
+
+ def test_list_import(self):
+ imprt = self.create_import()
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ # no login and simple user login
+ for client in (Client(), self.simple_login()):
+ response = client.get(reverse("current_imports"))
+ self.assertRedirects(response, '/')
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(imprt.name, content)
+ self.assertIn(imprt2.name, content)
- def test_edit_import(self):
- username, password, user = create_superuser()
+ # import user
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertRedirects(response, '/')
+
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(imprt.name, content)
+ self.assertIn(imprt2.name, content)
+ self.remove_global_permission([imprt, imprt2], "change_import")
+
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(imprt.name, content)
+ self.assertNotIn(imprt2.name, content,
+ msg="Import 2 unexpectedly found in import list")
+
+ def test_import_action_permission(self):
imprt = self.create_import()
- c = Client()
- c.login(username=username, password=password)
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ delete_tag = "<option value='D'>"
- response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ # superuser
+ client = self.superuser_login()
+ response = client.get(reverse("current_imports"))
self.assertEqual(response.status_code, 200)
- self.assertContains(response, imprt.importer_type)
- self.assertContains(response, imprt.imported_file)
- self.assertContains(response, imprt.imported_images)
- self.assertContains(response, imprt.encoding)
- self.assertContains(response, imprt.csv_sep)
+ content = response.content.decode()
+ self.assertIn(f"import-action-{imprt.pk}", content)
+ self.assertIn(f"import-action-{imprt2.pk}", content)
+ self.assertIn(delete_tag, content)
+
+ # import user
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertRedirects(response, '/')
+
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(f"import-action-{imprt.pk}", content)
+ self.assertIn(f"import-action-{imprt2.pk}", content)
+ self.assertNotIn(delete_tag, content)
+ self.remove_global_permission([imprt, imprt2], ("change_import",))
+
+ self.set_global_permission([imprt, imprt2], ("change_import", "delete_import"))
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(delete_tag, content)
+ self.remove_global_permission([imprt, imprt2], ("change_import", "delete_import"))
- imp_model = self.create_importer_model()
- importer_type = self.create_importer_type(imp_model)
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(f"import-action-{imprt.pk}", content)
+ self.assertNotIn(f"import-action-{imprt2.pk}", content,
+ msg="Import 2 unexpectedly found in import list")
+ self.assertNotIn(delete_tag, content)
+ self.set_own_permission([imprt], ("change_own_import", "change_own_import"))
+ def _test_create_import_get_data(self):
+ csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv")
+ return {
+ "name": "Test Name",
+ "importer_type": self.importer_type.pk,
+ "imported_file": SimpleUploadedFile(
+ name="insee-test.csv",
+ content=open(csv_path, "rb").read(),
+ ),
+ "encoding": "utf-8",
+ "csv_sep": '|',
+ "skip_lines": 1,
+ }
+
+ def test_create_import(self):
+ # init
+ nb_imports = models.Import.objects.count()
+
+ # no login and simple user login
+ for client in (Client(), self.simple_login()):
+ response = client.get(reverse("new_import"))
+ self.assertRedirects(response, '/')
+ data = self._test_create_import_get_data()
+ response = client.post(reverse("new_import"), data)
+ self.assertRedirects(response, '/')
+ self.assertEqual(nb_imports, models.Import.objects.count())
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(reverse("new_import"))
+ self.assertEqual(response.status_code, 200)
+ data = self._test_create_import_get_data()
+ response = client.post(reverse("new_import"), data)
+ self.assertEqual(nb_imports + 1, models.Import.objects.count())
+ self.assertRedirects(response, '/import-list/')
+
+ imprt = models.Import.objects.get(name="Test Name")
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(imprt.name, "Test Name")
+ self.assertEqual(imprt.importer_type, self.importer_type)
+ self.assertEqual(imprt.encoding, "utf-8")
+ self.assertEqual(imprt.csv_sep, '|')
+
+ # import user
+ for imp in models.Import.objects.all():
+ imp.delete()
+ nb_imports = 0
+
+ client = self.import_login()
+ response = client.get(reverse("new_import"))
+ self.assertRedirects(response, '/')
+
+ self.set_global_permission([], ("add_import", "change_import"))
+ client = self.import_login()
+ response = client.get(reverse("new_import"))
+ self.assertEqual(response.status_code, 200)
+ data = self._test_create_import_get_data()
+ response = client.post(reverse("new_import"), data)
+ self.assertEqual(nb_imports + 1, models.Import.objects.count())
+ self.assertRedirects(response, '/import-list/')
+
+ def test_edit_import(self):
+ # init
+ imprt = self.create_import(name="My import")
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
data = {
"name": "Test Name",
- "importer_type": importer_type.pk,
+ "importer_type": self.importer_type.pk,
"encoding": "utf-8",
"csv_sep": '|',
"skip_lines": 32,
}
- response = c.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
- self.assertEqual(response.status_code, 302)
+ # no login and simple user login
+ for client in (Client(), self.simple_login()):
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertRedirects(response, '/')
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/')
- response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ client = self.superuser_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
self.assertEqual(response.status_code, 200)
- self.assertContains(response, "Test Name")
- self.assertContains(response, str(importer_type))
- self.assertContains(response, "utf-8")
+ self.assertContains(response, imprt.name)
+ self.assertContains(response, imprt.importer_type)
+ self.assertContains(response, imprt.imported_file)
+ self.assertContains(response, imprt.imported_images)
+ self.assertContains(response, imprt.encoding)
+ self.assertContains(response, imprt.csv_sep)
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/import-list/')
imprt = models.Import.objects.get(pk=imprt.pk)
self.assertEqual(imprt.name, "Test Name")
- self.assertEqual(imprt.importer_type, importer_type)
+ self.assertEqual(imprt.importer_type, self.importer_type)
self.assertEqual(imprt.encoding, "utf-8")
self.assertEqual(imprt.csv_sep, "|")
self.assertEqual(imprt.skip_lines, 32)
- def test_create_import(self):
- username, password, user = create_superuser()
- c = Client()
- c.login(username=username, password=password)
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, "Test Name")
+ self.assertContains(response, str(self.importer_type))
+ self.assertContains(response, "utf-8")
- imp_model = self.create_importer_model()
- importer_type = self.create_importer_type(imp_model)
- csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv")
+ # import user
+ imprt.delete()
+ imprt2.delete()
+ imprt = self.create_import(name="My import")
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+
+ client = self.import_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertRedirects(response, '/')
+
+ # import user global permission
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, imprt.name)
- data = {
- "name": "Test Name",
- "importer_type": importer_type.pk,
- "imported_file": SimpleUploadedFile(
- name="insee-test.csv",
- content=open(csv_path, "rb").read(),
- ),
- "encoding": "utf-8",
- "csv_sep": '|',
- "skip_lines": 1,
- }
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/import-list/')
+ imprt = models.Import.objects.get(pk=imprt.pk)
+ self.assertEqual(imprt.name, "Test Name")
- response = c.post(reverse("new_import"), data)
- self.assertEqual(response.status_code, 302)
+ self.remove_global_permission([imprt, imprt2], ("change_import",))
- imprt = models.Import.objects.get(name="Test Name")
- response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ # import user own permission
+ imprt.delete()
+ imprt2.delete()
+ imprt = self.create_import(name="My import")
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
self.assertEqual(response.status_code, 200)
+ self.assertContains(response, imprt.name)
+
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/import-list/')
+ imprt = models.Import.objects.get(pk=imprt.pk)
self.assertEqual(imprt.name, "Test Name")
- self.assertEqual(imprt.importer_type, importer_type)
- self.assertEqual(imprt.encoding, "utf-8")
- self.assertEqual(imprt.csv_sep, '|')
- def test_validation_zip_import_image(self):
- username, password, user = create_superuser()
- c = Client()
- c.login(username=username, password=password)
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt2.pk}))
+ self.assertRedirects(response, '/')
- imp_model = self.create_importer_model()
- importer_type = self.create_importer_type(imp_model)
+ def test_validation_zip_import_image(self):
+ # init
image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png")
data = {
"name": "Import Zip Not Valid Must Fail",
- "importer_type": importer_type.pk,
+ "importer_type": self.importer_type.pk,
"encoding": "utf-8",
"csv_sep": "|",
"skip_lines": 1,
@@ -2668,13 +2876,130 @@ class ImportTest(BaseImportTest):
content_type="image/png",
),
}
- response = c.post(reverse("new_import"), data)
+
+ # superuser
+ client = self.superuser_login()
+ response = client.post(reverse("new_import"), data)
expected = str(
_('"Associated images" field must be a valid zip file.')
).replace('"', '&quot;')
self.assertIn(expected, response.content.decode("utf-8"))
self.assertEqual(response.status_code, 200)
+ def test_display_csv(self):
+ # init
+ imprt = self.create_import()
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ url = reverse("import_display_csv", args=["source", "", imprt.pk])
+ url2 = reverse("import_display_csv", args=["source", "", imprt2.pk])
+
+ # no login
+ client = Client()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+
+ # simple login
+ client = self.simple_login()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # import user global permission
+ self.set_global_permission([imprt, imprt2], "view_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+ self.remove_global_permission([imprt, imprt2], ("view_import",))
+
+ # import user own permission
+ self.set_own_permission([imprt], "view_own_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertRedirects(response, "/")
+
+ def test_ignore_errors(self):
+ # init
+ imprt = self.create_import()
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+
+ path = os.path.join(
+ LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv"
+ )
+ imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv")
+ imprt.save()
+ imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv")
+ imprt2.save()
+
+ q = models.ImportLineError.objects.filter(import_item=imprt.pk)
+ self.assertEqual(q.count(), 2)
+ ignored_line = q.all()[0]
+ url = reverse("import_ignore_line", args=[ignored_line.pk])
+ q = models.ImportLineError.objects.filter(import_item=imprt2.pk)
+ self.assertEqual(q.count(), 2)
+ ignored_line2 = q.all()[0]
+ url2 = reverse("import_ignore_line", args=[ignored_line2.pk])
+
+ # no login
+ client = Client()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True)
+ ignored_line.ignored = False
+ ignored_line.save()
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True)
+ ignored_line2.ignored = False
+ ignored_line2.save()
+
+ # simple login
+ client = self.simple_login()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # import user global permission
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True)
+ ignored_line.ignored = False
+ ignored_line.save()
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True)
+ ignored_line2.ignored = False
+ ignored_line2.save()
+ self.remove_global_permission([imprt, imprt2], ("change_import",))
+
+ # import user own permission
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 404)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, False)
+
+
+class ImportTest(BaseImportTest):
def test_archive_import(self):
imprt = self.create_import()
with open(imprt.imported_file.path, "r") as f:
@@ -2796,19 +3121,6 @@ class ImportTest(BaseImportTest):
field = getattr(sub_import, k)
self.assertTrue(field, "{} is missing in unarchive".format(k))
- def test_display_csv(self):
- imprt = self.create_import()
- username, password, __ = create_user("test", "test")
- c = Client()
- c.login(username=username, password=password)
- url = "import_display_csv"
- response = c.get(reverse(url, args=["source", "", imprt.pk]), )
- self.assertEqual(response.status_code, 404)
- username, password, __ = create_superuser()
- c.login(username=username, password=password)
- response = c.get(reverse(url, args=["source", "", imprt.pk]))
- self.assertEqual(response.status_code, 200)
-
def test_delete_related(self):
town = models.Town.objects.create(name="my-test")
self.assertEqual(models.Town.objects.filter(name="my-test").count(), 1)