diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2023-10-26 17:03:41 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-04-16 16:38:32 +0200 |
commit | f4f482cd4074898f5344a3a078e27800bbd060fd (patch) | |
tree | 46a317f0f5de7b0177206ac5b965be794ff2b2af /ishtar_common | |
parent | e008dd87b2eafd88cec3d75d0b3b4c92ce891f23 (diff) | |
download | Ishtar-f4f482cd4074898f5344a3a078e27800bbd060fd.tar.bz2 Ishtar-f4f482cd4074898f5344a3a078e27800bbd060fd.zip |
✨ refactoring import permissions
Diffstat (limited to 'ishtar_common')
-rw-r--r-- | ishtar_common/fixtures/initial_data-fr.json | 11 | ||||
-rw-r--r-- | ishtar_common/forms_common.py | 33 | ||||
-rw-r--r-- | ishtar_common/migrations/0231_default_mandatory_keys.py | 38 | ||||
-rw-r--r-- | ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py | 116 | ||||
-rw-r--r-- | ishtar_common/models_common.py | 3 | ||||
-rw-r--r-- | ishtar_common/models_imports.py | 67 | ||||
-rw-r--r-- | ishtar_common/templates/ishtar/import_list.html | 4 | ||||
-rw-r--r-- | ishtar_common/templates/ishtar/import_table.html | 6 | ||||
-rw-r--r-- | ishtar_common/tests.py | 466 | ||||
-rw-r--r-- | ishtar_common/tests/error-file.csv | 3 | ||||
-rw-r--r-- | ishtar_common/urls.py | 28 | ||||
-rw-r--r-- | ishtar_common/views.py | 147 |
12 files changed, 730 insertions, 192 deletions
diff --git a/ishtar_common/fixtures/initial_data-fr.json b/ishtar_common/fixtures/initial_data-fr.json index f3c539d3c..980ed3d1a 100644 --- a/ishtar_common/fixtures/initial_data-fr.json +++ b/ishtar_common/fixtures/initial_data-fr.json @@ -301,7 +301,16 @@ "Auteurs : lecture" ], [ - "Import : ajout/modification/suppression" + "Imports : lecture" + ], + [ + "Imports : modification" + ], + [ + "Imports : suppression" + ], + [ + "Imports : ajout" ] ] } diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py index dd3a83971..0d739fbfb 100644 --- a/ishtar_common/forms_common.py +++ b/ishtar_common/forms_common.py @@ -246,19 +246,31 @@ class BaseImportForm(IshtarForm, forms.ModelForm): super().__init__(*args, **kwargs) self.fields["imported_file"].required = True self._filter_group(user) - self._filter_importer_type() + self._filter_importer_type(user) if "imported_images" in self.fields: self.fields["imported_images"].validators = [file_size_validator] self.fields["imported_file"].validators = [file_size_validator] self._post_init() - def _filter_importer_type(self): - self.fields["importer_type"].choices = [("", "--")] + [ - (imp.pk, imp.name) - for imp in models.ImporterType.objects.filter( + def _filter_importer_type_query(self, q, user): + if user.is_superuser or user.ishtaruser.has_right("add_import"): + return q + if not user.ishtaruser.has_right("add_own_import"): + self.fields["importer_type"].choices = [("", "--")] + return + q = q.filter(users__pk=user.ishtaruser.pk) + return q + + def _filter_importer_type(self, user): + q = models.ImporterType.objects.filter( available=True, is_import=True, type=self.importer_type - ) + ) + q = self._filter_importer_type_query(q, user) + if not q: + return + self.fields["importer_type"].choices = [("", "--")] + [ + (imp.pk, imp.name) for imp in q.all() ] def _filter_group(self, user): @@ -511,10 +523,13 @@ class NewImportGroupForm(NewImportForm): "imported_images": FormHeader(_("Documents/Images")), } - def _filter_importer_type(self): + def _filter_importer_type(self, user): + q = models.ImporterGroup.objects.filter(available=True) + q = self._filter_importer_type_query(q, user) + if not q: + return self.fields["importer_type"].choices = [("", "--")] + [ - (imp.pk, imp.name) - for imp in models.ImporterGroup.objects.filter(available=True) + (imp.pk, imp.name) for imp in q.all() ] def _filter_group(self, user): diff --git a/ishtar_common/migrations/0231_default_mandatory_keys.py b/ishtar_common/migrations/0231_default_mandatory_keys.py deleted file mode 100644 index cd245322a..000000000 --- a/ishtar_common/migrations/0231_default_mandatory_keys.py +++ /dev/null @@ -1,38 +0,0 @@ -# Generated by Django 2.2.24 on 2023-09-18 17:05 - -from django.db import migrations - -EXCLUDE_LIST = [ - "-", - "auto_external_id", - "spatial_reference_system", - "public_domain", -] - -FULL_COPY_LIST = [ - "scientist__attached_to", -] - - -def migrate(apps, __): - ImporterDefault = apps.get_model('ishtar_common', 'ImporterDefault') - for default in ImporterDefault.objects.all(): - if default.target not in EXCLUDE_LIST: - req = default.target - if req not in FULL_COPY_LIST: - req = req.split("__")[0] - if req.endswith("_type") or req.endswith("_types"): - continue - default.required_fields = req - default.save() - - -class Migration(migrations.Migration): - - dependencies = [ - ('ishtar_common', '0230_auto_20231024_1045'), - ] - - operations = [ - migrations.RunPython(migrate), - ] diff --git a/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py b/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py new file mode 100644 index 000000000..120711f09 --- /dev/null +++ b/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py @@ -0,0 +1,116 @@ +# Generated by Django 2.2.24 on 2023-09-18 17:05 + +from django.db import migrations + +COLOR_WARNING = "\033[93m" +COLOR_ENDC = "\033[0m" + +EXCLUDE_LIST = [ + "-", + "auto_external_id", + "spatial_reference_system", + "public_domain", +] + +FULL_COPY_LIST = [ + "scientist__attached_to", +] + +GROUPS = [ + [ + "Imports : lecture", + "view_import", + "ishtar_common", + "import" + ], + [ + "Imports rattachés : lecture", + "view_own_import", + "ishtar_common", + "import" + ], + [ + "Imports : modification", + "change_import", + "ishtar_common", + "import" + ], + [ + "Imports rattachés : modification", + "change_own_import", + "ishtar_common", + "import" + ], + [ + "Imports : suppression", + "delete_import", + "ishtar_common", + "import" + ], + [ + "Imports rattachés : suppression", + "delete_own_import", + "ishtar_common", + "import" + ], + [ + "Imports : ajout", + "add_import", + "ishtar_common", + "import" + ], + [ + "Imports rattachés : ajout", + "add_own_import", + "ishtar_common", + "import" + ], +] + + +def migrate(apps, __): + ImporterDefault = apps.get_model('ishtar_common', 'ImporterDefault') + for default in ImporterDefault.objects.all(): + if default.target not in EXCLUDE_LIST: + req = default.target + if req not in FULL_COPY_LIST: + req = req.split("__")[0] + if req.endswith("_type") or req.endswith("_types"): + continue + default.required_fields = req + default.save() + + print("") + ProfileType = apps.get_model('ishtar_common', 'ProfileType') + q = ProfileType.objects.filter(txt_idx="administrator") + administrator = None + if not q.count(): + print(COLOR_WARNING + "** No administrator profile found. **" + COLOR_ENDC) + else: + administrator = q.all()[0] + + Permission = apps.get_model("auth", "Permission") + Group = apps.get_model("auth", "Group") + ContentType = apps.get_model("contenttypes", "ContentType") + for name, codename, app, model in GROUPS: + ct, __ = ContentType.objects.get_or_create(app_label=app, model=model) + perm, __ = Permission.objects.get_or_create( + codename=codename, defaults={"name": name, "content_type": ct} + ) + group, __ = Group.objects.get_or_create(name=name) + group.permissions.add(perm) + if administrator: + administrator.groups.add(group) + + print(COLOR_WARNING + "** Verify import permissions in profiles **" + COLOR_ENDC) + + +class Migration(migrations.Migration): + + dependencies = [ + ('ishtar_common', '0230_auto_20231024_1045'), + ] + + operations = [ + migrations.RunPython(migrate), + ] diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 43677bca4..dcb9d030d 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -1111,9 +1111,10 @@ class Imported(models.Model): if not user.ishtaruser: return [] q = getattr(self, key) + lst = [] if user.is_superuser or user.ishtaruser.has_right("view_import"): lst = list(q.all()) - else: + elif user.ishtaruser.has_right("view_own_import"): lst = q.filter(Q(user=user.ishtaruser) | Q(importer_type__users__pk=user.ishtaruser.pk)) new_lst = [] for imprt in lst: diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 77dad558e..2967c18aa 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -225,6 +225,10 @@ class ImporterType(models.Model): def __str__(self): return self.name + @classmethod + def is_own(cls, ishtar_user): + return bool(cls.objects.filter(users__pk=ishtar_user.pk).count()) + @property def type_label(self): if self.type in IMPORT_TYPES_DICT: @@ -445,6 +449,10 @@ class ImporterGroup(models.Model): def __str__(self): return self.name + @classmethod + def is_own(cls, ishtar_user): + return bool(cls.objects.filter(users__pk=ishtar_user.pk).count()) + @property def importer_types_label(self) -> str: return " ; ".join([imp.importer_type.name for imp in self.importer_types.all()]) @@ -1413,18 +1421,35 @@ class BaseImport(models.Model, OwnPerms, SheetItem): abstract = True @classmethod - def query_can_access(cls, user): + def get_permissions_for_actions(cls, user, session): + if not hasattr(user, "ishtaruser") or not user.ishtaruser: + return False, False, False, False + can_edit_all, can_delete_all, can_edit_own, can_delete_own = False, False, False, False + if user.is_superuser: + can_edit_all = True + can_delete_all = True + if user.ishtaruser.has_right("change_import", session=session): + can_edit_all = True + elif user.ishtaruser.has_right("change_own_import", session=session): + can_edit_own = True + if user.ishtaruser.has_right("delete_import", session=session): + can_delete_all = True + elif user.ishtaruser.has_right("delete_own_import", session=session): + can_delete_own = True + return can_edit_all, can_delete_all, can_edit_own, can_delete_own + + @classmethod + def query_can_access(cls, user, perm="view_import"): """ Filter the query to check access permissions :param user: User instance :return: import query """ q = cls.objects - if user.is_superuser: + if user.is_superuser or (hasattr(user, "ishtaruser") and user.ishtaruser and + user.ishtaruser.has_right(perm)): return q - IshtarUser = apps.get_model("ishtar_common", "IshtarUser") - ishtar_user = IshtarUser.objects.get(pk=user.pk) - q = q.filter(Q(user=ishtar_user) | Q(importer_type__users__pk=ishtar_user.pk)) + q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk)) return q @classmethod @@ -1557,25 +1582,28 @@ class ImportGroup(BaseImport): return "" return IMPORT_GROUP_STATE_DCT[str(self.state)] - def get_actions(self): + def get_actions(self, can_edit=False, can_delete=False): """ Get available action relevant with the current status """ actions = [] - if self.state == "C": + if not can_edit and not can_delete: + return actions + if can_edit and self.state == "C": actions.append(("A", _("Analyse"))) - if self.state == "A": + if can_edit and self.state == "A": actions.append(("A", _("Re-analyse"))) if not any(-1 for imp in self.import_list() if not imp.pre_import_form_is_valid): actions.append(("I", _("Launch import"))) - if self.state in ("F", "FE"): + if can_edit and self.state in ("F", "FE"): actions.append(("A", _("Re-analyse"))) actions.append(("I", _("Re-import"))) actions.append(("AC", _("Archive"))) - if self.state == "AC": + if can_edit and self.state == "AC": state = "FE" if any(1 for imp in self.imports.all() if imp.error_file) else "F" actions.append((state, _("Unarchive"))) - actions.append(("D", _("Delete"))) + if can_delete: + actions.append(("D", _("Delete"))) return actions def initialize(self, user=None, session_key=None): @@ -2203,16 +2231,18 @@ class Import(BaseImport): idx_line ) in self.imported_line_numbers.split(",") - def get_actions(self): + def get_actions(self, can_edit=False, can_delete=False): """ Get available action relevant with the current status """ IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") profile = IshtarSiteProfile.get_current_profile() actions = [] - if self.state == "C": + if not can_edit and not can_delete: + return actions + if can_edit and self.state == "C": actions.append(("A", _("Analyse"))) - if self.state in ("A", "PI"): + if can_edit and self.state in ("A", "PI"): actions.append(("A", _("Re-analyse"))) if self.pre_import_form_is_valid: actions.append(("I", _("Launch import"))) @@ -2222,7 +2252,7 @@ class Import(BaseImport): actions.append(("CH", _("Re-check for changes"))) else: actions.append(("CH", _("Check for changes"))) - if self.state in ("F", "FE"): + if can_edit and self.state in ("F", "FE"): actions.append(("A", _("Re-analyse"))) actions.append(("I", _("Re-import"))) if profile.experimental_feature: @@ -2232,12 +2262,13 @@ class Import(BaseImport): else: actions.append(("CH", _("Check for changes"))) actions.append(("AC", _("Archive"))) - if self.state == "AC": + if can_edit and self.state == "AC": state = "FE" if self.error_file else "F" actions.append((state, _("Unarchive"))) - if self.state in ("C", "A"): + if can_delete and self.state in ("C", "A"): actions.append(("ED", _("Edit"))) - actions.append(("D", _("Delete"))) + if can_delete: + actions.append(("D", _("Delete"))) return actions @property diff --git a/ishtar_common/templates/ishtar/import_list.html b/ishtar_common/templates/ishtar/import_list.html index 35821c5bf..ec5e714de 100644 --- a/ishtar_common/templates/ishtar/import_list.html +++ b/ishtar_common/templates/ishtar/import_list.html @@ -16,7 +16,7 @@ {% endblock %} {% block content %} -<div class="text-center m-3"> +{% if can_create_import %}<div class="text-center m-3"> {% if has_import_table %}<a href="{% url 'new_import' %}" class="btn btn-success"> <i class="fa fa-plus"></i> {% trans 'import (table)' %} </a>{% endif %} @@ -26,7 +26,7 @@ {% if has_import_group %}<a href="{% url 'new_import_group' %}" class="btn btn-success"> <i class="fa fa-plus"></i> {% trans 'import (group)' %} </a>{% endif %} -</div> +</div>{% endif %} <div id="import-container"> {% include "ishtar/import_table.html" %} </div> diff --git a/ishtar_common/templates/ishtar/import_table.html b/ishtar_common/templates/ishtar/import_table.html index 08c949654..56e73c6f0 100644 --- a/ishtar_common/templates/ishtar/import_table.html +++ b/ishtar_common/templates/ishtar/import_table.html @@ -78,14 +78,18 @@ {{import.status}} </td> <td> + {% if import.action_list %} <select class="form-control" id='import-action-{{import.import_id}}' name='import-action-{{import.import_id}}'> <option value=''>--------</option> - {% for action, lbl in import.get_actions %} + {% for action, lbl in import.action_list %} <option value='{{action}}'>{{lbl}}</option> {% endfor%} </select> + {% else %} + – + {% endif %} </td> <td><ul class="simple table-import-files"> {% if import.get_imported_values %}<li class="p-1"> diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 979b38395..f14872295 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -37,7 +37,7 @@ from unittest.runner import TextTestRunner, TextTestResult from django.apps import apps from django.conf import settings -from django.contrib.auth.models import User, Permission +from django.contrib.auth.models import User, Permission, Group from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import ( GEOSGeometry, @@ -2474,10 +2474,16 @@ class ShortMenuTest(TestCase): class BaseImportTest(TestCase): - - def create_import(self): - create_user() + def setUp(self): imp_model = models.ImporterModel.objects.create( + klass="ishtar_common.models.Parcel", name="Parcel" + ) + self.importer_type = models.ImporterType.objects.create(associated_models=imp_model) + + def create_import(self, name="My Import", need_user=True): + if need_user: + create_user() + imp_model, __ = models.ImporterModel.objects.get_or_create( klass="ishtar_common.models.Person", name="Person" ) importer_type = models.ImporterType.objects.create(associated_models=imp_model) @@ -2490,6 +2496,7 @@ class BaseImportTest(TestCase): with open(dest, "rb") as f: mcc_operation_file = DjangoFile(f) imprt = models.Import.objects.create( + name=name, user=models.IshtarUser.objects.all()[0], importer_type=importer_type, imported_file=mcc_operation_file, @@ -2554,7 +2561,7 @@ class BaseImportTest(TestCase): "csv_sep": ",", } form = forms_common.NewImportGroupForm( - data=post_dict, files=file_dict, user=ishtar_user + data=post_dict, files=file_dict, user=ishtar_user.user_ptr ) self.assertTrue(form.is_valid()) impt = form.save(ishtar_user) @@ -2563,102 +2570,303 @@ class BaseImportTest(TestCase): self.init_group_import(impt) return impt - def create_importer_model(self): - return models.ImporterModel.objects.create( - klass="ishtar_common.models.Parcel", name="Parcel" + +class ImportTestInterface(BaseImportTest): + + def setUp(self): + super().setUp() + self.super_username, self.super_password, __ = create_superuser() + self.simple_username, self.simple_password, __ = create_user() + self.import_username, self.import_password, self.import_user = create_user( + "import-user", "password" ) + self.import_group = Group.objects.create(name="Import") + self.import_user.groups.add(self.import_group) - def create_importer_type(self, imp_model): - return models.ImporterType.objects.create(associated_models=imp_model) + def superuser_login(self): + client = Client() + client.login(username=self.super_username, password=self.super_password) + return client + def simple_login(self): + client = Client() + client.login(username=self.simple_username, password=self.simple_password) + return client -class ImportTest(BaseImportTest): + def import_login(self): + client = Client() + client.login(username=self.import_username, password=self.import_password) + return client + + def set_global_permission(self, imports, codenames): + self.import_group.permissions.clear() + for imprt in imports: + imprt.importer_type.users.clear() + if not isinstance(codenames, (list, tuple)): + codenames = (codenames,) + for codename in codenames: + self.import_group.permissions.add(Permission.objects.get(codename=codename)) + + def remove_global_permission(self, imports, codenames): + self.import_group.permissions.clear() + for imprt in imports: + imprt.importer_type.users.clear() + if not isinstance(codenames, (list, tuple)): + codenames = (codenames,) + for codename in codenames: + perm = Permission.objects.get(codename=codename) + if perm in list(self.import_group.permissions.all()): + self.import_group.permissions.remove() + + def set_own_permission(self, imports, codenames): + self.set_global_permission(imports, codenames) + user = models.IshtarUser.objects.get(user_ptr=self.import_user) + for imprt in imports: + imprt.importer_type.users.add(user) + + def test_list_import(self): + imprt = self.create_import() + imprt2 = self.create_import(name="My-import-2", need_user=False) + # no login and simple user login + for client in (Client(), self.simple_login()): + response = client.get(reverse("current_imports")) + self.assertRedirects(response, '/') + + # superuser + client = self.superuser_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(imprt.name, content) + self.assertIn(imprt2.name, content) - def test_edit_import(self): - username, password, user = create_superuser() + # import user + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertRedirects(response, '/') + + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(imprt.name, content) + self.assertIn(imprt2.name, content) + self.remove_global_permission([imprt, imprt2], "change_import") + + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(imprt.name, content) + self.assertNotIn(imprt2.name, content, + msg="Import 2 unexpectedly found in import list") + + def test_import_action_permission(self): imprt = self.create_import() - c = Client() - c.login(username=username, password=password) + imprt2 = self.create_import(name="My-import-2", need_user=False) + delete_tag = "<option value='D'>" - response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + # superuser + client = self.superuser_login() + response = client.get(reverse("current_imports")) self.assertEqual(response.status_code, 200) - self.assertContains(response, imprt.importer_type) - self.assertContains(response, imprt.imported_file) - self.assertContains(response, imprt.imported_images) - self.assertContains(response, imprt.encoding) - self.assertContains(response, imprt.csv_sep) + content = response.content.decode() + self.assertIn(f"import-action-{imprt.pk}", content) + self.assertIn(f"import-action-{imprt2.pk}", content) + self.assertIn(delete_tag, content) + + # import user + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertRedirects(response, '/') + + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(f"import-action-{imprt.pk}", content) + self.assertIn(f"import-action-{imprt2.pk}", content) + self.assertNotIn(delete_tag, content) + self.remove_global_permission([imprt, imprt2], ("change_import",)) + + self.set_global_permission([imprt, imprt2], ("change_import", "delete_import")) + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(delete_tag, content) + self.remove_global_permission([imprt, imprt2], ("change_import", "delete_import")) - imp_model = self.create_importer_model() - importer_type = self.create_importer_type(imp_model) + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(reverse("current_imports")) + self.assertEqual(response.status_code, 200) + content = response.content.decode() + self.assertIn(f"import-action-{imprt.pk}", content) + self.assertNotIn(f"import-action-{imprt2.pk}", content, + msg="Import 2 unexpectedly found in import list") + self.assertNotIn(delete_tag, content) + self.set_own_permission([imprt], ("change_own_import", "change_own_import")) + def _test_create_import_get_data(self): + csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") + return { + "name": "Test Name", + "importer_type": self.importer_type.pk, + "imported_file": SimpleUploadedFile( + name="insee-test.csv", + content=open(csv_path, "rb").read(), + ), + "encoding": "utf-8", + "csv_sep": '|', + "skip_lines": 1, + } + + def test_create_import(self): + # init + nb_imports = models.Import.objects.count() + + # no login and simple user login + for client in (Client(), self.simple_login()): + response = client.get(reverse("new_import")) + self.assertRedirects(response, '/') + data = self._test_create_import_get_data() + response = client.post(reverse("new_import"), data) + self.assertRedirects(response, '/') + self.assertEqual(nb_imports, models.Import.objects.count()) + + # superuser + client = self.superuser_login() + response = client.get(reverse("new_import")) + self.assertEqual(response.status_code, 200) + data = self._test_create_import_get_data() + response = client.post(reverse("new_import"), data) + self.assertEqual(nb_imports + 1, models.Import.objects.count()) + self.assertRedirects(response, '/import-list/') + + imprt = models.Import.objects.get(name="Test Name") + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertEqual(response.status_code, 200) + self.assertEqual(imprt.name, "Test Name") + self.assertEqual(imprt.importer_type, self.importer_type) + self.assertEqual(imprt.encoding, "utf-8") + self.assertEqual(imprt.csv_sep, '|') + + # import user + for imp in models.Import.objects.all(): + imp.delete() + nb_imports = 0 + + client = self.import_login() + response = client.get(reverse("new_import")) + self.assertRedirects(response, '/') + + self.set_global_permission([], ("add_import", "change_import")) + client = self.import_login() + response = client.get(reverse("new_import")) + self.assertEqual(response.status_code, 200) + data = self._test_create_import_get_data() + response = client.post(reverse("new_import"), data) + self.assertEqual(nb_imports + 1, models.Import.objects.count()) + self.assertRedirects(response, '/import-list/') + + def test_edit_import(self): + # init + imprt = self.create_import(name="My import") + imprt2 = self.create_import(name="My-import-2", need_user=False) data = { "name": "Test Name", - "importer_type": importer_type.pk, + "importer_type": self.importer_type.pk, "encoding": "utf-8", "csv_sep": '|', "skip_lines": 32, } - response = c.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) - self.assertEqual(response.status_code, 302) + # no login and simple user login + for client in (Client(), self.simple_login()): + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertRedirects(response, '/') + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/') - response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + client = self.superuser_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) self.assertEqual(response.status_code, 200) - self.assertContains(response, "Test Name") - self.assertContains(response, str(importer_type)) - self.assertContains(response, "utf-8") + self.assertContains(response, imprt.name) + self.assertContains(response, imprt.importer_type) + self.assertContains(response, imprt.imported_file) + self.assertContains(response, imprt.imported_images) + self.assertContains(response, imprt.encoding) + self.assertContains(response, imprt.csv_sep) + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/import-list/') imprt = models.Import.objects.get(pk=imprt.pk) self.assertEqual(imprt.name, "Test Name") - self.assertEqual(imprt.importer_type, importer_type) + self.assertEqual(imprt.importer_type, self.importer_type) self.assertEqual(imprt.encoding, "utf-8") self.assertEqual(imprt.csv_sep, "|") self.assertEqual(imprt.skip_lines, 32) - def test_create_import(self): - username, password, user = create_superuser() - c = Client() - c.login(username=username, password=password) + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Test Name") + self.assertContains(response, str(self.importer_type)) + self.assertContains(response, "utf-8") - imp_model = self.create_importer_model() - importer_type = self.create_importer_type(imp_model) - csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") + # import user + imprt.delete() + imprt2.delete() + imprt = self.create_import(name="My import") + imprt2 = self.create_import(name="My-import-2", need_user=False) + + client = self.import_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertRedirects(response, '/') + + # import user global permission + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + self.assertEqual(response.status_code, 200) + self.assertContains(response, imprt.name) - data = { - "name": "Test Name", - "importer_type": importer_type.pk, - "imported_file": SimpleUploadedFile( - name="insee-test.csv", - content=open(csv_path, "rb").read(), - ), - "encoding": "utf-8", - "csv_sep": '|', - "skip_lines": 1, - } + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/import-list/') + imprt = models.Import.objects.get(pk=imprt.pk) + self.assertEqual(imprt.name, "Test Name") - response = c.post(reverse("new_import"), data) - self.assertEqual(response.status_code, 302) + self.remove_global_permission([imprt, imprt2], ("change_import",)) - imprt = models.Import.objects.get(name="Test Name") - response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) + # import user own permission + imprt.delete() + imprt2.delete() + imprt = self.create_import(name="My import") + imprt2 = self.create_import(name="My-import-2", need_user=False) + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) self.assertEqual(response.status_code, 200) + self.assertContains(response, imprt.name) + + response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) + self.assertRedirects(response, '/import-list/') + imprt = models.Import.objects.get(pk=imprt.pk) self.assertEqual(imprt.name, "Test Name") - self.assertEqual(imprt.importer_type, importer_type) - self.assertEqual(imprt.encoding, "utf-8") - self.assertEqual(imprt.csv_sep, '|') - def test_validation_zip_import_image(self): - username, password, user = create_superuser() - c = Client() - c.login(username=username, password=password) + response = client.get(reverse("edit_import", kwargs={"pk": imprt2.pk})) + self.assertRedirects(response, '/') - imp_model = self.create_importer_model() - importer_type = self.create_importer_type(imp_model) + def test_validation_zip_import_image(self): + # init image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png") data = { "name": "Import Zip Not Valid Must Fail", - "importer_type": importer_type.pk, + "importer_type": self.importer_type.pk, "encoding": "utf-8", "csv_sep": "|", "skip_lines": 1, @@ -2668,13 +2876,130 @@ class ImportTest(BaseImportTest): content_type="image/png", ), } - response = c.post(reverse("new_import"), data) + + # superuser + client = self.superuser_login() + response = client.post(reverse("new_import"), data) expected = str( _('"Associated images" field must be a valid zip file.') ).replace('"', '"') self.assertIn(expected, response.content.decode("utf-8")) self.assertEqual(response.status_code, 200) + def test_display_csv(self): + # init + imprt = self.create_import() + imprt2 = self.create_import(name="My-import-2", need_user=False) + url = reverse("import_display_csv", args=["source", "", imprt.pk]) + url2 = reverse("import_display_csv", args=["source", "", imprt2.pk]) + + # no login + client = Client() + response = client.get(url) + self.assertRedirects(response, "/") + + # superuser + client = self.superuser_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertEqual(response.status_code, 200) + + # simple login + client = self.simple_login() + response = client.get(url) + self.assertRedirects(response, "/") + + # import user global permission + self.set_global_permission([imprt, imprt2], "view_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertEqual(response.status_code, 200) + self.remove_global_permission([imprt, imprt2], ("view_import",)) + + # import user own permission + self.set_own_permission([imprt], "view_own_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertRedirects(response, "/") + + def test_ignore_errors(self): + # init + imprt = self.create_import() + imprt2 = self.create_import(name="My-import-2", need_user=False) + + path = os.path.join( + LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv" + ) + imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") + imprt.save() + imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") + imprt2.save() + + q = models.ImportLineError.objects.filter(import_item=imprt.pk) + self.assertEqual(q.count(), 2) + ignored_line = q.all()[0] + url = reverse("import_ignore_line", args=[ignored_line.pk]) + q = models.ImportLineError.objects.filter(import_item=imprt2.pk) + self.assertEqual(q.count(), 2) + ignored_line2 = q.all()[0] + url2 = reverse("import_ignore_line", args=[ignored_line2.pk]) + + # no login + client = Client() + response = client.get(url) + self.assertRedirects(response, "/") + + # superuser + client = self.superuser_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) + ignored_line.ignored = False + ignored_line.save() + response = client.get(url2) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True) + ignored_line2.ignored = False + ignored_line2.save() + + # simple login + client = self.simple_login() + response = client.get(url) + self.assertRedirects(response, "/") + + # import user global permission + self.set_global_permission([imprt, imprt2], "change_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) + ignored_line.ignored = False + ignored_line.save() + response = client.get(url2) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True) + ignored_line2.ignored = False + ignored_line2.save() + self.remove_global_permission([imprt, imprt2], ("change_import",)) + + # import user own permission + self.set_own_permission([imprt], "change_own_import") + client = self.import_login() + response = client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) + self.assertEqual(response.status_code, 200) + response = client.get(url2) + self.assertEqual(response.status_code, 404) + self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, False) + + +class ImportTest(BaseImportTest): def test_archive_import(self): imprt = self.create_import() with open(imprt.imported_file.path, "r") as f: @@ -2796,19 +3121,6 @@ class ImportTest(BaseImportTest): field = getattr(sub_import, k) self.assertTrue(field, "{} is missing in unarchive".format(k)) - def test_display_csv(self): - imprt = self.create_import() - username, password, __ = create_user("test", "test") - c = Client() - c.login(username=username, password=password) - url = "import_display_csv" - response = c.get(reverse(url, args=["source", "", imprt.pk]), ) - self.assertEqual(response.status_code, 404) - username, password, __ = create_superuser() - c.login(username=username, password=password) - response = c.get(reverse(url, args=["source", "", imprt.pk])) - self.assertEqual(response.status_code, 200) - def test_delete_related(self): town = models.Town.objects.create(name="my-test") self.assertEqual(models.Town.objects.filter(name="my-test").count(), 1) diff --git a/ishtar_common/tests/error-file.csv b/ishtar_common/tests/error-file.csv new file mode 100644 index 000000000..007d2d3c2 --- /dev/null +++ b/ishtar_common/tests/error-file.csv @@ -0,0 +1,3 @@ +"ligne","colonne","erreur" +"2","1","Valeur requise" +"3","1","Valeur requise" diff --git a/ishtar_common/urls.py b/ishtar_common/urls.py index 52e1324b2..c3d614ac6 100644 --- a/ishtar_common/urls.py +++ b/ishtar_common/urls.py @@ -210,62 +210,62 @@ urlpatterns = [ ), url( r"^import-new/$", - check_rights(["change_import"])(views.NewImportView.as_view()), + check_rights(["add_import", "add_own_import"])(views.NewImportView.as_view()), name="new_import", ), url( r"^import-edit/(?P<pk>[0-9]+)/$", - check_rights(["change_import"])(views.EditImportView.as_view()), + check_rights(["change_import", "change_own_import"])(views.EditImportView.as_view()), name="edit_import", ), url( r"^import-new-gis/$", - check_rights(["change_import"])(views.NewImportGISView.as_view()), + check_rights(["add_import", "add_own_import"])(views.NewImportGISView.as_view()), name="new_import_gis", ), url( r"^import-new-group/$", - check_rights(["change_import"])(views.NewImportGroupView.as_view()), + check_rights(["add_import", "add_own_import"])(views.NewImportGroupView.as_view()), name="new_import_group", ), url( r"^import-list/$", - check_rights(["change_import"])(views.ImportListView.as_view()), + check_rights(["change_import", "change_own_import"])(views.ImportListView.as_view()), name="current_imports", ), url( r"^import-list-table/$", - check_rights(["change_import"])(views.ImportListTableView.as_view()), + check_rights(["change_import", "change_own_import"])(views.ImportListTableView.as_view()), name="current_imports_table", ), url( r"^import-get-status/$", - check_rights(["change_import"])(views.import_get_status), + check_rights(["change_import", "change_own_import"])(views.import_get_status), name="import_get_status", ), url( r"^import-list-old/$", - check_rights(["change_import"])(views.ImportOldListView.as_view()), + check_rights(["change_import", "change_own_import"])(views.ImportOldListView.as_view()), name="old_imports", ), url( r"^import-delete/(?P<pk>[0-9]+)/$", - views.ImportDeleteView.as_view(), + check_rights(["delete_import", "delete_own_import"])(views.ImportDeleteView.as_view()), name="import_delete", ), url( r"^import-group-delete/(?P<pk>[0-9]+)/$", - views.ImportGroupDeleteView.as_view(), + check_rights(["delete_import", "delete_own_import"])(views.ImportGroupDeleteView.as_view()), name="import_group_delete", ), url( r"^import-link-unmatched/(?P<pk>[0-9]+)/$", - views.ImportLinkView.as_view(), + check_rights(["change_import", "change_own_import"])(views.ImportMatchView.as_view()), name="import_link_unmatched", ), url( r"^import-csv-view/(?P<target>source|result|match|error)/(?P<group>group\-)?(?P<pk>[0-9]+)/$", - views.ImportCSVView.as_view(), + check_rights(["view_import", "view_own_import"])(views.ImportCSVView.as_view()), name="import_display_csv", ), url( @@ -281,12 +281,12 @@ urlpatterns = [ ), url( r"^import-pre-form/(?P<import_id>[0-9]+)/$", - check_rights(["change_import"])(views.ImportPreFormView.as_view()), + check_rights(["change_import", "change_own_import"])(views.ImportPreFormView.as_view()), name="import_pre_import_form", ), url( r"^import-ignore-line/(?P<line_id>[0-9]+)/$", - views.line_error, + check_rights(["change_import", "change_own_import"])(views.line_error), name="import_ignore_line", ), url(r"^profile(?:/(?P<pk>[0-9]+))?/$", views.ProfileEdit.as_view(), name="profile"), diff --git a/ishtar_common/views.py b/ishtar_common/views.py index b91f3202c..cd22d62eb 100644 --- a/ishtar_common/views.py +++ b/ishtar_common/views.py @@ -1490,7 +1490,28 @@ class NewImportView(BaseImportView, CreateView): page_name = _("Import: create (table)") -class EditImportView(BaseImportView, UpdateView): +class ImportPermissionMixin: + permission_full = "change_import" + permission_own = "change_own_import" + + def dispatch(self, request, *args, **kwargs): + import_pk = self.kwargs["pk"] + user = request.user + if not user or not user.ishtaruser: + return redirect("/") + model = models.ImportGroup if self.kwargs.get("group", None) else models.Import + q = model.query_can_access(user, perm=self.permission_full).filter(pk=import_pk) + if not user.is_superuser and not user.ishtaruser.has_right(self.permission_full): + if not user.ishtaruser.has_right(self.permission_own): + return redirect("/") + q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk)) + if not q.count(): + return redirect("/") + returned = super().dispatch(request, *args, **kwargs) + return returned + + +class EditImportView(ImportPermissionMixin, BaseImportView, UpdateView): page_name = _("Import: edit (table)") @@ -1544,6 +1565,21 @@ class ImportPreFormView(IshtarMixin, LoginRequiredMixin, FormView): return HttpResponseRedirect(self.get_success_url()) +def get_permissions_for_actions(user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own): + can_edit, can_delete = False, False + is_own = None + if can_edit_own or can_delete_own: # need to check owner + if imprt.importer_type_id not in owns: + # "is_own" only query once by importer type + owns[imprt.importer_type.pk] = imprt.importer_type.is_own(user.ishtaruser) + is_own = owns[imprt.importer_type_id] + if can_edit_all or (can_edit_own and is_own): + can_edit = True + if can_delete_all or (can_delete_own and is_own): + can_delete = True + return can_edit, can_delete + + class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): template_name = "ishtar/import_list.html" model = models.Import @@ -1555,15 +1591,31 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): def get_queryset(self): user = self.request.user - if not user.pk: + if not user.pk or not user.ishtaruser: raise Http404() - q1 = self._queryset_filter(self.model.query_can_access(user)) + q1 = self._queryset_filter(self.model.query_can_access(user, "change_import")) q1 = q1.filter(group__isnull=True).order_by("-end_date", "-creation_date", "-pk") - q2 = self._queryset_filter(models.ImportGroup.query_can_access(user)) + q2 = self._queryset_filter(models.ImportGroup.query_can_access(user, "change_import")) q2 = q2.order_by("-end_date", "-creation_date", "-pk") - return list(reversed(sorted(list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date)))) + values = list(reversed(sorted(list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date)))) + can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions( + user, self.request.session + ) + imports = [] + owns = {} + for imprt in values: + can_edit, can_delete = get_permissions_for_actions( + user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own + ) + imprt.action_list = imprt.get_actions(can_edit=can_edit, can_delete=can_delete) + imports.append(imprt) + return imports def post(self, request, *args, **kwargs): + can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions( + request.user, request.session + ) + owns = {} for field in request.POST: if not field.startswith("import-action-") or not request.POST[field]: continue @@ -1576,28 +1628,26 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): imprt = model.objects.get(pk=int(field.split("-")[-1])) except (models.Import.DoesNotExist, ValueError): continue - if not self.request.user.is_superuser: - # user can only edit his own imports - user = models.IshtarUser.objects.get(pk=self.request.user.pk) - if imprt.user != user: - continue + can_edit, can_delete = get_permissions_for_actions( + request.user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own + ) action = request.POST[field] - if action == "D": + if can_delete and action == "D": url = "import_group_delete" if is_group else "import_delete" return HttpResponseRedirect( reverse(url, kwargs={"pk": imprt.pk}) ) - elif action == "ED": + elif can_edit and action == "ED": url = "edit_import_group" if is_group else "edit_import" return HttpResponseRedirect( reverse(url, kwargs={"pk": imprt.pk}) ) - elif action == "A": + elif can_edit and action == "A": imprt.initialize( user=self.request.user.ishtaruser, session_key=request.session.session_key, ) - elif action == "I": + elif can_edit and action == "I": if settings.USE_BACKGROUND_TASK: imprt.delayed_importation(request, request.session.session_key) else: @@ -1612,12 +1662,12 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): f"{imprt} - {e}", "warning", ) - elif action == "CH": + elif can_edit and action == "CH": if settings.USE_BACKGROUND_TASK: imprt.delayed_check_modified(request.session.session_key) else: imprt.check_modified() - elif action == "IS": + elif can_edit and action == "IS": if imprt.current_line is None: imprt.current_line = imprt.skip_lines imprt.save() @@ -1626,19 +1676,37 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): "import_step_by_step", args=[imprt.pk, imprt.current_line + 1] ) ) - elif action == "AC": + elif can_edit and action == "AC": imprt.archive() - elif action in ("F", "FE"): + elif can_edit and action in ("F", "FE"): imprt.unarchive(action) return HttpResponseRedirect(reverse(self.current_url)) def get_context_data(self, **kwargs): dct = super().get_context_data(**kwargs) + add_import_perm = self.request.user.ishtaruser.has_right("add_import", session=self.request.session) + import_type_table = models.ImporterType.objects.filter(available=True, is_import=True, type='tab') + import_type_gis = models.ImporterType.objects.filter(available=True, is_import=True, type='gis') + import_type_group = models.ImporterGroup.objects.filter(available=True) + if not add_import_perm and self.request.user.ishtaruser.has_right("add_own_import", + session=self.request.session): + import_type_table = import_type_table.filter(users__pk=self.request.user.ishtaruser.pk) + import_type_gis = import_type_gis.filter(users__pk=self.request.user.ishtaruser.pk) + import_type_group = import_type_group.filter(users__pk=self.request.user.ishtaruser.pk) + add_import_perm = True + has_import_table, has_import_gis, has_import_group = False, False, False + if add_import_perm: + if import_type_table.count(): + has_import_table = True + if import_type_gis.count(): + has_import_gis = True + if import_type_group.count(): + has_import_group = True dct.update({ - "autorefresh_available": settings.USE_BACKGROUND_TASK, - "has_import_table": models.ImporterType.objects.filter(available=True, is_import=True, type='tab').count(), - "has_import_gis": models.ImporterType.objects.filter(available=True, is_import=True, type='gis').count(), - "has_import_group": models.ImporterGroup.objects.filter(available=True).count(), + "has_import_table": has_import_table, + "has_import_gis": has_import_gis, + "has_import_group": has_import_group, + "can_create_import": has_import_table or has_import_gis or has_import_group, }) return dct @@ -2083,14 +2151,23 @@ def import_get_status(request, current_right=None): "number_of_line": item.number_of_line, "progress_percent": item.progress_percent, }) - item_dct["actions"] = [(key, str(lbl)) for key, lbl in item.get_actions()] + can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions( + request.user + ) + can_edit, can_delete = get_permissions_for_actions( + request.user, item, {}, can_edit_all, can_delete_all, can_edit_own, can_delete_own + ) + item_dct["actions"] = [ + (key, str(lbl)) + for key, lbl in item.get_actions(can_edit=can_edit, can_delete=can_delete) + ] response[key].append(item_dct) data = json.dumps(response) return HttpResponse(data, content_type="application/json") -class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView): +class ImportMatchView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, ModelFormSetView): template_name = "ishtar/formset_import_match.html" model = models.TargetKey page_name = _("Link unmatched items") @@ -2100,9 +2177,11 @@ class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView): form_class = forms.TargetKeyForm formset_class = forms.TargetKeyFormset max_fields = 250 + permission_full = "change_import" + permission_own = "change_own_import" def get_formset_kwargs(self): - kwargs = super(ImportLinkView, self).get_formset_kwargs() + kwargs = super().get_formset_kwargs() kwargs["user"] = self.request.user return kwargs @@ -2126,25 +2205,29 @@ class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView): return reverse("import_link_unmatched", args=[self.kwargs["pk"]]) -class ImportDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): +class ImportDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.Import page_name = _("Delete import") + permission_full = "delete_import" + permission_own = "delete_own_import" def get_success_url(self): return reverse("current_imports") -class ImportGroupDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): +class ImportGroupDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.ImportGroup page_name = _("Delete import") + permission_full = "delete_import" + permission_own = "delete_own_import" def get_success_url(self): return reverse("current_imports") -class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView): +class ImportCSVView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/blocks/view_import_csv.html" ATTRIBUTES = { "source": "get_imported_values", @@ -2158,13 +2241,15 @@ class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView): "result": ("fa fa-th", _("Result")) , "match": ("fa fa-arrows-h", _("Match")), } + permission_full = "view_import" + permission_own = "view_own_import" def get(self, request, *args, **kwargs): user = self.request.user if not user.pk: raise Http404() model = models.ImportGroup if kwargs.get("group", None) else models.Import - q = model.query_can_access(self.request.user).filter(pk=kwargs.get("pk", -1)) + q = model.query_can_access(self.request.user, perm=self.permission_full).filter(pk=kwargs.get("pk", -1)) if not q.count(): raise Http404() self.import_item = q.all()[0] @@ -2208,7 +2293,7 @@ class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView): return data -def line_error(request, line_id): +def line_error(request, line_id, current_right=None): """ Set or unset ignored state of a csv error file """ @@ -2219,7 +2304,7 @@ def line_error(request, line_id): if not q.count(): return line = q.all()[0] - q = models.Import.query_can_access(request.user).filter(pk=line.import_item_id) + q = models.Import.query_can_access(request.user, perm="change_import").filter(pk=line.import_item_id) if not q.count(): raise Http404() line.ignored = not line.ignored |