diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2023-10-26 17:03:41 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-02-05 10:51:52 +0100 |
commit | 5507389c115a472308beff61c0c3172509a9545c (patch) | |
tree | aca7334a68dccb2922d9a0e14011df88283acdb5 /ishtar_common/tests.py | |
parent | 8c6e4ec4124ca8e2c03d816c72ca706808f40f02 (diff) | |
download | Ishtar-5507389c115a472308beff61c0c3172509a9545c.tar.bz2 Ishtar-5507389c115a472308beff61c0c3172509a9545c.zip |
✨ refactoring import permissions
Diffstat (limited to 'ishtar_common/tests.py')
-rw-r--r-- | ishtar_common/tests.py | 466 |
1 files changed, 389 insertions, 77 deletions
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 979b38395..f14872295 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -37,7 +37,7 @@ from unittest.runner import TextTestRunner, TextTestResult from django.apps import apps from django.conf import settings -from django.contrib.auth.models import User, Permission +from django.contrib.auth.models import User, Permission, Group from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import ( GEOSGeometry, @@ -2474,10 +2474,16 @@ class ShortMenuTest(TestCase): class BaseImportTest(TestCase): - - def create_import(self): - create_user() + def setUp(self): imp_model = models.ImporterModel.objects.create( + klass="ishtar_common.models.Parcel", name="Parcel" + ) + self.importer_type = models.ImporterType.objects.create(associated_models=imp_model) + + def create_import(self, name="My Import", need_user=True): + if need_user: + create_user() + imp_model, __ = models.ImporterModel.objects.get_or_create( klass="ishtar_common.models.Person", name="Person" ) importer_type = models.ImporterType.objects.create(associated_models=imp_model) @@ -2490,6 +2496,7 @@ class BaseImportTest(TestCase): with open(dest, "rb") as f: mcc_operation_file = DjangoFile(f) imprt = models.Import.objects.create( + name=name, user=models.IshtarUser.objects.all()[0], importer_type=importer_type, imported_file=mcc_operation_file, @@ -2554,7 +2561,7 @@ class BaseImportTest(TestCase): "csv_sep": ",", } form = forms_common.NewImportGroupForm( - data=post_dict, files=file_dict, user=ishtar_user + data=post_dict, files=file_dict, user=ishtar_user.user_ptr ) self.assertTrue(form.is_valid()) impt = form.save(ishtar_user) @@ -2563,102 +2570,303 @@ class BaseImportTest(TestCase): self.init_group_import(impt) return impt - def create_importer_model(self): - return models.ImporterModel.objects.create( - klass="ishtar_common.models.Parcel", name="Parcel" + +class ImportTestInterface(BaseImportTest): + + def setUp(self): + super().setUp() + self.super_username, self.super_password, __ = create_superuser() + self.simple_username, self.simple_password, __ = create_user() + self.import_username, self.import_password, self.import_user = create_user( + "import-user", "password" ) + self.import_group = Group.objects.create(name="Import") + self.import_user.groups.add(self.import_group) - def create_importer_type(self, imp_model): - return models.ImporterType.objects.create(associated_models=imp_model) + def superuser_login(self): + client = Client() + client.login(username=self.super_username, password=self.super_password) + return client + def simple_login(self): + client = Client() + client.login(username=self.simple_username, password=self.simple_password) + return client -class ImportTest(BaseImportTest): + def import_login(self): + client = Client() + client.login(username=self.import_username, password=self.import_password) + return client + + def set_global_permission(self, imports, codenames): + self.import_group.permissions.clear() + for imprt in imports: + imprt.importer_type.users.clear() + if not isinstance(codenames, (list, tuple)): + codenames = (codenames,) + for codename in codenames: + self.import_group.permissions.add(Permission.objects.get(codename=codename)) + + def remove_global_permission(self, imports, codenames): + self.import_group.permissions.clear() + for imprt in imports: + imprt.importer_type.users.clear() + if not isinstance(codenames, (list, tuple)): + codenames = (codenames,) + for codename in codenames: + perm = Permission.objects.get(codename=codename) + if perm in list(self.import_group.permissions.all()): + self.import_group.permissions.remove() + + def set_own_permission(self, imports, codenames): + self.set_global_permission(imports, codenames) + user = models.IshtarUser.objects.get(user_ptr=self.import_user) + for imprt in imports: + imprt.importer_type.users.add(user) + + def test_list_import(self): + imprt = self.create_import() + imprt2 = self.create_import(name="My-import-2", need_user=False) + # no login and simple user login + for client in (Client(), self.simple_login()): + response = client.get(reverse("current_imports")) + self.assertRedirects(response, '/') + + # superuser + client = self.superuser_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(imprt.name, content) + self.assertIn(imprt2.name, content) - def test_edit_import(self): - username, password, user = create_superuser() + # import user + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertRedirects(response, '/') + + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(imprt.name, content) + self.assertIn(imprt2.name, content) + self.remove_global_permission([imprt, imprt2], "change_import") + + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(imprt.name, content) + self.assertNotIn(imprt2.name, content, + msg="Import 2 unexpectedly found in import list") + + def test_import_action_permission(self): imprt = self.create_import() - c = Client() - c.login(username=username, password=password) + imprt2 = self.create_import(name="My-import-2", need_user=False) + delete_tag = "<option value='D'>" - response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + # superuser + client = self.superuser_login() + response = client.get(reverse("current_imports")) self.assertEqual(response.status_code, 200) - self.assertContains(response, imprt.importer_type) - self.assertContains(response, imprt.imported_file) - self.assertContains(response, imprt.imported_images) - self.assertContains(response, imprt.encoding) - self.assertContains(response, imprt.csv_sep) + content = response.content.decode() + self.assertIn(f"import-action-{imprt.pk}", content) + self.assertIn(f"import-action-{imprt2.pk}", content) + self.assertIn(delete_tag, content) + + # import user + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertRedirects(response, '/') + + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(f"import-action-{imprt.pk}", content) + self.assertIn(f"import-action-{imprt2.pk}", content) + self.assertNotIn(delete_tag, content) + self.remove_global_permission([imprt, imprt2], ("change_import",)) + + self.set_global_permission([imprt, imprt2], ("change_import", "delete_import")) + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(delete_tag, content) + self.remove_global_permission([imprt, imprt2], ("change_import", "delete_import")) - imp_model = self.create_importer_model() - importer_type = self.create_importer_type(imp_model) + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(f"import-action-{imprt.pk}", content) + self.assertNotIn(f"import-action-{imprt2.pk}", content, + msg="Import 2 unexpectedly found in import list") + self.assertNotIn(delete_tag, content) + self.set_own_permission([imprt], ("change_own_import", "change_own_import")) + def _test_create_import_get_data(self): + csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") + return { + "name": "Test Name", + "importer_type": self.importer_type.pk, + "imported_file": SimpleUploadedFile( + name="insee-test.csv", + content=open(csv_path, "rb").read(), + ), + "encoding": "utf-8", + "csv_sep": '|', + "skip_lines": 1, + } + + def test_create_import(self): + # init + nb_imports = models.Import.objects.count() + + # no login and simple user login + for client in (Client(), self.simple_login()): + response = client.get(reverse("new_import")) + self.assertRedirects(response, '/') + data = self._test_create_import_get_data() + response = client.post(reverse("new_import"), data) + self.assertRedirects(response, '/') + self.assertEqual(nb_imports, models.Import.objects.count()) + + # superuser + client = self.superuser_login() + response = client.get(reverse("new_import")) + self.assertEqual(response.status_code, 200) + data = self._test_create_import_get_data() + response = client.post(reverse("new_import"), data) + self.assertEqual(nb_imports + 1, models.Import.objects.count()) + self.assertRedirects(response, '/import-list/') + + imprt = models.Import.objects.get(name="Test Name") + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertEqual(response.status_code, 200) + self.assertEqual(imprt.name, "Test Name") + self.assertEqual(imprt.importer_type, self.importer_type) + self.assertEqual(imprt.encoding, "utf-8") + self.assertEqual(imprt.csv_sep, '|') + + # import user + for imp in models.Import.objects.all(): + imp.delete() + nb_imports = 0 + + client = self.import_login() + response = client.get(reverse("new_import")) + self.assertRedirects(response, '/') + + self.set_global_permission([], ("add_import", "change_import")) + client = self.import_login() + response = client.get(reverse("new_import")) + self.assertEqual(response.status_code, 200) + data = self._test_create_import_get_data() + response = client.post(reverse("new_import"), data) + self.assertEqual(nb_imports + 1, models.Import.objects.count()) + self.assertRedirects(response, '/import-list/') + + def test_edit_import(self): + # init + imprt = self.create_import(name="My import") + imprt2 = self.create_import(name="My-import-2", need_user=False) data = { "name": "Test Name", - "importer_type": importer_type.pk, + "importer_type": self.importer_type.pk, "encoding": "utf-8", "csv_sep": '|', "skip_lines": 32, } - response = c.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) - self.assertEqual(response.status_code, 302) + # no login and simple user login + for client in (Client(), self.simple_login()): + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertRedirects(response, '/') + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/') - response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + client = self.superuser_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) self.assertEqual(response.status_code, 200) - self.assertContains(response, "Test Name") - self.assertContains(response, str(importer_type)) - self.assertContains(response, "utf-8") + self.assertContains(response, imprt.name) + self.assertContains(response, imprt.importer_type) + self.assertContains(response, imprt.imported_file) + self.assertContains(response, imprt.imported_images) + self.assertContains(response, imprt.encoding) + self.assertContains(response, imprt.csv_sep) + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/import-list/') imprt = models.Import.objects.get(pk=imprt.pk) self.assertEqual(imprt.name, "Test Name") - self.assertEqual(imprt.importer_type, importer_type) + self.assertEqual(imprt.importer_type, self.importer_type) self.assertEqual(imprt.encoding, "utf-8") self.assertEqual(imprt.csv_sep, "|") self.assertEqual(imprt.skip_lines, 32) - def test_create_import(self): - username, password, user = create_superuser() - c = Client() - c.login(username=username, password=password) + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Test Name") + self.assertContains(response, str(self.importer_type)) + self.assertContains(response, "utf-8") - imp_model = self.create_importer_model() - importer_type = self.create_importer_type(imp_model) - csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") + # import user + imprt.delete() + imprt2.delete() + imprt = self.create_import(name="My import") + imprt2 = self.create_import(name="My-import-2", need_user=False) + + client = self.import_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertRedirects(response, '/') + + # import user global permission + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertEqual(response.status_code, 200) + self.assertContains(response, imprt.name) - data = { - "name": "Test Name", - "importer_type": importer_type.pk, - "imported_file": SimpleUploadedFile( - name="insee-test.csv", - content=open(csv_path, "rb").read(), - ), - "encoding": "utf-8", - "csv_sep": '|', - "skip_lines": 1, - } + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/import-list/') + imprt = models.Import.objects.get(pk=imprt.pk) + self.assertEqual(imprt.name, "Test Name") - response = c.post(reverse("new_import"), data) - self.assertEqual(response.status_code, 302) + self.remove_global_permission([imprt, imprt2], ("change_import",)) - imprt = models.Import.objects.get(name="Test Name") - response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + # import user own permission + imprt.delete() + imprt2.delete() + imprt = self.create_import(name="My import") + imprt2 = self.create_import(name="My-import-2", need_user=False) + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) self.assertEqual(response.status_code, 200) + self.assertContains(response, imprt.name) + + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/import-list/') + imprt = models.Import.objects.get(pk=imprt.pk) self.assertEqual(imprt.name, "Test Name") - self.assertEqual(imprt.importer_type, importer_type) - self.assertEqual(imprt.encoding, "utf-8") - self.assertEqual(imprt.csv_sep, '|') - def test_validation_zip_import_image(self): - username, password, user = create_superuser() - c = Client() - c.login(username=username, password=password) + response = client.get(reverse("edit_import", kwargs={"pk": imprt2.pk})) + self.assertRedirects(response, '/') - imp_model = self.create_importer_model() - importer_type = self.create_importer_type(imp_model) + def test_validation_zip_import_image(self): + # init image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png") data = { "name": "Import Zip Not Valid Must Fail", - "importer_type": importer_type.pk, + "importer_type": self.importer_type.pk, "encoding": "utf-8", "csv_sep": "|", "skip_lines": 1, @@ -2668,13 +2876,130 @@ class ImportTest(BaseImportTest): content_type="image/png", ), } - response = c.post(reverse("new_import"), data) + + # superuser + client = self.superuser_login() + response = client.post(reverse("new_import"), data) expected = str( _('"Associated images" field must be a valid zip file.') ).replace('"', '"') self.assertIn(expected, response.content.decode("utf-8")) self.assertEqual(response.status_code, 200) + def test_display_csv(self): + # init + imprt = self.create_import() + imprt2 = self.create_import(name="My-import-2", need_user=False) + url = reverse("import_display_csv", args=["source", "", imprt.pk]) + url2 = reverse("import_display_csv", args=["source", "", imprt2.pk]) + + # no login + client = Client() + response = client.get(url) + self.assertRedirects(response, "/") + + # superuser + client = self.superuser_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertEqual(response.status_code, 200) + + # simple login + client = self.simple_login() + response = client.get(url) + self.assertRedirects(response, "/") + + # import user global permission + self.set_global_permission([imprt, imprt2], "view_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertEqual(response.status_code, 200) + self.remove_global_permission([imprt, imprt2], ("view_import",)) + + # import user own permission + self.set_own_permission([imprt], "view_own_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertRedirects(response, "/") + + def test_ignore_errors(self): + # init + imprt = self.create_import() + imprt2 = self.create_import(name="My-import-2", need_user=False) + + path = os.path.join( + LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv" + ) + imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") + imprt.save() + imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") + imprt2.save() + + q = models.ImportLineError.objects.filter(import_item=imprt.pk) + self.assertEqual(q.count(), 2) + ignored_line = q.all()[0] + url = reverse("import_ignore_line", args=[ignored_line.pk]) + q = models.ImportLineError.objects.filter(import_item=imprt2.pk) + self.assertEqual(q.count(), 2) + ignored_line2 = q.all()[0] + url2 = reverse("import_ignore_line", args=[ignored_line2.pk]) + + # no login + client = Client() + response = client.get(url) + self.assertRedirects(response, "/") + + # superuser + client = self.superuser_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) + ignored_line.ignored = False + ignored_line.save() + response = client.get(url2) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True) + ignored_line2.ignored = False + ignored_line2.save() + + # simple login + client = self.simple_login() + response = client.get(url) + self.assertRedirects(response, "/") + + # import user global permission + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) + ignored_line.ignored = False + ignored_line.save() + response = client.get(url2) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True) + ignored_line2.ignored = False + ignored_line2.save() + self.remove_global_permission([imprt, imprt2], ("change_import",)) + + # import user own permission + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertEqual(response.status_code, 404) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, False) + + +class ImportTest(BaseImportTest): def test_archive_import(self): imprt = self.create_import() with open(imprt.imported_file.path, "r") as f: @@ -2796,19 +3121,6 @@ class ImportTest(BaseImportTest): field = getattr(sub_import, k) self.assertTrue(field, "{} is missing in unarchive".format(k)) - def test_display_csv(self): - imprt = self.create_import() - username, password, __ = create_user("test", "test") - c = Client() - c.login(username=username, password=password) - url = "import_display_csv" - response = c.get(reverse(url, args=["source", "", imprt.pk]), ) - self.assertEqual(response.status_code, 404) - username, password, __ = create_superuser() - c.login(username=username, password=password) - response = c.get(reverse(url, args=["source", "", imprt.pk])) - self.assertEqual(response.status_code, 200) - def test_delete_related(self): town = models.Town.objects.create(name="my-test") self.assertEqual(models.Town.objects.filter(name="my-test").count(), 1) |