summaryrefslogtreecommitdiff
path: root/ishtar_common/utils.py
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2023-10-02 16:39:42 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2024-04-16 16:38:32 +0200
commita7a0b8e6cf8d67cc50eefe79a65caa93f6059169 (patch)
treed83ad46dcd595fc282038dfd4200357f3c019f6c /ishtar_common/utils.py
parentfaf34feb2bcebd3fe4fa14f21aa65b49f76c886d (diff)
downloadIshtar-a7a0b8e6cf8d67cc50eefe79a65caa93f6059169.tar.bz2
Ishtar-a7a0b8e6cf8d67cc50eefe79a65caa93f6059169.zip
🗃️ DB changes to manage user permissions
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r--ishtar_common/utils.py227
1 files changed, 225 insertions, 2 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 4e50c506c..7b708cd76 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -51,7 +51,7 @@ from django import forms
from django.apps import apps
from django.conf import settings
from django.conf.urls import url
-from django.contrib.auth.models import Permission
+from django.contrib.auth.models import Permission, User
from django.contrib.auth.hashers import Argon2PasswordHasher as BaseArgon2PasswordHasher
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.geos import GEOSGeometry
@@ -64,6 +64,7 @@ from django.core.files.storage import FileSystemStorage
from django.core.validators import EMPTY_VALUES
from django.urls import reverse
from django.db import models
+from django.db.models import Q
from django.http import HttpResponseRedirect
from django.utils.crypto import get_random_string
from django.utils.datastructures import MultiValueDict as BaseMultiValueDict
@@ -274,6 +275,228 @@ def check_model_access_control(request, model, available_perms=None):
return allowed, own
+class OwnPerms:
+ """
+ Manage special permissions for object's owner
+ """
+
+ @classmethod
+ def get_query_owns(cls, ishtaruser):
+ """
+ Query object to get own items
+ """
+ return None # implement for each object
+
+ def can_view(self, request):
+ if hasattr(self, "LONG_SLUG"):
+ perm = "view_" + self.LONG_SLUG
+ else:
+ perm = "view_" + self.SLUG
+ return self.can_do(request, perm)
+
+ def can_edit(self, request):
+ if not getattr(request.user, "ishtaruser", None):
+ return False
+ ishtaruser = request.user.ishtaruser
+ slug = self.LONG_SLUG if hasattr(self, "LONG_SLUG") else self.SLUG
+ if ishtaruser.has_perm("change_" + slug, session=request.session):
+ return True
+ if not ishtaruser.has_perm("change_own_" + slug, session=request.session):
+ return False
+ return self.is_own(ishtaruser)
+
+ def can_do(self, request, action_name):
+ """
+ Check permission availability for the current object.
+ :param request: request object
+ :param action_name: action name eg: "change_find" - "own" variation is
+ checked
+ :return: boolean
+ """
+ if not getattr(request.user, "ishtaruser", None):
+ return False
+ splited = action_name.split("_")
+ action_own_name = splited[0] + "_own_" + "_".join(splited[1:])
+ user = request.user
+ if action_name == "view_findbasket":
+ action_own_name = "view_own_find"
+ action_name = "view_find"
+ return user.ishtaruser.has_right(action_name, request.session) or (
+ user.ishtaruser.has_right(action_own_name, request.session)
+ and self.is_own(user.ishtaruser)
+ )
+
+ def is_own(self, user, alt_query_own=None):
+ """
+ Check if the current object is owned by the user
+ """
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, IshtarUser):
+ ishtaruser = user
+ elif hasattr(user, "ishtaruser"):
+ ishtaruser = user.ishtaruser
+ else:
+ return False
+ if not alt_query_own:
+ query = self.get_query_owns(ishtaruser)
+ else:
+ query = getattr(self, alt_query_own)(ishtaruser)
+ if not query:
+ return False
+ query &= Q(pk=self.pk)
+ return self.__class__.objects.filter(query).count()
+
+ @classmethod
+ def has_item_of(cls, user):
+ """
+ Check if the user own some items
+ """
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, IshtarUser):
+ ishtaruser = user
+ elif hasattr(user, "ishtaruser"):
+ ishtaruser = user.ishtaruser
+ else:
+ return False
+ query = cls.get_query_owns(ishtaruser)
+ if not query:
+ return False
+ return cls.objects.filter(query).count()
+
+ @classmethod
+ def _return_get_owns(
+ cls, owns, values, get_short_menu_class, label_key="cached_label"
+ ):
+ if not owns:
+ return []
+ sorted_values = []
+ if hasattr(cls, "BASKET_MODEL"):
+ owns_len = len(owns)
+ for idx, item in enumerate(reversed(owns)):
+ if get_short_menu_class:
+ item = item[0]
+ if type(item) == cls.BASKET_MODEL:
+ basket = owns.pop(owns_len - idx - 1)
+ sorted_values.append(basket)
+ sorted_values = list(reversed(sorted_values))
+ if not values:
+ if not get_short_menu_class:
+ return sorted_values + list(
+ sorted(owns, key=lambda x: getattr(x, label_key) or "")
+ )
+ return sorted_values + list(
+ sorted(owns, key=lambda x: getattr(x[0], label_key) or "")
+ )
+ if not get_short_menu_class:
+ return sorted_values + list(sorted(owns, key=lambda x: x[label_key] or ""))
+ return sorted_values + list(sorted(owns, key=lambda x: x[0][label_key] or ""))
+
+ @classmethod
+ def get_owns(
+ cls,
+ user,
+ replace_query=None,
+ limit=None,
+ values=None,
+ get_short_menu_class=False,
+ menu_filtr=None,
+ ):
+ """
+ Get Own items
+ """
+ if not replace_query:
+ replace_query = {}
+ if hasattr(user, "is_authenticated") and not user.is_authenticated:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, User):
+ try:
+ ishtaruser = IshtarUser.objects.get(user_ptr=user)
+ except IshtarUser.DoesNotExist:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ elif isinstance(user, IshtarUser):
+ ishtaruser = user
+ else:
+ if values:
+ return []
+ return cls.objects.filter(pk__isnull=True)
+ items = []
+ if hasattr(cls, "BASKET_MODEL"):
+ items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
+ query = cls.get_query_owns(ishtaruser)
+ if not query and not replace_query:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ if query:
+ q = cls.objects.filter(query)
+ else: # replace_query
+ q = cls.objects.filter(replace_query)
+ if values:
+ q = q.values(*values)
+ if limit:
+ items += list(q.order_by("-pk")[:limit])
+ else:
+ items += list(q.order_by(*cls._meta.ordering).all())
+ if get_short_menu_class:
+ if values:
+ if "id" not in values:
+ raise NotImplementedError(
+ "Call of get_owns with get_short_menu_class option and"
+ " no 'id' in values is not implemented"
+ )
+ my_items = []
+ for i in items:
+ if hasattr(cls, "BASKET_MODEL") and type(i) == cls.BASKET_MODEL:
+ dct = dict([(k, getattr(i, k)) for k in values])
+ my_items.append(
+ (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))
+ )
+ else:
+ my_items.append((i, cls.get_short_menu_class(i["id"])))
+ items = my_items
+ else:
+ items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
+ return items
+
+ @classmethod
+ def _get_query_owns_dicts(cls, ishtaruser):
+ """
+ List of query own dict to construct the query.
+ Each dict is joined with an AND operator, each dict key, values are
+ joined with OR operator
+ """
+ return []
+
+ @classmethod
+ def _construct_query_own(cls, prefix, dct_list):
+ q = None
+ for subquery_dict in dct_list:
+ subquery = None
+ for k in subquery_dict:
+ subsubquery = Q(**{prefix + k: subquery_dict[k]})
+ if subquery:
+ subquery |= subsubquery
+ else:
+ subquery = subsubquery
+ if not subquery:
+ continue
+ if q:
+ q &= subquery
+ else:
+ q = subquery
+ return q
+
+
+
+
def update_data(data, new_data, merge=False):
"""
Update a data directory taking account of key detail
@@ -286,7 +509,7 @@ def update_data(data, new_data, merge=False):
if new_data:
return new_data
return data
- if new_data and data_2 != data:
+ if new_data and new_data != data:
return data + " ; " + new_data
return data
for k in data: