diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2023-10-02 16:39:42 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-02-05 10:51:52 +0100 |
commit | 443e6592b9a6ea77feeccc9a9cdd7d3e81bd45aa (patch) | |
tree | fdd51a899d97fc5526d5eb99af05ab45f6249c6d /ishtar_common/utils.py | |
parent | be35d0d9508839e9a0966bae94bf702fc9e32a67 (diff) | |
download | Ishtar-443e6592b9a6ea77feeccc9a9cdd7d3e81bd45aa.tar.bz2 Ishtar-443e6592b9a6ea77feeccc9a9cdd7d3e81bd45aa.zip |
🗃️ DB changes to manage user permissions
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r-- | ishtar_common/utils.py | 227 |
1 files changed, 225 insertions, 2 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index 9198e8d2c..0f4e7ecd6 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -50,7 +50,7 @@ from django import forms from django.apps import apps from django.conf import settings from django.conf.urls import url -from django.contrib.auth.models import Permission +from django.contrib.auth.models import Permission, User from django.contrib.auth.hashers import Argon2PasswordHasher as BaseArgon2PasswordHasher from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import GEOSGeometry @@ -63,6 +63,7 @@ from django.core.files.storage import FileSystemStorage from django.core.validators import EMPTY_VALUES from django.urls import reverse from django.db import models +from django.db.models import Q from django.http import HttpResponseRedirect from django.utils.crypto import get_random_string from django.utils.datastructures import MultiValueDict as BaseMultiValueDict @@ -273,6 +274,228 @@ def check_model_access_control(request, model, available_perms=None): return allowed, own +class OwnPerms: + """ + Manage special permissions for object's owner + """ + + @classmethod + def get_query_owns(cls, ishtaruser): + """ + Query object to get own items + """ + return None # implement for each object + + def can_view(self, request): + if hasattr(self, "LONG_SLUG"): + perm = "view_" + self.LONG_SLUG + else: + perm = "view_" + self.SLUG + return self.can_do(request, perm) + + def can_edit(self, request): + if not getattr(request.user, "ishtaruser", None): + return False + ishtaruser = request.user.ishtaruser + slug = self.LONG_SLUG if hasattr(self, "LONG_SLUG") else self.SLUG + if ishtaruser.has_perm("change_" + slug, session=request.session): + return True + if not ishtaruser.has_perm("change_own_" + slug, session=request.session): + return False + return self.is_own(ishtaruser) + + def can_do(self, request, action_name): + """ + Check permission availability for the current object. + :param request: request object + :param action_name: action name eg: "change_find" - "own" variation is + checked + :return: boolean + """ + if not getattr(request.user, "ishtaruser", None): + return False + splited = action_name.split("_") + action_own_name = splited[0] + "_own_" + "_".join(splited[1:]) + user = request.user + if action_name == "view_findbasket": + action_own_name = "view_own_find" + action_name = "view_find" + return user.ishtaruser.has_right(action_name, request.session) or ( + user.ishtaruser.has_right(action_own_name, request.session) + and self.is_own(user.ishtaruser) + ) + + def is_own(self, user, alt_query_own=None): + """ + Check if the current object is owned by the user + """ + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, IshtarUser): + ishtaruser = user + elif hasattr(user, "ishtaruser"): + ishtaruser = user.ishtaruser + else: + return False + if not alt_query_own: + query = self.get_query_owns(ishtaruser) + else: + query = getattr(self, alt_query_own)(ishtaruser) + if not query: + return False + query &= Q(pk=self.pk) + return self.__class__.objects.filter(query).count() + + @classmethod + def has_item_of(cls, user): + """ + Check if the user own some items + """ + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, IshtarUser): + ishtaruser = user + elif hasattr(user, "ishtaruser"): + ishtaruser = user.ishtaruser + else: + return False + query = cls.get_query_owns(ishtaruser) + if not query: + return False + return cls.objects.filter(query).count() + + @classmethod + def _return_get_owns( + cls, owns, values, get_short_menu_class, label_key="cached_label" + ): + if not owns: + return [] + sorted_values = [] + if hasattr(cls, "BASKET_MODEL"): + owns_len = len(owns) + for idx, item in enumerate(reversed(owns)): + if get_short_menu_class: + item = item[0] + if type(item) == cls.BASKET_MODEL: + basket = owns.pop(owns_len - idx - 1) + sorted_values.append(basket) + sorted_values = list(reversed(sorted_values)) + if not values: + if not get_short_menu_class: + return sorted_values + list( + sorted(owns, key=lambda x: getattr(x, label_key) or "") + ) + return sorted_values + list( + sorted(owns, key=lambda x: getattr(x[0], label_key) or "") + ) + if not get_short_menu_class: + return sorted_values + list(sorted(owns, key=lambda x: x[label_key] or "")) + return sorted_values + list(sorted(owns, key=lambda x: x[0][label_key] or "")) + + @classmethod + def get_owns( + cls, + user, + replace_query=None, + limit=None, + values=None, + get_short_menu_class=False, + menu_filtr=None, + ): + """ + Get Own items + """ + if not replace_query: + replace_query = {} + if hasattr(user, "is_authenticated") and not user.is_authenticated: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, User): + try: + ishtaruser = IshtarUser.objects.get(user_ptr=user) + except IshtarUser.DoesNotExist: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + elif isinstance(user, IshtarUser): + ishtaruser = user + else: + if values: + return [] + return cls.objects.filter(pk__isnull=True) + items = [] + if hasattr(cls, "BASKET_MODEL"): + items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) + query = cls.get_query_owns(ishtaruser) + if not query and not replace_query: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + if query: + q = cls.objects.filter(query) + else: # replace_query + q = cls.objects.filter(replace_query) + if values: + q = q.values(*values) + if limit: + items += list(q.order_by("-pk")[:limit]) + else: + items += list(q.order_by(*cls._meta.ordering).all()) + if get_short_menu_class: + if values: + if "id" not in values: + raise NotImplementedError( + "Call of get_owns with get_short_menu_class option and" + " no 'id' in values is not implemented" + ) + my_items = [] + for i in items: + if hasattr(cls, "BASKET_MODEL") and type(i) == cls.BASKET_MODEL: + dct = dict([(k, getattr(i, k)) for k in values]) + my_items.append( + (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)) + ) + else: + my_items.append((i, cls.get_short_menu_class(i["id"]))) + items = my_items + else: + items = [(i, cls.get_short_menu_class(i.pk)) for i in items] + return items + + @classmethod + def _get_query_owns_dicts(cls, ishtaruser): + """ + List of query own dict to construct the query. + Each dict is joined with an AND operator, each dict key, values are + joined with OR operator + """ + return [] + + @classmethod + def _construct_query_own(cls, prefix, dct_list): + q = None + for subquery_dict in dct_list: + subquery = None + for k in subquery_dict: + subsubquery = Q(**{prefix + k: subquery_dict[k]}) + if subquery: + subquery |= subsubquery + else: + subquery = subsubquery + if not subquery: + continue + if q: + q &= subquery + else: + q = subquery + return q + + + + def update_data(data, new_data, merge=False): """ Update a data directory taking account of key detail @@ -285,7 +508,7 @@ def update_data(data, new_data, merge=False): if new_data: return new_data return data - if new_data and data_2 != data: + if new_data and new_data != data: return data + " ; " + new_data return data for k in data: |