diff options
Diffstat (limited to 'ishtar_common')
| -rw-r--r-- | ishtar_common/fixtures/initial_data-fr.json | 11 | ||||
| -rw-r--r-- | ishtar_common/forms_common.py | 33 | ||||
| -rw-r--r-- | ishtar_common/migrations/0231_default_mandatory_keys.py | 38 | ||||
| -rw-r--r-- | ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py | 116 | ||||
| -rw-r--r-- | ishtar_common/models_common.py | 3 | ||||
| -rw-r--r-- | ishtar_common/models_imports.py | 67 | ||||
| -rw-r--r-- | ishtar_common/templates/ishtar/import_list.html | 4 | ||||
| -rw-r--r-- | ishtar_common/templates/ishtar/import_table.html | 6 | ||||
| -rw-r--r-- | ishtar_common/tests.py | 466 | ||||
| -rw-r--r-- | ishtar_common/tests/error-file.csv | 3 | ||||
| -rw-r--r-- | ishtar_common/urls.py | 28 | ||||
| -rw-r--r-- | ishtar_common/views.py | 147 | 
12 files changed, 730 insertions, 192 deletions
diff --git a/ishtar_common/fixtures/initial_data-fr.json b/ishtar_common/fixtures/initial_data-fr.json index f3c539d3c..980ed3d1a 100644 --- a/ishtar_common/fixtures/initial_data-fr.json +++ b/ishtar_common/fixtures/initial_data-fr.json @@ -301,7 +301,16 @@                  "Auteurs : lecture"              ],              [ -                "Import : ajout/modification/suppression" +                "Imports : lecture" +            ], +            [ +                "Imports : modification" +            ], +            [ +                "Imports : suppression" +            ], +            [ +                "Imports : ajout"              ]          ]      } diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py index dd3a83971..0d739fbfb 100644 --- a/ishtar_common/forms_common.py +++ b/ishtar_common/forms_common.py @@ -246,19 +246,31 @@ class BaseImportForm(IshtarForm, forms.ModelForm):          super().__init__(*args, **kwargs)          self.fields["imported_file"].required = True          self._filter_group(user) -        self._filter_importer_type() +        self._filter_importer_type(user)          if "imported_images" in self.fields:              self.fields["imported_images"].validators = [file_size_validator]          self.fields["imported_file"].validators = [file_size_validator]          self._post_init() -    def _filter_importer_type(self): -        self.fields["importer_type"].choices = [("", "--")] + [ -            (imp.pk, imp.name) -            for imp in models.ImporterType.objects.filter( +    def _filter_importer_type_query(self, q, user): +        if user.is_superuser or user.ishtaruser.has_right("add_import"): +            return q +        if not user.ishtaruser.has_right("add_own_import"): +            self.fields["importer_type"].choices = [("", "--")] +            return +        q = q.filter(users__pk=user.ishtaruser.pk) +        return q + +    def _filter_importer_type(self, user): +        q = models.ImporterType.objects.filter(                  available=True, is_import=True, type=self.importer_type -            ) +        ) +        q = self._filter_importer_type_query(q, user) +        if not q: +            return +        self.fields["importer_type"].choices = [("", "--")] + [ +            (imp.pk, imp.name) for imp in q.all()          ]      def _filter_group(self, user): @@ -511,10 +523,13 @@ class NewImportGroupForm(NewImportForm):          "imported_images": FormHeader(_("Documents/Images")),      } -    def _filter_importer_type(self): +    def _filter_importer_type(self, user): +        q = models.ImporterGroup.objects.filter(available=True) +        q = self._filter_importer_type_query(q, user) +        if not q: +            return          self.fields["importer_type"].choices = [("", "--")] + [ -            (imp.pk, imp.name) -            for imp in models.ImporterGroup.objects.filter(available=True) +            (imp.pk, imp.name) for imp in q.all()          ]      def _filter_group(self, user): diff --git a/ishtar_common/migrations/0231_default_mandatory_keys.py b/ishtar_common/migrations/0231_default_mandatory_keys.py deleted file mode 100644 index cd245322a..000000000 --- a/ishtar_common/migrations/0231_default_mandatory_keys.py +++ /dev/null @@ -1,38 +0,0 @@ -# Generated by Django 2.2.24 on 2023-09-18 17:05 - -from django.db import migrations - -EXCLUDE_LIST = [ -    "-", -    "auto_external_id", -    "spatial_reference_system", -    "public_domain", -] - -FULL_COPY_LIST = [ -    "scientist__attached_to", -] - - -def migrate(apps, __): -    ImporterDefault = apps.get_model('ishtar_common', 'ImporterDefault') -    for default in ImporterDefault.objects.all(): -        if default.target not in EXCLUDE_LIST: -            req = default.target -            if req not in FULL_COPY_LIST: -                req = req.split("__")[0] -            if req.endswith("_type") or req.endswith("_types"): -                continue -            default.required_fields = req -            default.save() - - -class Migration(migrations.Migration): - -    dependencies = [ -        ('ishtar_common', '0230_auto_20231024_1045'), -    ] - -    operations = [ -        migrations.RunPython(migrate), -    ] diff --git a/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py b/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py new file mode 100644 index 000000000..120711f09 --- /dev/null +++ b/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py @@ -0,0 +1,116 @@ +# Generated by Django 2.2.24 on 2023-09-18 17:05 + +from django.db import migrations + +COLOR_WARNING = "\033[93m" +COLOR_ENDC = "\033[0m" + +EXCLUDE_LIST = [ +    "-", +    "auto_external_id", +    "spatial_reference_system", +    "public_domain", +] + +FULL_COPY_LIST = [ +    "scientist__attached_to", +] + +GROUPS = [ +    [ +        "Imports : lecture", +        "view_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports rattachés : lecture", +        "view_own_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports : modification", +        "change_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports rattachés : modification", +        "change_own_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports : suppression", +        "delete_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports rattachés : suppression", +        "delete_own_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports : ajout", +        "add_import", +        "ishtar_common", +        "import" +    ], +    [ +        "Imports rattachés : ajout", +        "add_own_import", +        "ishtar_common", +        "import" +    ], +] + + +def migrate(apps, __): +    ImporterDefault = apps.get_model('ishtar_common', 'ImporterDefault') +    for default in ImporterDefault.objects.all(): +        if default.target not in EXCLUDE_LIST: +            req = default.target +            if req not in FULL_COPY_LIST: +                req = req.split("__")[0] +            if req.endswith("_type") or req.endswith("_types"): +                continue +            default.required_fields = req +            default.save() + +    print("") +    ProfileType = apps.get_model('ishtar_common', 'ProfileType') +    q = ProfileType.objects.filter(txt_idx="administrator") +    administrator = None +    if not q.count(): +        print(COLOR_WARNING + "** No administrator profile found. **" + COLOR_ENDC) +    else: +        administrator = q.all()[0] + +    Permission = apps.get_model("auth", "Permission") +    Group = apps.get_model("auth", "Group") +    ContentType = apps.get_model("contenttypes", "ContentType") +    for name, codename, app, model in GROUPS: +        ct, __ = ContentType.objects.get_or_create(app_label=app, model=model) +        perm, __ = Permission.objects.get_or_create( +            codename=codename, defaults={"name": name, "content_type": ct} +        ) +        group, __ = Group.objects.get_or_create(name=name) +        group.permissions.add(perm) +        if administrator: +            administrator.groups.add(group) + +    print(COLOR_WARNING + "** Verify import permissions in profiles **" + COLOR_ENDC) + + +class Migration(migrations.Migration): + +    dependencies = [ +        ('ishtar_common', '0230_auto_20231024_1045'), +    ] + +    operations = [ +        migrations.RunPython(migrate), +    ] diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 43677bca4..dcb9d030d 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -1111,9 +1111,10 @@ class Imported(models.Model):          if not user.ishtaruser:              return []          q = getattr(self, key) +        lst = []          if user.is_superuser or user.ishtaruser.has_right("view_import"):              lst = list(q.all()) -        else: +        elif user.ishtaruser.has_right("view_own_import"):              lst = q.filter(Q(user=user.ishtaruser) | Q(importer_type__users__pk=user.ishtaruser.pk))          new_lst = []          for imprt in lst: diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 77dad558e..2967c18aa 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -225,6 +225,10 @@ class ImporterType(models.Model):      def __str__(self):          return self.name +    @classmethod +    def is_own(cls, ishtar_user): +        return bool(cls.objects.filter(users__pk=ishtar_user.pk).count()) +      @property      def type_label(self):          if self.type in IMPORT_TYPES_DICT: @@ -445,6 +449,10 @@ class ImporterGroup(models.Model):      def __str__(self):          return self.name +    @classmethod +    def is_own(cls, ishtar_user): +        return bool(cls.objects.filter(users__pk=ishtar_user.pk).count()) +      @property      def importer_types_label(self) -> str:          return " ; ".join([imp.importer_type.name for imp in self.importer_types.all()]) @@ -1413,18 +1421,35 @@ class BaseImport(models.Model, OwnPerms, SheetItem):          abstract = True      @classmethod -    def query_can_access(cls, user): +    def get_permissions_for_actions(cls, user, session): +        if not hasattr(user, "ishtaruser") or not user.ishtaruser: +            return False, False, False, False +        can_edit_all, can_delete_all, can_edit_own, can_delete_own = False, False, False, False +        if user.is_superuser: +            can_edit_all = True +            can_delete_all = True +        if user.ishtaruser.has_right("change_import", session=session): +            can_edit_all = True +        elif user.ishtaruser.has_right("change_own_import", session=session): +            can_edit_own = True +        if user.ishtaruser.has_right("delete_import", session=session): +            can_delete_all = True +        elif user.ishtaruser.has_right("delete_own_import", session=session): +            can_delete_own = True +        return can_edit_all, can_delete_all, can_edit_own, can_delete_own + +    @classmethod +    def query_can_access(cls, user, perm="view_import"):          """          Filter the query to check access permissions          :param user: User instance          :return: import query          """          q = cls.objects -        if user.is_superuser: +        if user.is_superuser or (hasattr(user, "ishtaruser") and user.ishtaruser and +                                 user.ishtaruser.has_right(perm)):              return q -        IshtarUser = apps.get_model("ishtar_common", "IshtarUser") -        ishtar_user = IshtarUser.objects.get(pk=user.pk) -        q = q.filter(Q(user=ishtar_user) | Q(importer_type__users__pk=ishtar_user.pk)) +        q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk))          return q      @classmethod @@ -1557,25 +1582,28 @@ class ImportGroup(BaseImport):              return ""          return IMPORT_GROUP_STATE_DCT[str(self.state)] -    def get_actions(self): +    def get_actions(self, can_edit=False, can_delete=False):          """          Get available action relevant with the current status          """          actions = [] -        if self.state == "C": +        if not can_edit and not can_delete: +            return actions +        if can_edit and self.state == "C":              actions.append(("A", _("Analyse"))) -        if self.state == "A": +        if can_edit and self.state == "A":              actions.append(("A", _("Re-analyse")))              if not any(-1 for imp in self.import_list() if not imp.pre_import_form_is_valid):                  actions.append(("I", _("Launch import"))) -        if self.state in ("F", "FE"): +        if can_edit and self.state in ("F", "FE"):              actions.append(("A", _("Re-analyse")))              actions.append(("I", _("Re-import")))              actions.append(("AC", _("Archive"))) -        if self.state == "AC": +        if can_edit and self.state == "AC":              state = "FE" if any(1 for imp in self.imports.all() if imp.error_file) else "F"              actions.append((state, _("Unarchive"))) -        actions.append(("D", _("Delete"))) +        if can_delete: +            actions.append(("D", _("Delete")))          return actions      def initialize(self, user=None, session_key=None): @@ -2203,16 +2231,18 @@ class Import(BaseImport):              idx_line          ) in self.imported_line_numbers.split(",") -    def get_actions(self): +    def get_actions(self, can_edit=False, can_delete=False):          """          Get available action relevant with the current status          """          IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")          profile = IshtarSiteProfile.get_current_profile()          actions = [] -        if self.state == "C": +        if not can_edit and not can_delete: +            return actions +        if can_edit and self.state == "C":              actions.append(("A", _("Analyse"))) -        if self.state in ("A", "PI"): +        if can_edit and self.state in ("A", "PI"):              actions.append(("A", _("Re-analyse")))              if self.pre_import_form_is_valid:                  actions.append(("I", _("Launch import"))) @@ -2222,7 +2252,7 @@ class Import(BaseImport):                      actions.append(("CH", _("Re-check for changes")))                  else:                      actions.append(("CH", _("Check for changes"))) -        if self.state in ("F", "FE"): +        if can_edit and self.state in ("F", "FE"):              actions.append(("A", _("Re-analyse")))              actions.append(("I", _("Re-import")))              if profile.experimental_feature: @@ -2232,12 +2262,13 @@ class Import(BaseImport):                  else:                      actions.append(("CH", _("Check for changes")))              actions.append(("AC", _("Archive"))) -        if self.state == "AC": +        if can_edit and self.state == "AC":              state = "FE" if self.error_file else "F"              actions.append((state, _("Unarchive"))) -        if self.state in ("C", "A"): +        if can_delete and self.state in ("C", "A"):              actions.append(("ED", _("Edit"))) -        actions.append(("D", _("Delete"))) +        if can_delete: +            actions.append(("D", _("Delete")))          return actions      @property diff --git a/ishtar_common/templates/ishtar/import_list.html b/ishtar_common/templates/ishtar/import_list.html index 35821c5bf..ec5e714de 100644 --- a/ishtar_common/templates/ishtar/import_list.html +++ b/ishtar_common/templates/ishtar/import_list.html @@ -16,7 +16,7 @@  {% endblock %}  {% block content %} -<div class="text-center m-3"> +{% if can_create_import %}<div class="text-center m-3">    {% if has_import_table %}<a href="{% url 'new_import' %}" class="btn btn-success">      <i class="fa fa-plus"></i> {% trans 'import (table)' %}    </a>{% endif %} @@ -26,7 +26,7 @@    {% if has_import_group %}<a href="{% url 'new_import_group' %}" class="btn btn-success">      <i class="fa fa-plus"></i> {% trans 'import (group)' %}    </a>{% endif %} -</div> +</div>{% endif %}  <div id="import-container">    {% include "ishtar/import_table.html" %}  </div> diff --git a/ishtar_common/templates/ishtar/import_table.html b/ishtar_common/templates/ishtar/import_table.html index 08c949654..56e73c6f0 100644 --- a/ishtar_common/templates/ishtar/import_table.html +++ b/ishtar_common/templates/ishtar/import_table.html @@ -78,14 +78,18 @@                  {{import.status}}              </td>              <td> +                {% if import.action_list %}                  <select class="form-control"                          id='import-action-{{import.import_id}}'                          name='import-action-{{import.import_id}}'>                      <option value=''>--------</option> -                    {% for action, lbl in import.get_actions %} +                    {% for action, lbl in import.action_list %}                      <option value='{{action}}'>{{lbl}}</option>                      {% endfor%}                  </select> +                {% else %} +                – +                {% endif %}              </td>              <td><ul class="simple table-import-files">                  {% if import.get_imported_values %}<li class="p-1"> diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 979b38395..f14872295 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -37,7 +37,7 @@ from unittest.runner import TextTestRunner, TextTestResult  from django.apps import apps  from django.conf import settings -from django.contrib.auth.models import User, Permission +from django.contrib.auth.models import User, Permission, Group  from django.contrib.contenttypes.models import ContentType  from django.contrib.gis.geos import (      GEOSGeometry,  @@ -2474,10 +2474,16 @@ class ShortMenuTest(TestCase):  class BaseImportTest(TestCase): - -    def create_import(self): -        create_user() +    def setUp(self):          imp_model = models.ImporterModel.objects.create( +            klass="ishtar_common.models.Parcel", name="Parcel" +        ) +        self.importer_type = models.ImporterType.objects.create(associated_models=imp_model) + +    def create_import(self, name="My Import", need_user=True): +        if need_user: +            create_user() +        imp_model, __ = models.ImporterModel.objects.get_or_create(              klass="ishtar_common.models.Person", name="Person"          )          importer_type = models.ImporterType.objects.create(associated_models=imp_model) @@ -2490,6 +2496,7 @@ class BaseImportTest(TestCase):          with open(dest, "rb") as f:              mcc_operation_file = DjangoFile(f)              imprt = models.Import.objects.create( +                name=name,                  user=models.IshtarUser.objects.all()[0],                  importer_type=importer_type,                  imported_file=mcc_operation_file, @@ -2554,7 +2561,7 @@ class BaseImportTest(TestCase):              "csv_sep": ",",          }          form = forms_common.NewImportGroupForm( -            data=post_dict, files=file_dict, user=ishtar_user +            data=post_dict, files=file_dict, user=ishtar_user.user_ptr          )          self.assertTrue(form.is_valid())          impt = form.save(ishtar_user) @@ -2563,102 +2570,303 @@ class BaseImportTest(TestCase):              self.init_group_import(impt)          return impt -    def create_importer_model(self): -        return models.ImporterModel.objects.create( -            klass="ishtar_common.models.Parcel", name="Parcel" + +class ImportTestInterface(BaseImportTest): + +    def setUp(self): +        super().setUp() +        self.super_username, self.super_password, __ = create_superuser() +        self.simple_username, self.simple_password, __ = create_user() +        self.import_username, self.import_password, self.import_user = create_user( +            "import-user", "password"          ) +        self.import_group = Group.objects.create(name="Import") +        self.import_user.groups.add(self.import_group) -    def create_importer_type(self, imp_model): -        return models.ImporterType.objects.create(associated_models=imp_model) +    def superuser_login(self): +        client = Client() +        client.login(username=self.super_username, password=self.super_password) +        return client +    def simple_login(self): +        client = Client() +        client.login(username=self.simple_username, password=self.simple_password) +        return client -class ImportTest(BaseImportTest): +    def import_login(self): +        client = Client() +        client.login(username=self.import_username, password=self.import_password) +        return client + +    def set_global_permission(self, imports, codenames): +        self.import_group.permissions.clear() +        for imprt in imports: +            imprt.importer_type.users.clear() +        if not isinstance(codenames, (list, tuple)): +            codenames = (codenames,) +        for codename in codenames: +            self.import_group.permissions.add(Permission.objects.get(codename=codename)) + +    def remove_global_permission(self, imports, codenames): +        self.import_group.permissions.clear() +        for imprt in imports: +            imprt.importer_type.users.clear() +        if not isinstance(codenames, (list, tuple)): +            codenames = (codenames,) +        for codename in codenames: +            perm = Permission.objects.get(codename=codename) +            if perm in list(self.import_group.permissions.all()): +                self.import_group.permissions.remove() + +    def set_own_permission(self, imports, codenames): +        self.set_global_permission(imports, codenames) +        user = models.IshtarUser.objects.get(user_ptr=self.import_user) +        for imprt in imports: +            imprt.importer_type.users.add(user) + +    def test_list_import(self): +        imprt = self.create_import() +        imprt2 = self.create_import(name="My-import-2", need_user=False) +        # no login and simple user login +        for client in (Client(), self.simple_login()): +            response = client.get(reverse("current_imports")) +            self.assertRedirects(response, '/') + +        # superuser +        client = self.superuser_login() +        response = client.get(reverse("current_imports")) +        self.assertEqual(response.status_code, 200) +        content = response.content.decode() +        self.assertIn(imprt.name, content) +        self.assertIn(imprt2.name, content) -    def test_edit_import(self): -        username, password, user = create_superuser() +        # import user +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertRedirects(response, '/') + +        self.set_global_permission([imprt, imprt2], "change_import") +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertEqual(response.status_code, 200) +        content = response.content.decode() +        self.assertIn(imprt.name, content) +        self.assertIn(imprt2.name, content) +        self.remove_global_permission([imprt, imprt2], "change_import") + +        self.set_own_permission([imprt], "change_own_import") +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertEqual(response.status_code, 200) +        content = response.content.decode() +        self.assertIn(imprt.name, content) +        self.assertNotIn(imprt2.name, content, +                         msg="Import 2 unexpectedly found in import list") + +    def test_import_action_permission(self):          imprt = self.create_import() -        c = Client() -        c.login(username=username, password=password) +        imprt2 = self.create_import(name="My-import-2", need_user=False) +        delete_tag = "<option value='D'>" -        response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        # superuser +        client = self.superuser_login() +        response = client.get(reverse("current_imports"))          self.assertEqual(response.status_code, 200) -        self.assertContains(response, imprt.importer_type) -        self.assertContains(response, imprt.imported_file) -        self.assertContains(response, imprt.imported_images) -        self.assertContains(response, imprt.encoding) -        self.assertContains(response, imprt.csv_sep) +        content = response.content.decode() +        self.assertIn(f"import-action-{imprt.pk}", content) +        self.assertIn(f"import-action-{imprt2.pk}", content) +        self.assertIn(delete_tag, content) + +        # import user +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertRedirects(response, '/') + +        self.set_global_permission([imprt, imprt2], "change_import") +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertEqual(response.status_code, 200) +        content = response.content.decode() +        self.assertIn(f"import-action-{imprt.pk}", content) +        self.assertIn(f"import-action-{imprt2.pk}", content) +        self.assertNotIn(delete_tag, content) +        self.remove_global_permission([imprt, imprt2], ("change_import",)) + +        self.set_global_permission([imprt, imprt2], ("change_import", "delete_import")) +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertEqual(response.status_code, 200) +        content = response.content.decode() +        self.assertIn(delete_tag, content) +        self.remove_global_permission([imprt, imprt2], ("change_import", "delete_import")) -        imp_model = self.create_importer_model() -        importer_type = self.create_importer_type(imp_model) +        self.set_own_permission([imprt], "change_own_import") +        client = self.import_login() +        response = client.get(reverse("current_imports")) +        self.assertEqual(response.status_code, 200) +        content = response.content.decode() +        self.assertIn(f"import-action-{imprt.pk}", content) +        self.assertNotIn(f"import-action-{imprt2.pk}", content, +                         msg="Import 2 unexpectedly found in import list") +        self.assertNotIn(delete_tag, content) +        self.set_own_permission([imprt], ("change_own_import", "change_own_import")) +    def _test_create_import_get_data(self): +        csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") +        return { +            "name": "Test Name", +            "importer_type": self.importer_type.pk, +            "imported_file": SimpleUploadedFile( +                name="insee-test.csv", +                content=open(csv_path, "rb").read(), +            ), +            "encoding": "utf-8", +            "csv_sep": '|', +            "skip_lines": 1, +        } + +    def test_create_import(self): +        # init +        nb_imports = models.Import.objects.count() + +        # no login and simple user login +        for client in (Client(), self.simple_login()): +            response = client.get(reverse("new_import")) +            self.assertRedirects(response, '/') +            data = self._test_create_import_get_data() +            response = client.post(reverse("new_import"), data) +            self.assertRedirects(response, '/') +            self.assertEqual(nb_imports, models.Import.objects.count()) + +        # superuser +        client = self.superuser_login() +        response = client.get(reverse("new_import")) +        self.assertEqual(response.status_code, 200) +        data = self._test_create_import_get_data() +        response = client.post(reverse("new_import"), data) +        self.assertEqual(nb_imports + 1, models.Import.objects.count()) +        self.assertRedirects(response, '/import-list/') + +        imprt = models.Import.objects.get(name="Test Name") +        response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        self.assertEqual(response.status_code, 200) +        self.assertEqual(imprt.name, "Test Name") +        self.assertEqual(imprt.importer_type, self.importer_type) +        self.assertEqual(imprt.encoding, "utf-8") +        self.assertEqual(imprt.csv_sep, '|') + +        # import user +        for imp in models.Import.objects.all(): +            imp.delete() +        nb_imports = 0 + +        client = self.import_login() +        response = client.get(reverse("new_import")) +        self.assertRedirects(response, '/') + +        self.set_global_permission([], ("add_import", "change_import")) +        client = self.import_login() +        response = client.get(reverse("new_import")) +        self.assertEqual(response.status_code, 200) +        data = self._test_create_import_get_data() +        response = client.post(reverse("new_import"), data) +        self.assertEqual(nb_imports + 1, models.Import.objects.count()) +        self.assertRedirects(response, '/import-list/') + +    def test_edit_import(self): +        # init +        imprt = self.create_import(name="My import") +        imprt2 = self.create_import(name="My-import-2", need_user=False)          data = {              "name": "Test Name", -            "importer_type": importer_type.pk, +            "importer_type": self.importer_type.pk,              "encoding": "utf-8",              "csv_sep": '|',              "skip_lines": 32,          } -        response = c.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) -        self.assertEqual(response.status_code, 302) +        # no login and simple user login +        for client in (Client(), self.simple_login()): +            response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +            self.assertRedirects(response, '/') +            response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) +            self.assertRedirects(response, '/') -        response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        client = self.superuser_login() +        response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))          self.assertEqual(response.status_code, 200) -        self.assertContains(response, "Test Name") -        self.assertContains(response, str(importer_type)) -        self.assertContains(response, "utf-8") +        self.assertContains(response, imprt.name) +        self.assertContains(response, imprt.importer_type) +        self.assertContains(response, imprt.imported_file) +        self.assertContains(response, imprt.imported_images) +        self.assertContains(response, imprt.encoding) +        self.assertContains(response, imprt.csv_sep) +        response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) +        self.assertRedirects(response, '/import-list/')          imprt = models.Import.objects.get(pk=imprt.pk)          self.assertEqual(imprt.name, "Test Name") -        self.assertEqual(imprt.importer_type, importer_type) +        self.assertEqual(imprt.importer_type, self.importer_type)          self.assertEqual(imprt.encoding, "utf-8")          self.assertEqual(imprt.csv_sep, "|")          self.assertEqual(imprt.skip_lines, 32) -    def test_create_import(self): -        username, password, user = create_superuser() -        c = Client() -        c.login(username=username, password=password) +        response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        self.assertEqual(response.status_code, 200) +        self.assertContains(response, "Test Name") +        self.assertContains(response, str(self.importer_type)) +        self.assertContains(response, "utf-8") -        imp_model = self.create_importer_model() -        importer_type = self.create_importer_type(imp_model) -        csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") +        # import user +        imprt.delete() +        imprt2.delete() +        imprt = self.create_import(name="My import") +        imprt2 = self.create_import(name="My-import-2", need_user=False) + +        client = self.import_login() +        response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        self.assertRedirects(response, '/') + +        # import user global permission +        self.set_global_permission([imprt, imprt2], "change_import") +        client = self.import_login() +        response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        self.assertEqual(response.status_code, 200) +        self.assertContains(response, imprt.name) -        data = { -            "name": "Test Name", -            "importer_type": importer_type.pk, -            "imported_file": SimpleUploadedFile( -                name="insee-test.csv", -                content=open(csv_path, "rb").read(), -            ), -            "encoding": "utf-8", -            "csv_sep": '|', -            "skip_lines": 1, -        } +        response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) +        self.assertRedirects(response, '/import-list/') +        imprt = models.Import.objects.get(pk=imprt.pk) +        self.assertEqual(imprt.name, "Test Name") -        response = c.post(reverse("new_import"), data) -        self.assertEqual(response.status_code, 302) +        self.remove_global_permission([imprt, imprt2], ("change_import",)) -        imprt = models.Import.objects.get(name="Test Name") -        response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk})) +        # import user own permission +        imprt.delete() +        imprt2.delete() +        imprt = self.create_import(name="My import") +        imprt2 = self.create_import(name="My-import-2", need_user=False) +        self.set_own_permission([imprt], "change_own_import") +        client = self.import_login() +        response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))          self.assertEqual(response.status_code, 200) +        self.assertContains(response, imprt.name) + +        response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data) +        self.assertRedirects(response, '/import-list/') +        imprt = models.Import.objects.get(pk=imprt.pk)          self.assertEqual(imprt.name, "Test Name") -        self.assertEqual(imprt.importer_type, importer_type) -        self.assertEqual(imprt.encoding, "utf-8") -        self.assertEqual(imprt.csv_sep, '|') -    def test_validation_zip_import_image(self): -        username, password, user = create_superuser() -        c = Client() -        c.login(username=username, password=password) +        response = client.get(reverse("edit_import", kwargs={"pk": imprt2.pk})) +        self.assertRedirects(response, '/') -        imp_model = self.create_importer_model() -        importer_type = self.create_importer_type(imp_model) +    def test_validation_zip_import_image(self): +        # init          image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png")          data = {              "name": "Import Zip Not Valid Must Fail", -            "importer_type": importer_type.pk, +            "importer_type": self.importer_type.pk,              "encoding": "utf-8",              "csv_sep": "|",              "skip_lines": 1, @@ -2668,13 +2876,130 @@ class ImportTest(BaseImportTest):                  content_type="image/png",              ),          } -        response = c.post(reverse("new_import"), data) + +        # superuser +        client = self.superuser_login() +        response = client.post(reverse("new_import"), data)          expected = str(              _('"Associated images" field must be a valid zip file.')          ).replace('"', '"')          self.assertIn(expected, response.content.decode("utf-8"))          self.assertEqual(response.status_code, 200) +    def test_display_csv(self): +        # init +        imprt = self.create_import() +        imprt2 = self.create_import(name="My-import-2", need_user=False) +        url = reverse("import_display_csv", args=["source", "", imprt.pk]) +        url2 = reverse("import_display_csv", args=["source", "", imprt2.pk]) + +        # no login +        client = Client() +        response = client.get(url) +        self.assertRedirects(response, "/") + +        # superuser +        client = self.superuser_login() +        response = client.get(url) +        self.assertEqual(response.status_code, 200) +        response = client.get(url2) +        self.assertEqual(response.status_code, 200) + +        # simple login +        client = self.simple_login() +        response = client.get(url) +        self.assertRedirects(response, "/") + +        # import user global permission +        self.set_global_permission([imprt, imprt2], "view_import") +        client = self.import_login() +        response = client.get(url) +        self.assertEqual(response.status_code, 200) +        response = client.get(url2) +        self.assertEqual(response.status_code, 200) +        self.remove_global_permission([imprt, imprt2], ("view_import",)) + +        # import user own permission +        self.set_own_permission([imprt], "view_own_import") +        client = self.import_login() +        response = client.get(url) +        self.assertEqual(response.status_code, 200) +        response = client.get(url2) +        self.assertRedirects(response, "/") + +    def test_ignore_errors(self): +        # init +        imprt = self.create_import() +        imprt2 = self.create_import(name="My-import-2", need_user=False) + +        path = os.path.join( +            LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv" +        ) +        imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") +        imprt.save() +        imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") +        imprt2.save() + +        q = models.ImportLineError.objects.filter(import_item=imprt.pk) +        self.assertEqual(q.count(), 2) +        ignored_line = q.all()[0] +        url = reverse("import_ignore_line", args=[ignored_line.pk]) +        q = models.ImportLineError.objects.filter(import_item=imprt2.pk) +        self.assertEqual(q.count(), 2) +        ignored_line2 = q.all()[0] +        url2 = reverse("import_ignore_line", args=[ignored_line2.pk]) + +        # no login +        client = Client() +        response = client.get(url) +        self.assertRedirects(response, "/") + +        # superuser +        client = self.superuser_login() +        response = client.get(url) +        self.assertEqual(response.status_code, 200) +        self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) +        ignored_line.ignored = False +        ignored_line.save() +        response = client.get(url2) +        self.assertEqual(response.status_code, 200) +        self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True) +        ignored_line2.ignored = False +        ignored_line2.save() + +        # simple login +        client = self.simple_login() +        response = client.get(url) +        self.assertRedirects(response, "/") + +        # import user global permission +        self.set_global_permission([imprt, imprt2], "change_import") +        client = self.import_login() +        response = client.get(url) +        self.assertEqual(response.status_code, 200) +        self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) +        ignored_line.ignored = False +        ignored_line.save() +        response = client.get(url2) +        self.assertEqual(response.status_code, 200) +        self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True) +        ignored_line2.ignored = False +        ignored_line2.save() +        self.remove_global_permission([imprt, imprt2], ("change_import",)) + +        # import user own permission +        self.set_own_permission([imprt], "change_own_import") +        client = self.import_login() +        response = client.get(url) +        self.assertEqual(response.status_code, 200) +        self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True) +        self.assertEqual(response.status_code, 200) +        response = client.get(url2) +        self.assertEqual(response.status_code, 404) +        self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, False) + + +class ImportTest(BaseImportTest):      def test_archive_import(self):          imprt = self.create_import()          with open(imprt.imported_file.path, "r") as f: @@ -2796,19 +3121,6 @@ class ImportTest(BaseImportTest):                  field = getattr(sub_import, k)                  self.assertTrue(field, "{} is missing in unarchive".format(k)) -    def test_display_csv(self): -        imprt = self.create_import() -        username, password, __ = create_user("test", "test") -        c = Client() -        c.login(username=username, password=password) -        url = "import_display_csv" -        response = c.get(reverse(url, args=["source", "", imprt.pk]), ) -        self.assertEqual(response.status_code, 404) -        username, password, __ = create_superuser() -        c.login(username=username, password=password) -        response = c.get(reverse(url, args=["source", "", imprt.pk])) -        self.assertEqual(response.status_code, 200) -      def test_delete_related(self):          town = models.Town.objects.create(name="my-test")          self.assertEqual(models.Town.objects.filter(name="my-test").count(), 1) diff --git a/ishtar_common/tests/error-file.csv b/ishtar_common/tests/error-file.csv new file mode 100644 index 000000000..007d2d3c2 --- /dev/null +++ b/ishtar_common/tests/error-file.csv @@ -0,0 +1,3 @@ +"ligne","colonne","erreur" +"2","1","Valeur requise" +"3","1","Valeur requise" diff --git a/ishtar_common/urls.py b/ishtar_common/urls.py index 52e1324b2..c3d614ac6 100644 --- a/ishtar_common/urls.py +++ b/ishtar_common/urls.py @@ -210,62 +210,62 @@ urlpatterns = [      ),      url(          r"^import-new/$", -        check_rights(["change_import"])(views.NewImportView.as_view()), +        check_rights(["add_import", "add_own_import"])(views.NewImportView.as_view()),          name="new_import",      ),      url(          r"^import-edit/(?P<pk>[0-9]+)/$", -        check_rights(["change_import"])(views.EditImportView.as_view()), +        check_rights(["change_import", "change_own_import"])(views.EditImportView.as_view()),          name="edit_import",      ),      url(          r"^import-new-gis/$", -        check_rights(["change_import"])(views.NewImportGISView.as_view()), +        check_rights(["add_import", "add_own_import"])(views.NewImportGISView.as_view()),          name="new_import_gis",      ),      url(          r"^import-new-group/$", -        check_rights(["change_import"])(views.NewImportGroupView.as_view()), +        check_rights(["add_import", "add_own_import"])(views.NewImportGroupView.as_view()),          name="new_import_group",      ),      url(          r"^import-list/$", -        check_rights(["change_import"])(views.ImportListView.as_view()), +        check_rights(["change_import", "change_own_import"])(views.ImportListView.as_view()),          name="current_imports",      ),      url(          r"^import-list-table/$", -        check_rights(["change_import"])(views.ImportListTableView.as_view()), +        check_rights(["change_import", "change_own_import"])(views.ImportListTableView.as_view()),          name="current_imports_table",      ),      url(         r"^import-get-status/$", -        check_rights(["change_import"])(views.import_get_status), +        check_rights(["change_import", "change_own_import"])(views.import_get_status),          name="import_get_status",      ),      url(          r"^import-list-old/$", -        check_rights(["change_import"])(views.ImportOldListView.as_view()), +        check_rights(["change_import", "change_own_import"])(views.ImportOldListView.as_view()),          name="old_imports",      ),      url(          r"^import-delete/(?P<pk>[0-9]+)/$", -        views.ImportDeleteView.as_view(), +        check_rights(["delete_import", "delete_own_import"])(views.ImportDeleteView.as_view()),          name="import_delete",      ),      url(          r"^import-group-delete/(?P<pk>[0-9]+)/$", -        views.ImportGroupDeleteView.as_view(), +        check_rights(["delete_import", "delete_own_import"])(views.ImportGroupDeleteView.as_view()),          name="import_group_delete",      ),      url(          r"^import-link-unmatched/(?P<pk>[0-9]+)/$", -        views.ImportLinkView.as_view(), +        check_rights(["change_import", "change_own_import"])(views.ImportMatchView.as_view()),          name="import_link_unmatched",      ),      url(          r"^import-csv-view/(?P<target>source|result|match|error)/(?P<group>group\-)?(?P<pk>[0-9]+)/$", -        views.ImportCSVView.as_view(), +        check_rights(["view_import", "view_own_import"])(views.ImportCSVView.as_view()),          name="import_display_csv",      ),      url( @@ -281,12 +281,12 @@ urlpatterns = [      ),      url(          r"^import-pre-form/(?P<import_id>[0-9]+)/$", -        check_rights(["change_import"])(views.ImportPreFormView.as_view()), +        check_rights(["change_import", "change_own_import"])(views.ImportPreFormView.as_view()),          name="import_pre_import_form",      ),      url(          r"^import-ignore-line/(?P<line_id>[0-9]+)/$", -        views.line_error, +        check_rights(["change_import", "change_own_import"])(views.line_error),          name="import_ignore_line",      ),      url(r"^profile(?:/(?P<pk>[0-9]+))?/$", views.ProfileEdit.as_view(), name="profile"), diff --git a/ishtar_common/views.py b/ishtar_common/views.py index b91f3202c..cd22d62eb 100644 --- a/ishtar_common/views.py +++ b/ishtar_common/views.py @@ -1490,7 +1490,28 @@ class NewImportView(BaseImportView, CreateView):      page_name = _("Import: create (table)") -class EditImportView(BaseImportView, UpdateView): +class ImportPermissionMixin: +    permission_full = "change_import" +    permission_own = "change_own_import" + +    def dispatch(self, request, *args, **kwargs): +        import_pk = self.kwargs["pk"] +        user = request.user +        if not user or not user.ishtaruser: +            return redirect("/") +        model = models.ImportGroup if self.kwargs.get("group", None) else models.Import +        q = model.query_can_access(user, perm=self.permission_full).filter(pk=import_pk) +        if not user.is_superuser and not user.ishtaruser.has_right(self.permission_full): +            if not user.ishtaruser.has_right(self.permission_own): +                return redirect("/") +            q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk)) +        if not q.count(): +            return redirect("/") +        returned = super().dispatch(request, *args, **kwargs) +        return returned + + +class EditImportView(ImportPermissionMixin, BaseImportView, UpdateView):      page_name = _("Import: edit (table)") @@ -1544,6 +1565,21 @@ class ImportPreFormView(IshtarMixin, LoginRequiredMixin, FormView):          return HttpResponseRedirect(self.get_success_url()) +def get_permissions_for_actions(user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own): +    can_edit, can_delete = False, False +    is_own = None +    if can_edit_own or can_delete_own:  # need to check owner +        if imprt.importer_type_id not in owns: +            # "is_own" only query once by importer type +            owns[imprt.importer_type.pk] = imprt.importer_type.is_own(user.ishtaruser) +        is_own = owns[imprt.importer_type_id] +    if can_edit_all or (can_edit_own and is_own): +        can_edit = True +    if can_delete_all or (can_delete_own and is_own): +        can_delete = True +    return can_edit, can_delete + +  class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):      template_name = "ishtar/import_list.html"      model = models.Import @@ -1555,15 +1591,31 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):      def get_queryset(self):          user = self.request.user -        if not user.pk: +        if not user.pk or not user.ishtaruser:              raise Http404() -        q1 = self._queryset_filter(self.model.query_can_access(user)) +        q1 = self._queryset_filter(self.model.query_can_access(user, "change_import"))          q1 = q1.filter(group__isnull=True).order_by("-end_date", "-creation_date", "-pk") -        q2 = self._queryset_filter(models.ImportGroup.query_can_access(user)) +        q2 = self._queryset_filter(models.ImportGroup.query_can_access(user, "change_import"))          q2 = q2.order_by("-end_date", "-creation_date", "-pk") -        return list(reversed(sorted(list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date)))) +        values = list(reversed(sorted(list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date)))) +        can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions( +            user, self.request.session +        ) +        imports = [] +        owns = {} +        for imprt in values: +            can_edit, can_delete = get_permissions_for_actions( +                user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own +            ) +            imprt.action_list = imprt.get_actions(can_edit=can_edit, can_delete=can_delete) +            imports.append(imprt) +        return imports      def post(self, request, *args, **kwargs): +        can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions( +            request.user, request.session +        ) +        owns = {}          for field in request.POST:              if not field.startswith("import-action-") or not request.POST[field]:                  continue @@ -1576,28 +1628,26 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):                  imprt = model.objects.get(pk=int(field.split("-")[-1]))              except (models.Import.DoesNotExist, ValueError):                  continue -            if not self.request.user.is_superuser: -                # user can only edit his own imports -                user = models.IshtarUser.objects.get(pk=self.request.user.pk) -                if imprt.user != user: -                    continue +            can_edit, can_delete = get_permissions_for_actions( +                request.user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own +            )              action = request.POST[field] -            if action == "D": +            if can_delete and action == "D":                  url = "import_group_delete" if is_group else "import_delete"                  return HttpResponseRedirect(                      reverse(url, kwargs={"pk": imprt.pk})                  ) -            elif action == "ED": +            elif can_edit and action == "ED":                  url = "edit_import_group" if is_group else "edit_import"                  return HttpResponseRedirect(                      reverse(url, kwargs={"pk": imprt.pk})                  ) -            elif action == "A": +            elif can_edit and action == "A":                  imprt.initialize(                      user=self.request.user.ishtaruser,                      session_key=request.session.session_key,                  ) -            elif action == "I": +            elif can_edit and action == "I":                  if settings.USE_BACKGROUND_TASK:                      imprt.delayed_importation(request, request.session.session_key)                  else: @@ -1612,12 +1662,12 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):                              f"{imprt} - {e}",                              "warning",                          ) -            elif action == "CH": +            elif can_edit and action == "CH":                  if settings.USE_BACKGROUND_TASK:                      imprt.delayed_check_modified(request.session.session_key)                  else:                      imprt.check_modified() -            elif action == "IS": +            elif can_edit and action == "IS":                  if imprt.current_line is None:                      imprt.current_line = imprt.skip_lines                      imprt.save() @@ -1626,19 +1676,37 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):                          "import_step_by_step", args=[imprt.pk, imprt.current_line + 1]                      )                  ) -            elif action == "AC": +            elif can_edit and action == "AC":                  imprt.archive() -            elif action in ("F", "FE"): +            elif can_edit and action in ("F", "FE"):                  imprt.unarchive(action)          return HttpResponseRedirect(reverse(self.current_url))      def get_context_data(self, **kwargs):          dct = super().get_context_data(**kwargs) +        add_import_perm = self.request.user.ishtaruser.has_right("add_import", session=self.request.session) +        import_type_table = models.ImporterType.objects.filter(available=True, is_import=True, type='tab') +        import_type_gis = models.ImporterType.objects.filter(available=True, is_import=True, type='gis') +        import_type_group = models.ImporterGroup.objects.filter(available=True) +        if not add_import_perm and self.request.user.ishtaruser.has_right("add_own_import", +                                                                          session=self.request.session): +            import_type_table = import_type_table.filter(users__pk=self.request.user.ishtaruser.pk) +            import_type_gis = import_type_gis.filter(users__pk=self.request.user.ishtaruser.pk) +            import_type_group = import_type_group.filter(users__pk=self.request.user.ishtaruser.pk) +            add_import_perm = True +        has_import_table, has_import_gis, has_import_group = False, False, False +        if add_import_perm: +            if import_type_table.count(): +                has_import_table = True +            if import_type_gis.count(): +                has_import_gis = True +            if import_type_group.count(): +                has_import_group = True          dct.update({ -            "autorefresh_available": settings.USE_BACKGROUND_TASK, -            "has_import_table": models.ImporterType.objects.filter(available=True, is_import=True, type='tab').count(), -            "has_import_gis": models.ImporterType.objects.filter(available=True, is_import=True, type='gis').count(), -            "has_import_group": models.ImporterGroup.objects.filter(available=True).count(), +            "has_import_table": has_import_table, +            "has_import_gis": has_import_gis, +            "has_import_group": has_import_group, +            "can_create_import": has_import_table or has_import_gis or has_import_group,          })          return dct @@ -2083,14 +2151,23 @@ def import_get_status(request, current_right=None):                  "number_of_line": item.number_of_line,                  "progress_percent": item.progress_percent,              }) -        item_dct["actions"] = [(key, str(lbl)) for key, lbl in item.get_actions()] +        can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions( +            request.user +        ) +        can_edit, can_delete = get_permissions_for_actions( +            request.user, item, {}, can_edit_all, can_delete_all, can_edit_own, can_delete_own +        ) +        item_dct["actions"] = [ +            (key, str(lbl)) +            for key, lbl in item.get_actions(can_edit=can_edit, can_delete=can_delete) +        ]          response[key].append(item_dct)      data = json.dumps(response)      return HttpResponse(data, content_type="application/json") -class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView): +class ImportMatchView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, ModelFormSetView):      template_name = "ishtar/formset_import_match.html"      model = models.TargetKey      page_name = _("Link unmatched items") @@ -2100,9 +2177,11 @@ class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView):      form_class = forms.TargetKeyForm      formset_class = forms.TargetKeyFormset      max_fields = 250 +    permission_full = "change_import" +    permission_own = "change_own_import"      def get_formset_kwargs(self): -        kwargs = super(ImportLinkView, self).get_formset_kwargs() +        kwargs = super().get_formset_kwargs()          kwargs["user"] = self.request.user          return kwargs @@ -2126,25 +2205,29 @@ class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView):          return reverse("import_link_unmatched", args=[self.kwargs["pk"]]) -class ImportDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): +class ImportDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView):      template_name = "ishtar/import_delete.html"      model = models.Import      page_name = _("Delete import") +    permission_full = "delete_import" +    permission_own = "delete_own_import"      def get_success_url(self):          return reverse("current_imports") -class ImportGroupDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): +class ImportGroupDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView):      template_name = "ishtar/import_delete.html"      model = models.ImportGroup      page_name = _("Delete import") +    permission_full = "delete_import" +    permission_own = "delete_own_import"      def get_success_url(self):          return reverse("current_imports") -class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView): +class ImportCSVView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, TemplateView):      template_name = "ishtar/blocks/view_import_csv.html"      ATTRIBUTES = {          "source": "get_imported_values", @@ -2158,13 +2241,15 @@ class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView):          "result": ("fa fa-th", _("Result")) ,          "match": ("fa fa-arrows-h", _("Match")),      } +    permission_full = "view_import" +    permission_own = "view_own_import"      def get(self, request, *args, **kwargs):          user = self.request.user          if not user.pk:              raise Http404()          model = models.ImportGroup if kwargs.get("group", None) else models.Import -        q = model.query_can_access(self.request.user).filter(pk=kwargs.get("pk", -1)) +        q = model.query_can_access(self.request.user, perm=self.permission_full).filter(pk=kwargs.get("pk", -1))          if not q.count():              raise Http404()          self.import_item = q.all()[0] @@ -2208,7 +2293,7 @@ class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView):          return data -def line_error(request, line_id): +def line_error(request, line_id, current_right=None):      """      Set or unset ignored state of a csv error file      """ @@ -2219,7 +2304,7 @@ def line_error(request, line_id):      if not q.count():          return      line = q.all()[0] -    q = models.Import.query_can_access(request.user).filter(pk=line.import_item_id) +    q = models.Import.query_can_access(request.user, perm="change_import").filter(pk=line.import_item_id)      if not q.count():          raise Http404()      line.ignored = not line.ignored  | 
