summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2023-10-26 17:03:41 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2024-04-16 16:38:32 +0200
commitf4f482cd4074898f5344a3a078e27800bbd060fd (patch)
tree46a317f0f5de7b0177206ac5b965be794ff2b2af /ishtar_common
parente008dd87b2eafd88cec3d75d0b3b4c92ce891f23 (diff)
downloadIshtar-f4f482cd4074898f5344a3a078e27800bbd060fd.tar.bz2
Ishtar-f4f482cd4074898f5344a3a078e27800bbd060fd.zip
✨ refactoring import permissions
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/fixtures/initial_data-fr.json11
-rw-r--r--ishtar_common/forms_common.py33
-rw-r--r--ishtar_common/migrations/0231_default_mandatory_keys.py38
-rw-r--r--ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py116
-rw-r--r--ishtar_common/models_common.py3
-rw-r--r--ishtar_common/models_imports.py67
-rw-r--r--ishtar_common/templates/ishtar/import_list.html4
-rw-r--r--ishtar_common/templates/ishtar/import_table.html6
-rw-r--r--ishtar_common/tests.py466
-rw-r--r--ishtar_common/tests/error-file.csv3
-rw-r--r--ishtar_common/urls.py28
-rw-r--r--ishtar_common/views.py147
12 files changed, 730 insertions, 192 deletions
diff --git a/ishtar_common/fixtures/initial_data-fr.json b/ishtar_common/fixtures/initial_data-fr.json
index f3c539d3c..980ed3d1a 100644
--- a/ishtar_common/fixtures/initial_data-fr.json
+++ b/ishtar_common/fixtures/initial_data-fr.json
@@ -301,7 +301,16 @@
"Auteurs : lecture"
],
[
- "Import : ajout/modification/suppression"
+ "Imports : lecture"
+ ],
+ [
+ "Imports : modification"
+ ],
+ [
+ "Imports : suppression"
+ ],
+ [
+ "Imports : ajout"
]
]
}
diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py
index dd3a83971..0d739fbfb 100644
--- a/ishtar_common/forms_common.py
+++ b/ishtar_common/forms_common.py
@@ -246,19 +246,31 @@ class BaseImportForm(IshtarForm, forms.ModelForm):
super().__init__(*args, **kwargs)
self.fields["imported_file"].required = True
self._filter_group(user)
- self._filter_importer_type()
+ self._filter_importer_type(user)
if "imported_images" in self.fields:
self.fields["imported_images"].validators = [file_size_validator]
self.fields["imported_file"].validators = [file_size_validator]
self._post_init()
- def _filter_importer_type(self):
- self.fields["importer_type"].choices = [("", "--")] + [
- (imp.pk, imp.name)
- for imp in models.ImporterType.objects.filter(
+ def _filter_importer_type_query(self, q, user):
+ if user.is_superuser or user.ishtaruser.has_right("add_import"):
+ return q
+ if not user.ishtaruser.has_right("add_own_import"):
+ self.fields["importer_type"].choices = [("", "--")]
+ return
+ q = q.filter(users__pk=user.ishtaruser.pk)
+ return q
+
+ def _filter_importer_type(self, user):
+ q = models.ImporterType.objects.filter(
available=True, is_import=True, type=self.importer_type
- )
+ )
+ q = self._filter_importer_type_query(q, user)
+ if not q:
+ return
+ self.fields["importer_type"].choices = [("", "--")] + [
+ (imp.pk, imp.name) for imp in q.all()
]
def _filter_group(self, user):
@@ -511,10 +523,13 @@ class NewImportGroupForm(NewImportForm):
"imported_images": FormHeader(_("Documents/Images")),
}
- def _filter_importer_type(self):
+ def _filter_importer_type(self, user):
+ q = models.ImporterGroup.objects.filter(available=True)
+ q = self._filter_importer_type_query(q, user)
+ if not q:
+ return
self.fields["importer_type"].choices = [("", "--")] + [
- (imp.pk, imp.name)
- for imp in models.ImporterGroup.objects.filter(available=True)
+ (imp.pk, imp.name) for imp in q.all()
]
def _filter_group(self, user):
diff --git a/ishtar_common/migrations/0231_default_mandatory_keys.py b/ishtar_common/migrations/0231_default_mandatory_keys.py
deleted file mode 100644
index cd245322a..000000000
--- a/ishtar_common/migrations/0231_default_mandatory_keys.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Generated by Django 2.2.24 on 2023-09-18 17:05
-
-from django.db import migrations
-
-EXCLUDE_LIST = [
- "-",
- "auto_external_id",
- "spatial_reference_system",
- "public_domain",
-]
-
-FULL_COPY_LIST = [
- "scientist__attached_to",
-]
-
-
-def migrate(apps, __):
- ImporterDefault = apps.get_model('ishtar_common', 'ImporterDefault')
- for default in ImporterDefault.objects.all():
- if default.target not in EXCLUDE_LIST:
- req = default.target
- if req not in FULL_COPY_LIST:
- req = req.split("__")[0]
- if req.endswith("_type") or req.endswith("_types"):
- continue
- default.required_fields = req
- default.save()
-
-
-class Migration(migrations.Migration):
-
- dependencies = [
- ('ishtar_common', '0230_auto_20231024_1045'),
- ]
-
- operations = [
- migrations.RunPython(migrate),
- ]
diff --git a/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py b/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py
new file mode 100644
index 000000000..120711f09
--- /dev/null
+++ b/ishtar_common/migrations/0231_default_mandatory_keys_import_permissions.py
@@ -0,0 +1,116 @@
+# Generated by Django 2.2.24 on 2023-09-18 17:05
+
+from django.db import migrations
+
+COLOR_WARNING = "\033[93m"
+COLOR_ENDC = "\033[0m"
+
+EXCLUDE_LIST = [
+ "-",
+ "auto_external_id",
+ "spatial_reference_system",
+ "public_domain",
+]
+
+FULL_COPY_LIST = [
+ "scientist__attached_to",
+]
+
+GROUPS = [
+ [
+ "Imports : lecture",
+ "view_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports rattachés : lecture",
+ "view_own_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports : modification",
+ "change_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports rattachés : modification",
+ "change_own_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports : suppression",
+ "delete_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports rattachés : suppression",
+ "delete_own_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports : ajout",
+ "add_import",
+ "ishtar_common",
+ "import"
+ ],
+ [
+ "Imports rattachés : ajout",
+ "add_own_import",
+ "ishtar_common",
+ "import"
+ ],
+]
+
+
+def migrate(apps, __):
+ ImporterDefault = apps.get_model('ishtar_common', 'ImporterDefault')
+ for default in ImporterDefault.objects.all():
+ if default.target not in EXCLUDE_LIST:
+ req = default.target
+ if req not in FULL_COPY_LIST:
+ req = req.split("__")[0]
+ if req.endswith("_type") or req.endswith("_types"):
+ continue
+ default.required_fields = req
+ default.save()
+
+ print("")
+ ProfileType = apps.get_model('ishtar_common', 'ProfileType')
+ q = ProfileType.objects.filter(txt_idx="administrator")
+ administrator = None
+ if not q.count():
+ print(COLOR_WARNING + "** No administrator profile found. **" + COLOR_ENDC)
+ else:
+ administrator = q.all()[0]
+
+ Permission = apps.get_model("auth", "Permission")
+ Group = apps.get_model("auth", "Group")
+ ContentType = apps.get_model("contenttypes", "ContentType")
+ for name, codename, app, model in GROUPS:
+ ct, __ = ContentType.objects.get_or_create(app_label=app, model=model)
+ perm, __ = Permission.objects.get_or_create(
+ codename=codename, defaults={"name": name, "content_type": ct}
+ )
+ group, __ = Group.objects.get_or_create(name=name)
+ group.permissions.add(perm)
+ if administrator:
+ administrator.groups.add(group)
+
+ print(COLOR_WARNING + "** Verify import permissions in profiles **" + COLOR_ENDC)
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('ishtar_common', '0230_auto_20231024_1045'),
+ ]
+
+ operations = [
+ migrations.RunPython(migrate),
+ ]
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
index 43677bca4..dcb9d030d 100644
--- a/ishtar_common/models_common.py
+++ b/ishtar_common/models_common.py
@@ -1111,9 +1111,10 @@ class Imported(models.Model):
if not user.ishtaruser:
return []
q = getattr(self, key)
+ lst = []
if user.is_superuser or user.ishtaruser.has_right("view_import"):
lst = list(q.all())
- else:
+ elif user.ishtaruser.has_right("view_own_import"):
lst = q.filter(Q(user=user.ishtaruser) | Q(importer_type__users__pk=user.ishtaruser.pk))
new_lst = []
for imprt in lst:
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
index 77dad558e..2967c18aa 100644
--- a/ishtar_common/models_imports.py
+++ b/ishtar_common/models_imports.py
@@ -225,6 +225,10 @@ class ImporterType(models.Model):
def __str__(self):
return self.name
+ @classmethod
+ def is_own(cls, ishtar_user):
+ return bool(cls.objects.filter(users__pk=ishtar_user.pk).count())
+
@property
def type_label(self):
if self.type in IMPORT_TYPES_DICT:
@@ -445,6 +449,10 @@ class ImporterGroup(models.Model):
def __str__(self):
return self.name
+ @classmethod
+ def is_own(cls, ishtar_user):
+ return bool(cls.objects.filter(users__pk=ishtar_user.pk).count())
+
@property
def importer_types_label(self) -> str:
return " ; ".join([imp.importer_type.name for imp in self.importer_types.all()])
@@ -1413,18 +1421,35 @@ class BaseImport(models.Model, OwnPerms, SheetItem):
abstract = True
@classmethod
- def query_can_access(cls, user):
+ def get_permissions_for_actions(cls, user, session):
+ if not hasattr(user, "ishtaruser") or not user.ishtaruser:
+ return False, False, False, False
+ can_edit_all, can_delete_all, can_edit_own, can_delete_own = False, False, False, False
+ if user.is_superuser:
+ can_edit_all = True
+ can_delete_all = True
+ if user.ishtaruser.has_right("change_import", session=session):
+ can_edit_all = True
+ elif user.ishtaruser.has_right("change_own_import", session=session):
+ can_edit_own = True
+ if user.ishtaruser.has_right("delete_import", session=session):
+ can_delete_all = True
+ elif user.ishtaruser.has_right("delete_own_import", session=session):
+ can_delete_own = True
+ return can_edit_all, can_delete_all, can_edit_own, can_delete_own
+
+ @classmethod
+ def query_can_access(cls, user, perm="view_import"):
"""
Filter the query to check access permissions
:param user: User instance
:return: import query
"""
q = cls.objects
- if user.is_superuser:
+ if user.is_superuser or (hasattr(user, "ishtaruser") and user.ishtaruser and
+ user.ishtaruser.has_right(perm)):
return q
- IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
- ishtar_user = IshtarUser.objects.get(pk=user.pk)
- q = q.filter(Q(user=ishtar_user) | Q(importer_type__users__pk=ishtar_user.pk))
+ q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk))
return q
@classmethod
@@ -1557,25 +1582,28 @@ class ImportGroup(BaseImport):
return ""
return IMPORT_GROUP_STATE_DCT[str(self.state)]
- def get_actions(self):
+ def get_actions(self, can_edit=False, can_delete=False):
"""
Get available action relevant with the current status
"""
actions = []
- if self.state == "C":
+ if not can_edit and not can_delete:
+ return actions
+ if can_edit and self.state == "C":
actions.append(("A", _("Analyse")))
- if self.state == "A":
+ if can_edit and self.state == "A":
actions.append(("A", _("Re-analyse")))
if not any(-1 for imp in self.import_list() if not imp.pre_import_form_is_valid):
actions.append(("I", _("Launch import")))
- if self.state in ("F", "FE"):
+ if can_edit and self.state in ("F", "FE"):
actions.append(("A", _("Re-analyse")))
actions.append(("I", _("Re-import")))
actions.append(("AC", _("Archive")))
- if self.state == "AC":
+ if can_edit and self.state == "AC":
state = "FE" if any(1 for imp in self.imports.all() if imp.error_file) else "F"
actions.append((state, _("Unarchive")))
- actions.append(("D", _("Delete")))
+ if can_delete:
+ actions.append(("D", _("Delete")))
return actions
def initialize(self, user=None, session_key=None):
@@ -2203,16 +2231,18 @@ class Import(BaseImport):
idx_line
) in self.imported_line_numbers.split(",")
- def get_actions(self):
+ def get_actions(self, can_edit=False, can_delete=False):
"""
Get available action relevant with the current status
"""
IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")
profile = IshtarSiteProfile.get_current_profile()
actions = []
- if self.state == "C":
+ if not can_edit and not can_delete:
+ return actions
+ if can_edit and self.state == "C":
actions.append(("A", _("Analyse")))
- if self.state in ("A", "PI"):
+ if can_edit and self.state in ("A", "PI"):
actions.append(("A", _("Re-analyse")))
if self.pre_import_form_is_valid:
actions.append(("I", _("Launch import")))
@@ -2222,7 +2252,7 @@ class Import(BaseImport):
actions.append(("CH", _("Re-check for changes")))
else:
actions.append(("CH", _("Check for changes")))
- if self.state in ("F", "FE"):
+ if can_edit and self.state in ("F", "FE"):
actions.append(("A", _("Re-analyse")))
actions.append(("I", _("Re-import")))
if profile.experimental_feature:
@@ -2232,12 +2262,13 @@ class Import(BaseImport):
else:
actions.append(("CH", _("Check for changes")))
actions.append(("AC", _("Archive")))
- if self.state == "AC":
+ if can_edit and self.state == "AC":
state = "FE" if self.error_file else "F"
actions.append((state, _("Unarchive")))
- if self.state in ("C", "A"):
+ if can_delete and self.state in ("C", "A"):
actions.append(("ED", _("Edit")))
- actions.append(("D", _("Delete")))
+ if can_delete:
+ actions.append(("D", _("Delete")))
return actions
@property
diff --git a/ishtar_common/templates/ishtar/import_list.html b/ishtar_common/templates/ishtar/import_list.html
index 35821c5bf..ec5e714de 100644
--- a/ishtar_common/templates/ishtar/import_list.html
+++ b/ishtar_common/templates/ishtar/import_list.html
@@ -16,7 +16,7 @@
{% endblock %}
{% block content %}
-<div class="text-center m-3">
+{% if can_create_import %}<div class="text-center m-3">
{% if has_import_table %}<a href="{% url 'new_import' %}" class="btn btn-success">
<i class="fa fa-plus"></i> {% trans 'import (table)' %}
</a>{% endif %}
@@ -26,7 +26,7 @@
{% if has_import_group %}<a href="{% url 'new_import_group' %}" class="btn btn-success">
<i class="fa fa-plus"></i> {% trans 'import (group)' %}
</a>{% endif %}
-</div>
+</div>{% endif %}
<div id="import-container">
{% include "ishtar/import_table.html" %}
</div>
diff --git a/ishtar_common/templates/ishtar/import_table.html b/ishtar_common/templates/ishtar/import_table.html
index 08c949654..56e73c6f0 100644
--- a/ishtar_common/templates/ishtar/import_table.html
+++ b/ishtar_common/templates/ishtar/import_table.html
@@ -78,14 +78,18 @@
{{import.status}}
</td>
<td>
+ {% if import.action_list %}
<select class="form-control"
id='import-action-{{import.import_id}}'
name='import-action-{{import.import_id}}'>
<option value=''>--------</option>
- {% for action, lbl in import.get_actions %}
+ {% for action, lbl in import.action_list %}
<option value='{{action}}'>{{lbl}}</option>
{% endfor%}
</select>
+ {% else %}
+ &ndash;
+ {% endif %}
</td>
<td><ul class="simple table-import-files">
{% if import.get_imported_values %}<li class="p-1">
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 979b38395..f14872295 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -37,7 +37,7 @@ from unittest.runner import TextTestRunner, TextTestResult
from django.apps import apps
from django.conf import settings
-from django.contrib.auth.models import User, Permission
+from django.contrib.auth.models import User, Permission, Group
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.geos import (
GEOSGeometry,
@@ -2474,10 +2474,16 @@ class ShortMenuTest(TestCase):
class BaseImportTest(TestCase):
-
- def create_import(self):
- create_user()
+ def setUp(self):
imp_model = models.ImporterModel.objects.create(
+ klass="ishtar_common.models.Parcel", name="Parcel"
+ )
+ self.importer_type = models.ImporterType.objects.create(associated_models=imp_model)
+
+ def create_import(self, name="My Import", need_user=True):
+ if need_user:
+ create_user()
+ imp_model, __ = models.ImporterModel.objects.get_or_create(
klass="ishtar_common.models.Person", name="Person"
)
importer_type = models.ImporterType.objects.create(associated_models=imp_model)
@@ -2490,6 +2496,7 @@ class BaseImportTest(TestCase):
with open(dest, "rb") as f:
mcc_operation_file = DjangoFile(f)
imprt = models.Import.objects.create(
+ name=name,
user=models.IshtarUser.objects.all()[0],
importer_type=importer_type,
imported_file=mcc_operation_file,
@@ -2554,7 +2561,7 @@ class BaseImportTest(TestCase):
"csv_sep": ",",
}
form = forms_common.NewImportGroupForm(
- data=post_dict, files=file_dict, user=ishtar_user
+ data=post_dict, files=file_dict, user=ishtar_user.user_ptr
)
self.assertTrue(form.is_valid())
impt = form.save(ishtar_user)
@@ -2563,102 +2570,303 @@ class BaseImportTest(TestCase):
self.init_group_import(impt)
return impt
- def create_importer_model(self):
- return models.ImporterModel.objects.create(
- klass="ishtar_common.models.Parcel", name="Parcel"
+
+class ImportTestInterface(BaseImportTest):
+
+ def setUp(self):
+ super().setUp()
+ self.super_username, self.super_password, __ = create_superuser()
+ self.simple_username, self.simple_password, __ = create_user()
+ self.import_username, self.import_password, self.import_user = create_user(
+ "import-user", "password"
)
+ self.import_group = Group.objects.create(name="Import")
+ self.import_user.groups.add(self.import_group)
- def create_importer_type(self, imp_model):
- return models.ImporterType.objects.create(associated_models=imp_model)
+ def superuser_login(self):
+ client = Client()
+ client.login(username=self.super_username, password=self.super_password)
+ return client
+ def simple_login(self):
+ client = Client()
+ client.login(username=self.simple_username, password=self.simple_password)
+ return client
-class ImportTest(BaseImportTest):
+ def import_login(self):
+ client = Client()
+ client.login(username=self.import_username, password=self.import_password)
+ return client
+
+ def set_global_permission(self, imports, codenames):
+ self.import_group.permissions.clear()
+ for imprt in imports:
+ imprt.importer_type.users.clear()
+ if not isinstance(codenames, (list, tuple)):
+ codenames = (codenames,)
+ for codename in codenames:
+ self.import_group.permissions.add(Permission.objects.get(codename=codename))
+
+ def remove_global_permission(self, imports, codenames):
+ self.import_group.permissions.clear()
+ for imprt in imports:
+ imprt.importer_type.users.clear()
+ if not isinstance(codenames, (list, tuple)):
+ codenames = (codenames,)
+ for codename in codenames:
+ perm = Permission.objects.get(codename=codename)
+ if perm in list(self.import_group.permissions.all()):
+ self.import_group.permissions.remove()
+
+ def set_own_permission(self, imports, codenames):
+ self.set_global_permission(imports, codenames)
+ user = models.IshtarUser.objects.get(user_ptr=self.import_user)
+ for imprt in imports:
+ imprt.importer_type.users.add(user)
+
+ def test_list_import(self):
+ imprt = self.create_import()
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ # no login and simple user login
+ for client in (Client(), self.simple_login()):
+ response = client.get(reverse("current_imports"))
+ self.assertRedirects(response, '/')
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(imprt.name, content)
+ self.assertIn(imprt2.name, content)
- def test_edit_import(self):
- username, password, user = create_superuser()
+ # import user
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertRedirects(response, '/')
+
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(imprt.name, content)
+ self.assertIn(imprt2.name, content)
+ self.remove_global_permission([imprt, imprt2], "change_import")
+
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(imprt.name, content)
+ self.assertNotIn(imprt2.name, content,
+ msg="Import 2 unexpectedly found in import list")
+
+ def test_import_action_permission(self):
imprt = self.create_import()
- c = Client()
- c.login(username=username, password=password)
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ delete_tag = "<option value='D'>"
- response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ # superuser
+ client = self.superuser_login()
+ response = client.get(reverse("current_imports"))
self.assertEqual(response.status_code, 200)
- self.assertContains(response, imprt.importer_type)
- self.assertContains(response, imprt.imported_file)
- self.assertContains(response, imprt.imported_images)
- self.assertContains(response, imprt.encoding)
- self.assertContains(response, imprt.csv_sep)
+ content = response.content.decode()
+ self.assertIn(f"import-action-{imprt.pk}", content)
+ self.assertIn(f"import-action-{imprt2.pk}", content)
+ self.assertIn(delete_tag, content)
+
+ # import user
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertRedirects(response, '/')
+
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(f"import-action-{imprt.pk}", content)
+ self.assertIn(f"import-action-{imprt2.pk}", content)
+ self.assertNotIn(delete_tag, content)
+ self.remove_global_permission([imprt, imprt2], ("change_import",))
+
+ self.set_global_permission([imprt, imprt2], ("change_import", "delete_import"))
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(delete_tag, content)
+ self.remove_global_permission([imprt, imprt2], ("change_import", "delete_import"))
- imp_model = self.create_importer_model()
- importer_type = self.create_importer_type(imp_model)
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(reverse("current_imports"))
+ self.assertEqual(response.status_code, 200)
+ content = response.content.decode()
+ self.assertIn(f"import-action-{imprt.pk}", content)
+ self.assertNotIn(f"import-action-{imprt2.pk}", content,
+ msg="Import 2 unexpectedly found in import list")
+ self.assertNotIn(delete_tag, content)
+ self.set_own_permission([imprt], ("change_own_import", "change_own_import"))
+ def _test_create_import_get_data(self):
+ csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv")
+ return {
+ "name": "Test Name",
+ "importer_type": self.importer_type.pk,
+ "imported_file": SimpleUploadedFile(
+ name="insee-test.csv",
+ content=open(csv_path, "rb").read(),
+ ),
+ "encoding": "utf-8",
+ "csv_sep": '|',
+ "skip_lines": 1,
+ }
+
+ def test_create_import(self):
+ # init
+ nb_imports = models.Import.objects.count()
+
+ # no login and simple user login
+ for client in (Client(), self.simple_login()):
+ response = client.get(reverse("new_import"))
+ self.assertRedirects(response, '/')
+ data = self._test_create_import_get_data()
+ response = client.post(reverse("new_import"), data)
+ self.assertRedirects(response, '/')
+ self.assertEqual(nb_imports, models.Import.objects.count())
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(reverse("new_import"))
+ self.assertEqual(response.status_code, 200)
+ data = self._test_create_import_get_data()
+ response = client.post(reverse("new_import"), data)
+ self.assertEqual(nb_imports + 1, models.Import.objects.count())
+ self.assertRedirects(response, '/import-list/')
+
+ imprt = models.Import.objects.get(name="Test Name")
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(imprt.name, "Test Name")
+ self.assertEqual(imprt.importer_type, self.importer_type)
+ self.assertEqual(imprt.encoding, "utf-8")
+ self.assertEqual(imprt.csv_sep, '|')
+
+ # import user
+ for imp in models.Import.objects.all():
+ imp.delete()
+ nb_imports = 0
+
+ client = self.import_login()
+ response = client.get(reverse("new_import"))
+ self.assertRedirects(response, '/')
+
+ self.set_global_permission([], ("add_import", "change_import"))
+ client = self.import_login()
+ response = client.get(reverse("new_import"))
+ self.assertEqual(response.status_code, 200)
+ data = self._test_create_import_get_data()
+ response = client.post(reverse("new_import"), data)
+ self.assertEqual(nb_imports + 1, models.Import.objects.count())
+ self.assertRedirects(response, '/import-list/')
+
+ def test_edit_import(self):
+ # init
+ imprt = self.create_import(name="My import")
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
data = {
"name": "Test Name",
- "importer_type": importer_type.pk,
+ "importer_type": self.importer_type.pk,
"encoding": "utf-8",
"csv_sep": '|',
"skip_lines": 32,
}
- response = c.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
- self.assertEqual(response.status_code, 302)
+ # no login and simple user login
+ for client in (Client(), self.simple_login()):
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertRedirects(response, '/')
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/')
- response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ client = self.superuser_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
self.assertEqual(response.status_code, 200)
- self.assertContains(response, "Test Name")
- self.assertContains(response, str(importer_type))
- self.assertContains(response, "utf-8")
+ self.assertContains(response, imprt.name)
+ self.assertContains(response, imprt.importer_type)
+ self.assertContains(response, imprt.imported_file)
+ self.assertContains(response, imprt.imported_images)
+ self.assertContains(response, imprt.encoding)
+ self.assertContains(response, imprt.csv_sep)
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/import-list/')
imprt = models.Import.objects.get(pk=imprt.pk)
self.assertEqual(imprt.name, "Test Name")
- self.assertEqual(imprt.importer_type, importer_type)
+ self.assertEqual(imprt.importer_type, self.importer_type)
self.assertEqual(imprt.encoding, "utf-8")
self.assertEqual(imprt.csv_sep, "|")
self.assertEqual(imprt.skip_lines, 32)
- def test_create_import(self):
- username, password, user = create_superuser()
- c = Client()
- c.login(username=username, password=password)
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, "Test Name")
+ self.assertContains(response, str(self.importer_type))
+ self.assertContains(response, "utf-8")
- imp_model = self.create_importer_model()
- importer_type = self.create_importer_type(imp_model)
- csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv")
+ # import user
+ imprt.delete()
+ imprt2.delete()
+ imprt = self.create_import(name="My import")
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+
+ client = self.import_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertRedirects(response, '/')
+
+ # import user global permission
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, imprt.name)
- data = {
- "name": "Test Name",
- "importer_type": importer_type.pk,
- "imported_file": SimpleUploadedFile(
- name="insee-test.csv",
- content=open(csv_path, "rb").read(),
- ),
- "encoding": "utf-8",
- "csv_sep": '|',
- "skip_lines": 1,
- }
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/import-list/')
+ imprt = models.Import.objects.get(pk=imprt.pk)
+ self.assertEqual(imprt.name, "Test Name")
- response = c.post(reverse("new_import"), data)
- self.assertEqual(response.status_code, 302)
+ self.remove_global_permission([imprt, imprt2], ("change_import",))
- imprt = models.Import.objects.get(name="Test Name")
- response = c.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
+ # import user own permission
+ imprt.delete()
+ imprt2.delete()
+ imprt = self.create_import(name="My import")
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt.pk}))
self.assertEqual(response.status_code, 200)
+ self.assertContains(response, imprt.name)
+
+ response = client.post(reverse("edit_import", kwargs={"pk": imprt.pk}), data)
+ self.assertRedirects(response, '/import-list/')
+ imprt = models.Import.objects.get(pk=imprt.pk)
self.assertEqual(imprt.name, "Test Name")
- self.assertEqual(imprt.importer_type, importer_type)
- self.assertEqual(imprt.encoding, "utf-8")
- self.assertEqual(imprt.csv_sep, '|')
- def test_validation_zip_import_image(self):
- username, password, user = create_superuser()
- c = Client()
- c.login(username=username, password=password)
+ response = client.get(reverse("edit_import", kwargs={"pk": imprt2.pk}))
+ self.assertRedirects(response, '/')
- imp_model = self.create_importer_model()
- importer_type = self.create_importer_type(imp_model)
+ def test_validation_zip_import_image(self):
+ # init
image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png")
data = {
"name": "Import Zip Not Valid Must Fail",
- "importer_type": importer_type.pk,
+ "importer_type": self.importer_type.pk,
"encoding": "utf-8",
"csv_sep": "|",
"skip_lines": 1,
@@ -2668,13 +2876,130 @@ class ImportTest(BaseImportTest):
content_type="image/png",
),
}
- response = c.post(reverse("new_import"), data)
+
+ # superuser
+ client = self.superuser_login()
+ response = client.post(reverse("new_import"), data)
expected = str(
_('"Associated images" field must be a valid zip file.')
).replace('"', '&quot;')
self.assertIn(expected, response.content.decode("utf-8"))
self.assertEqual(response.status_code, 200)
+ def test_display_csv(self):
+ # init
+ imprt = self.create_import()
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+ url = reverse("import_display_csv", args=["source", "", imprt.pk])
+ url2 = reverse("import_display_csv", args=["source", "", imprt2.pk])
+
+ # no login
+ client = Client()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+
+ # simple login
+ client = self.simple_login()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # import user global permission
+ self.set_global_permission([imprt, imprt2], "view_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+ self.remove_global_permission([imprt, imprt2], ("view_import",))
+
+ # import user own permission
+ self.set_own_permission([imprt], "view_own_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertRedirects(response, "/")
+
+ def test_ignore_errors(self):
+ # init
+ imprt = self.create_import()
+ imprt2 = self.create_import(name="My-import-2", need_user=False)
+
+ path = os.path.join(
+ LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv"
+ )
+ imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv")
+ imprt.save()
+ imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv")
+ imprt2.save()
+
+ q = models.ImportLineError.objects.filter(import_item=imprt.pk)
+ self.assertEqual(q.count(), 2)
+ ignored_line = q.all()[0]
+ url = reverse("import_ignore_line", args=[ignored_line.pk])
+ q = models.ImportLineError.objects.filter(import_item=imprt2.pk)
+ self.assertEqual(q.count(), 2)
+ ignored_line2 = q.all()[0]
+ url2 = reverse("import_ignore_line", args=[ignored_line2.pk])
+
+ # no login
+ client = Client()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # superuser
+ client = self.superuser_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True)
+ ignored_line.ignored = False
+ ignored_line.save()
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True)
+ ignored_line2.ignored = False
+ ignored_line2.save()
+
+ # simple login
+ client = self.simple_login()
+ response = client.get(url)
+ self.assertRedirects(response, "/")
+
+ # import user global permission
+ self.set_global_permission([imprt, imprt2], "change_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True)
+ ignored_line.ignored = False
+ ignored_line.save()
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, True)
+ ignored_line2.ignored = False
+ ignored_line2.save()
+ self.remove_global_permission([imprt, imprt2], ("change_import",))
+
+ # import user own permission
+ self.set_own_permission([imprt], "change_own_import")
+ client = self.import_login()
+ response = client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line.pk).ignored, True)
+ self.assertEqual(response.status_code, 200)
+ response = client.get(url2)
+ self.assertEqual(response.status_code, 404)
+ self.assertEqual(models.ImportLineError.objects.get(pk=ignored_line2.pk).ignored, False)
+
+
+class ImportTest(BaseImportTest):
def test_archive_import(self):
imprt = self.create_import()
with open(imprt.imported_file.path, "r") as f:
@@ -2796,19 +3121,6 @@ class ImportTest(BaseImportTest):
field = getattr(sub_import, k)
self.assertTrue(field, "{} is missing in unarchive".format(k))
- def test_display_csv(self):
- imprt = self.create_import()
- username, password, __ = create_user("test", "test")
- c = Client()
- c.login(username=username, password=password)
- url = "import_display_csv"
- response = c.get(reverse(url, args=["source", "", imprt.pk]), )
- self.assertEqual(response.status_code, 404)
- username, password, __ = create_superuser()
- c.login(username=username, password=password)
- response = c.get(reverse(url, args=["source", "", imprt.pk]))
- self.assertEqual(response.status_code, 200)
-
def test_delete_related(self):
town = models.Town.objects.create(name="my-test")
self.assertEqual(models.Town.objects.filter(name="my-test").count(), 1)
diff --git a/ishtar_common/tests/error-file.csv b/ishtar_common/tests/error-file.csv
new file mode 100644
index 000000000..007d2d3c2
--- /dev/null
+++ b/ishtar_common/tests/error-file.csv
@@ -0,0 +1,3 @@
+"ligne","colonne","erreur"
+"2","1","Valeur requise"
+"3","1","Valeur requise"
diff --git a/ishtar_common/urls.py b/ishtar_common/urls.py
index 52e1324b2..c3d614ac6 100644
--- a/ishtar_common/urls.py
+++ b/ishtar_common/urls.py
@@ -210,62 +210,62 @@ urlpatterns = [
),
url(
r"^import-new/$",
- check_rights(["change_import"])(views.NewImportView.as_view()),
+ check_rights(["add_import", "add_own_import"])(views.NewImportView.as_view()),
name="new_import",
),
url(
r"^import-edit/(?P<pk>[0-9]+)/$",
- check_rights(["change_import"])(views.EditImportView.as_view()),
+ check_rights(["change_import", "change_own_import"])(views.EditImportView.as_view()),
name="edit_import",
),
url(
r"^import-new-gis/$",
- check_rights(["change_import"])(views.NewImportGISView.as_view()),
+ check_rights(["add_import", "add_own_import"])(views.NewImportGISView.as_view()),
name="new_import_gis",
),
url(
r"^import-new-group/$",
- check_rights(["change_import"])(views.NewImportGroupView.as_view()),
+ check_rights(["add_import", "add_own_import"])(views.NewImportGroupView.as_view()),
name="new_import_group",
),
url(
r"^import-list/$",
- check_rights(["change_import"])(views.ImportListView.as_view()),
+ check_rights(["change_import", "change_own_import"])(views.ImportListView.as_view()),
name="current_imports",
),
url(
r"^import-list-table/$",
- check_rights(["change_import"])(views.ImportListTableView.as_view()),
+ check_rights(["change_import", "change_own_import"])(views.ImportListTableView.as_view()),
name="current_imports_table",
),
url(
r"^import-get-status/$",
- check_rights(["change_import"])(views.import_get_status),
+ check_rights(["change_import", "change_own_import"])(views.import_get_status),
name="import_get_status",
),
url(
r"^import-list-old/$",
- check_rights(["change_import"])(views.ImportOldListView.as_view()),
+ check_rights(["change_import", "change_own_import"])(views.ImportOldListView.as_view()),
name="old_imports",
),
url(
r"^import-delete/(?P<pk>[0-9]+)/$",
- views.ImportDeleteView.as_view(),
+ check_rights(["delete_import", "delete_own_import"])(views.ImportDeleteView.as_view()),
name="import_delete",
),
url(
r"^import-group-delete/(?P<pk>[0-9]+)/$",
- views.ImportGroupDeleteView.as_view(),
+ check_rights(["delete_import", "delete_own_import"])(views.ImportGroupDeleteView.as_view()),
name="import_group_delete",
),
url(
r"^import-link-unmatched/(?P<pk>[0-9]+)/$",
- views.ImportLinkView.as_view(),
+ check_rights(["change_import", "change_own_import"])(views.ImportMatchView.as_view()),
name="import_link_unmatched",
),
url(
r"^import-csv-view/(?P<target>source|result|match|error)/(?P<group>group\-)?(?P<pk>[0-9]+)/$",
- views.ImportCSVView.as_view(),
+ check_rights(["view_import", "view_own_import"])(views.ImportCSVView.as_view()),
name="import_display_csv",
),
url(
@@ -281,12 +281,12 @@ urlpatterns = [
),
url(
r"^import-pre-form/(?P<import_id>[0-9]+)/$",
- check_rights(["change_import"])(views.ImportPreFormView.as_view()),
+ check_rights(["change_import", "change_own_import"])(views.ImportPreFormView.as_view()),
name="import_pre_import_form",
),
url(
r"^import-ignore-line/(?P<line_id>[0-9]+)/$",
- views.line_error,
+ check_rights(["change_import", "change_own_import"])(views.line_error),
name="import_ignore_line",
),
url(r"^profile(?:/(?P<pk>[0-9]+))?/$", views.ProfileEdit.as_view(), name="profile"),
diff --git a/ishtar_common/views.py b/ishtar_common/views.py
index b91f3202c..cd22d62eb 100644
--- a/ishtar_common/views.py
+++ b/ishtar_common/views.py
@@ -1490,7 +1490,28 @@ class NewImportView(BaseImportView, CreateView):
page_name = _("Import: create (table)")
-class EditImportView(BaseImportView, UpdateView):
+class ImportPermissionMixin:
+ permission_full = "change_import"
+ permission_own = "change_own_import"
+
+ def dispatch(self, request, *args, **kwargs):
+ import_pk = self.kwargs["pk"]
+ user = request.user
+ if not user or not user.ishtaruser:
+ return redirect("/")
+ model = models.ImportGroup if self.kwargs.get("group", None) else models.Import
+ q = model.query_can_access(user, perm=self.permission_full).filter(pk=import_pk)
+ if not user.is_superuser and not user.ishtaruser.has_right(self.permission_full):
+ if not user.ishtaruser.has_right(self.permission_own):
+ return redirect("/")
+ q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk))
+ if not q.count():
+ return redirect("/")
+ returned = super().dispatch(request, *args, **kwargs)
+ return returned
+
+
+class EditImportView(ImportPermissionMixin, BaseImportView, UpdateView):
page_name = _("Import: edit (table)")
@@ -1544,6 +1565,21 @@ class ImportPreFormView(IshtarMixin, LoginRequiredMixin, FormView):
return HttpResponseRedirect(self.get_success_url())
+def get_permissions_for_actions(user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own):
+ can_edit, can_delete = False, False
+ is_own = None
+ if can_edit_own or can_delete_own: # need to check owner
+ if imprt.importer_type_id not in owns:
+ # "is_own" only query once by importer type
+ owns[imprt.importer_type.pk] = imprt.importer_type.is_own(user.ishtaruser)
+ is_own = owns[imprt.importer_type_id]
+ if can_edit_all or (can_edit_own and is_own):
+ can_edit = True
+ if can_delete_all or (can_delete_own and is_own):
+ can_delete = True
+ return can_edit, can_delete
+
+
class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):
template_name = "ishtar/import_list.html"
model = models.Import
@@ -1555,15 +1591,31 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):
def get_queryset(self):
user = self.request.user
- if not user.pk:
+ if not user.pk or not user.ishtaruser:
raise Http404()
- q1 = self._queryset_filter(self.model.query_can_access(user))
+ q1 = self._queryset_filter(self.model.query_can_access(user, "change_import"))
q1 = q1.filter(group__isnull=True).order_by("-end_date", "-creation_date", "-pk")
- q2 = self._queryset_filter(models.ImportGroup.query_can_access(user))
+ q2 = self._queryset_filter(models.ImportGroup.query_can_access(user, "change_import"))
q2 = q2.order_by("-end_date", "-creation_date", "-pk")
- return list(reversed(sorted(list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date))))
+ values = list(reversed(sorted(list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date))))
+ can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions(
+ user, self.request.session
+ )
+ imports = []
+ owns = {}
+ for imprt in values:
+ can_edit, can_delete = get_permissions_for_actions(
+ user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own
+ )
+ imprt.action_list = imprt.get_actions(can_edit=can_edit, can_delete=can_delete)
+ imports.append(imprt)
+ return imports
def post(self, request, *args, **kwargs):
+ can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions(
+ request.user, request.session
+ )
+ owns = {}
for field in request.POST:
if not field.startswith("import-action-") or not request.POST[field]:
continue
@@ -1576,28 +1628,26 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):
imprt = model.objects.get(pk=int(field.split("-")[-1]))
except (models.Import.DoesNotExist, ValueError):
continue
- if not self.request.user.is_superuser:
- # user can only edit his own imports
- user = models.IshtarUser.objects.get(pk=self.request.user.pk)
- if imprt.user != user:
- continue
+ can_edit, can_delete = get_permissions_for_actions(
+ request.user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own
+ )
action = request.POST[field]
- if action == "D":
+ if can_delete and action == "D":
url = "import_group_delete" if is_group else "import_delete"
return HttpResponseRedirect(
reverse(url, kwargs={"pk": imprt.pk})
)
- elif action == "ED":
+ elif can_edit and action == "ED":
url = "edit_import_group" if is_group else "edit_import"
return HttpResponseRedirect(
reverse(url, kwargs={"pk": imprt.pk})
)
- elif action == "A":
+ elif can_edit and action == "A":
imprt.initialize(
user=self.request.user.ishtaruser,
session_key=request.session.session_key,
)
- elif action == "I":
+ elif can_edit and action == "I":
if settings.USE_BACKGROUND_TASK:
imprt.delayed_importation(request, request.session.session_key)
else:
@@ -1612,12 +1662,12 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):
f"{imprt} - {e}",
"warning",
)
- elif action == "CH":
+ elif can_edit and action == "CH":
if settings.USE_BACKGROUND_TASK:
imprt.delayed_check_modified(request.session.session_key)
else:
imprt.check_modified()
- elif action == "IS":
+ elif can_edit and action == "IS":
if imprt.current_line is None:
imprt.current_line = imprt.skip_lines
imprt.save()
@@ -1626,19 +1676,37 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):
"import_step_by_step", args=[imprt.pk, imprt.current_line + 1]
)
)
- elif action == "AC":
+ elif can_edit and action == "AC":
imprt.archive()
- elif action in ("F", "FE"):
+ elif can_edit and action in ("F", "FE"):
imprt.unarchive(action)
return HttpResponseRedirect(reverse(self.current_url))
def get_context_data(self, **kwargs):
dct = super().get_context_data(**kwargs)
+ add_import_perm = self.request.user.ishtaruser.has_right("add_import", session=self.request.session)
+ import_type_table = models.ImporterType.objects.filter(available=True, is_import=True, type='tab')
+ import_type_gis = models.ImporterType.objects.filter(available=True, is_import=True, type='gis')
+ import_type_group = models.ImporterGroup.objects.filter(available=True)
+ if not add_import_perm and self.request.user.ishtaruser.has_right("add_own_import",
+ session=self.request.session):
+ import_type_table = import_type_table.filter(users__pk=self.request.user.ishtaruser.pk)
+ import_type_gis = import_type_gis.filter(users__pk=self.request.user.ishtaruser.pk)
+ import_type_group = import_type_group.filter(users__pk=self.request.user.ishtaruser.pk)
+ add_import_perm = True
+ has_import_table, has_import_gis, has_import_group = False, False, False
+ if add_import_perm:
+ if import_type_table.count():
+ has_import_table = True
+ if import_type_gis.count():
+ has_import_gis = True
+ if import_type_group.count():
+ has_import_group = True
dct.update({
- "autorefresh_available": settings.USE_BACKGROUND_TASK,
- "has_import_table": models.ImporterType.objects.filter(available=True, is_import=True, type='tab').count(),
- "has_import_gis": models.ImporterType.objects.filter(available=True, is_import=True, type='gis').count(),
- "has_import_group": models.ImporterGroup.objects.filter(available=True).count(),
+ "has_import_table": has_import_table,
+ "has_import_gis": has_import_gis,
+ "has_import_group": has_import_group,
+ "can_create_import": has_import_table or has_import_gis or has_import_group,
})
return dct
@@ -2083,14 +2151,23 @@ def import_get_status(request, current_right=None):
"number_of_line": item.number_of_line,
"progress_percent": item.progress_percent,
})
- item_dct["actions"] = [(key, str(lbl)) for key, lbl in item.get_actions()]
+ can_edit_all, can_delete_all, can_edit_own, can_delete_own = models.Import.get_permissions_for_actions(
+ request.user
+ )
+ can_edit, can_delete = get_permissions_for_actions(
+ request.user, item, {}, can_edit_all, can_delete_all, can_edit_own, can_delete_own
+ )
+ item_dct["actions"] = [
+ (key, str(lbl))
+ for key, lbl in item.get_actions(can_edit=can_edit, can_delete=can_delete)
+ ]
response[key].append(item_dct)
data = json.dumps(response)
return HttpResponse(data, content_type="application/json")
-class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView):
+class ImportMatchView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, ModelFormSetView):
template_name = "ishtar/formset_import_match.html"
model = models.TargetKey
page_name = _("Link unmatched items")
@@ -2100,9 +2177,11 @@ class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView):
form_class = forms.TargetKeyForm
formset_class = forms.TargetKeyFormset
max_fields = 250
+ permission_full = "change_import"
+ permission_own = "change_own_import"
def get_formset_kwargs(self):
- kwargs = super(ImportLinkView, self).get_formset_kwargs()
+ kwargs = super().get_formset_kwargs()
kwargs["user"] = self.request.user
return kwargs
@@ -2126,25 +2205,29 @@ class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView):
return reverse("import_link_unmatched", args=[self.kwargs["pk"]])
-class ImportDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView):
+class ImportDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView):
template_name = "ishtar/import_delete.html"
model = models.Import
page_name = _("Delete import")
+ permission_full = "delete_import"
+ permission_own = "delete_own_import"
def get_success_url(self):
return reverse("current_imports")
-class ImportGroupDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView):
+class ImportGroupDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView):
template_name = "ishtar/import_delete.html"
model = models.ImportGroup
page_name = _("Delete import")
+ permission_full = "delete_import"
+ permission_own = "delete_own_import"
def get_success_url(self):
return reverse("current_imports")
-class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView):
+class ImportCSVView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, TemplateView):
template_name = "ishtar/blocks/view_import_csv.html"
ATTRIBUTES = {
"source": "get_imported_values",
@@ -2158,13 +2241,15 @@ class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView):
"result": ("fa fa-th", _("Result")) ,
"match": ("fa fa-arrows-h", _("Match")),
}
+ permission_full = "view_import"
+ permission_own = "view_own_import"
def get(self, request, *args, **kwargs):
user = self.request.user
if not user.pk:
raise Http404()
model = models.ImportGroup if kwargs.get("group", None) else models.Import
- q = model.query_can_access(self.request.user).filter(pk=kwargs.get("pk", -1))
+ q = model.query_can_access(self.request.user, perm=self.permission_full).filter(pk=kwargs.get("pk", -1))
if not q.count():
raise Http404()
self.import_item = q.all()[0]
@@ -2208,7 +2293,7 @@ class ImportCSVView(IshtarMixin, LoginRequiredMixin, TemplateView):
return data
-def line_error(request, line_id):
+def line_error(request, line_id, current_right=None):
"""
Set or unset ignored state of a csv error file
"""
@@ -2219,7 +2304,7 @@ def line_error(request, line_id):
if not q.count():
return
line = q.all()[0]
- q = models.Import.query_can_access(request.user).filter(pk=line.import_item_id)
+ q = models.Import.query_can_access(request.user, perm="change_import").filter(pk=line.import_item_id)
if not q.count():
raise Http404()
line.ignored = not line.ignored