diff options
-rw-r--r-- | archaeological_context_records/models.py | 10 | ||||
-rw-r--r-- | archaeological_context_records/tests.py | 272 | ||||
-rw-r--r-- | archaeological_files/models.py | 10 | ||||
-rw-r--r-- | archaeological_finds/models_finds.py | 12 | ||||
-rw-r--r-- | archaeological_finds/models_treatments.py | 5 | ||||
-rw-r--r-- | archaeological_finds/tests.py | 185 | ||||
-rw-r--r-- | archaeological_operations/models.py | 10 | ||||
-rw-r--r-- | archaeological_operations/tests.py | 288 | ||||
-rw-r--r-- | archaeological_warehouse/models.py | 6 | ||||
-rw-r--r-- | ishtar_common/admin.py | 5 | ||||
-rw-r--r-- | ishtar_common/menu_base.py | 2 | ||||
-rw-r--r-- | ishtar_common/menus.py | 50 | ||||
-rw-r--r-- | ishtar_common/models.py | 95 | ||||
-rw-r--r-- | ishtar_common/models_common.py | 107 | ||||
-rw-r--r-- | ishtar_common/models_rest.py | 30 | ||||
-rw-r--r-- | ishtar_common/rest.py | 5 | ||||
-rw-r--r-- | ishtar_common/utils.py | 77 | ||||
-rw-r--r-- | ishtar_common/views.py | 18 | ||||
-rw-r--r-- | ishtar_common/views_item.py | 135 |
19 files changed, 1107 insertions, 215 deletions
diff --git a/archaeological_context_records/models.py b/archaeological_context_records/models.py index a88a7d161..1bb09717d 100644 --- a/archaeological_context_records/models.py +++ b/archaeological_context_records/models.py @@ -25,7 +25,7 @@ from django.conf import settings from django.contrib.gis.db import models from django.contrib.postgres.indexes import GinIndex from django.contrib.sites.models import Site -from django.db import connection, transaction, OperationalError, IntegrityError +from django.db import transaction, OperationalError, IntegrityError from django.db.models import Q from django.db.models.signals import post_delete, post_save, m2m_changed from django.urls import reverse @@ -37,7 +37,7 @@ from ishtar_common.utils import ( cached_label_changed, m2m_historization_changed, post_save_geo, - task, + SearchAltName, ) from ishtar_common.models import ( @@ -58,7 +58,6 @@ from ishtar_common.models import ( get_current_profile, document_attached_changed, HistoryModel, - SearchAltName, GeoItem, CompleteIdentifierItem, SearchVectorConfig, @@ -826,6 +825,7 @@ class ContextRecord( "town_label_with_areas", ] UPPER_GEO = ["operation", "archaeological_site"] + UPPER_PERMISSIONS = [(Operation, "operation_id")] history = HistoricalRecords(bases=[HistoryModel]) objects = UUIDModelManager() @@ -1214,6 +1214,10 @@ class ContextRecord( return actions @classmethod + def get_limit_to_area_query(cls, town_ids): + return Q(operation__towns__pk__in=town_ids) + + @classmethod def get_query_owns(cls, ishtaruser): return ( cls._construct_query_own( diff --git a/archaeological_context_records/tests.py b/archaeological_context_records/tests.py index 8a16facf5..f550d23ce 100644 --- a/archaeological_context_records/tests.py +++ b/archaeological_context_records/tests.py @@ -44,7 +44,8 @@ from ishtar_common.models import ( ) from ishtar_common import forms_common -from archaeological_operations.tests import OperationInitTest, ImportTest +from archaeological_operations.tests import OperationInitTest, ImportTest, \ + TestPermissionRequest from archaeological_operations import models as models_ope from archaeological_operations.views import RELATION_FORMSET_EXTRA_FORM from archaeological_context_records import models @@ -942,10 +943,11 @@ class ContextRecordSearchTest(ContextRecordInit, TestCase, SearchText): self._test_search(c, result, context="Text period search") -class ContextRecordPermissionTest(ContextRecordInit, TestCase): +class ContextRecordOldPermissionTest(ContextRecordInit, TestCase): fixtures = CONTEXT_RECORD_TOWNS_FIXTURES def setUp(self): + print("Theses tests should fail on v5") profile_type = ProfileType.objects.create( label="xxCollaborateur", txt_idx="xxcollaborator", @@ -1056,6 +1058,272 @@ class ContextRecordPermissionTest(ContextRecordInit, TestCase): self.assertRedirects(response, "/") +class ContextRecordPermissionTest(ContextRecordInit, TestPermissionRequest, + TestCase): + fixtures = CONTEXT_RECORD_TOWNS_FIXTURES + + def setUp(self): + IshtarSiteProfile.objects.create() + + self.setup_permission_requests( + "ope", + "operation", + permissions=["view_own_operation", "change_own_operation"], + create_profiles=False + ) + self.setup_permission_requests( + "cr", + "contextrecord", + permissions=["view_own_contextrecord", "change_own_contextrecord"], + perm_requests=['id="new-*"', 'excavator="{USER}"'] + ) + + self.users = {} + username, password, user = create_superuser() + self.users["superuser"] = (username, password, user) + + upstream_username, upstream_password, upstream_user = create_user() + UserProfile.objects.create( + profile_type=self.profile_types["cr_upstream"], + person=upstream_user.ishtaruser.person, + current=True, + ) + self.users["upstream"] = (upstream_username, upstream_password, upstream_user) + + # nosec: hard coded password for test purposes + associated_username, associated_password, associated_user = create_user( # nosec + username="vador", password="darth" + ) + profile = UserProfile.objects.create( + profile_type=self.profile_types["cr_associated_items"], + person=associated_user.ishtaruser.person, + current=True, + ) + self.users["associated"] = ( + associated_username, associated_password, associated_user + ) + + # nosec: hard coded password for test purposes + areas_username, areas_password, areas_user = create_user( # nosec + username="luke", password="iamyourfather" + ) + profile = UserProfile.objects.create( + profile_type=self.profile_types["cr_areas"], + person=areas_user.ishtaruser.person, + current=True, + ) + self.users["areas"] = ( + areas_username, areas_password, areas_user + ) + + town = Town.objects.create(name="Tatouine", numero_insee="66000") + area = Area.objects.create(label="Galaxie", txt_idx="galaxie") + area.towns.add(town) + profile.areas.add(area) + + # nosec: hard coded password for test purposes + simple_up_username, simple_up_password, simple_up_user = create_user( # nosec + username="r2d2", password="bipbip" + ) + UserProfile.objects.create( + profile_type=self.profile_types["cr_upstream"], + person=simple_up_user.ishtaruser.person, + current=True, + ) + self.users["simple_upstream"] = ( + simple_up_username, simple_up_password, simple_up_user + ) + + # nosec: hard coded password for test purposes + request_username, request_password, request_user = create_user( # nosec + username="c6po", password="bopbop" + ) + UserProfile.objects.create( + profile_type=self.profile_types["cr_request_1"], + person=request_user.ishtaruser.person, + current=True, + ) + self.users["request"] = ( + request_username, request_password, request_user + ) + + # nosec: hard coded password for test purposes + request2_username, request2_password, request2_user = create_user( # nosec + username="cowboy", password="bebop" + ) + UserProfile.objects.create( + profile_type=self.profile_types["cr_request_2"], + person=request2_user.ishtaruser.person, + current=True, + ) + self.users["request2"] = ( + request2_username, request2_password, request2_user + ) + + # nosec: hard coded password for test purposes + req_area_username, req_area_password, req_area_user = create_user( # nosec + username="chewee", password="bwawa" + ) + profile = UserProfile.objects.create( + profile_type=self.profile_types["cr_request_areas_1"], + person=req_area_user.ishtaruser.person, + current=True, + ) + self.users["request_areas"] = ( + req_area_username, req_area_password, req_area_user + ) + profile.areas.add(area) + + self.orgas = self.create_orgas(user) + self.operations = self.create_operation(user, self.orgas[0], + values={"code_patriarche": "OPE01"}) + self.operations += self.create_operation(upstream_user, self.orgas[0], + values={"code_patriarche": "OPE02"}) + self.operations[0].towns.add(town) + + self.operations[1].ishtar_users.add(simple_up_user.ishtaruser) + + self.create_context_record( + user=user, data={"label": "new-CR1", "operation": self.operations[0]} + ) + self.create_context_record( + user=user, data={"label": "new-CR2", "operation": self.operations[1]} + ) + self.create_context_record( + user=associated_user, + data={"label": "old-CR3", "operation": self.operations[0]} + ) + self.cr_1 = self.context_records[0] + self.cr_2 = self.context_records[1] + self.cr_2.ishtar_users.add(associated_user.ishtaruser) + + associated_user.ishtaruser.generate_permission() + areas_user.ishtaruser.generate_permission() + simple_up_user.ishtaruser.generate_permission() + request_user.ishtaruser.generate_permission() + req_area_user.ishtaruser.generate_permission() + request2_user.ishtaruser.generate_permission() + + # upstream with associated request for operation + gp = Group.objects.get(name="ope_xxx") + self.profile_types["cr_upstream"].groups.add(gp) + self.profile_types["cr_upstream"].permission_requests.add( + self.permission_requests["ope_associated_items"] + ) + upstream_user.ishtaruser.generate_permission() + + def test_own_search(self): + # no result when no authentification + c = Client() + response = c.get(reverse("get-contextrecord")) + self.assertTrue(not json.loads(response.content.decode())) + + url = reverse("get-contextrecord") + # possession + direct "history_creator_id" + # two context record available via operation + self._test_search( + url, + 'possession + direct "history_creator_id"', + self.users["associated"], + 2 + ) + + # upstream + direct "history_creator_id" + # one context record available via operation - can view operation + self._test_search( + url, + 'upstream + direct "history_creator_id"', + self.users["upstream"], + 1 + ) + + # upstream: no upstream permission only via direct association + # one context record available via operation + self._test_search( + url, + 'upstream: no upstream permission only via direct association', + self.users["simple_upstream"], + 1 + ) + + # area filter + # only one "own" operation available + self._test_search( + url, + 'areas filter', + self.users["areas"], + 2 + ) + + # request + # 2 context record available via request + self._test_search( + url, + 'request', + self.users["request"], + 2 + ) + + # request limited by area + # 2 context record available via request but only one match the area + self._test_search( + url, + 'request limited by area', + self.users["request_areas"], + 1 + ) + # request with {USER} + # one context record available via request + self._test_search( + url, + 'request2', + self.users["request2"], + 0 + ) + __, __, request2_user = self.users["request2"] + cr2 = self.context_records[2] + cr2.excavator = request2_user.ishtaruser.person + cr2.save() + request2_user.ishtaruser.generate_permission() + self._test_search( + url, + 'request2', + self.users["request2"], + 1 + ) + + def test_own_modify(self): + # no result when no authentification + c = Client() + response = c.get(reverse("record_modify", args=[self.cr_2.pk])) + self.assertRedirects(response, "/") + + modif_url = "/record_modification/operation-record_modification" + + # upstream + c = Client() + upstream_username, upstream_password, upstream_user = self.users["upstream"] + c.login(username=upstream_username, password=upstream_password) + response = c.get(reverse("record_modify", args=[self.cr_2.pk]), follow=True) + self.assertRedirects(response, modif_url) + response = c.get(modif_url) + + self.assertEqual(response.status_code, 200) + response = c.get(reverse("record_modify", args=[self.cr_1.pk]), follow=True) + self.assertRedirects(response, "/") + + # area filter + c = Client() + areas_username, areas_password, areas_user = self.users["areas"] + c.login(username=areas_username, password=areas_password) + response = c.get(reverse("record_modify", args=[self.cr_1.pk]), follow=True) + self.assertRedirects(response, modif_url) + response = c.get(modif_url) + self.assertEqual(response.status_code, 200) + response = c.get(reverse("record_modify", args=[self.cr_2.pk]), follow=True) + self.assertRedirects(response, "/") + + class RecordRelationsTest(ContextRecordInit, TestCase): fixtures = OPERATION_TOWNS_FIXTURES model = models.ContextRecord diff --git a/archaeological_files/models.py b/archaeological_files/models.py index 778b1c251..b3815c95d 100644 --- a/archaeological_files/models.py +++ b/archaeological_files/models.py @@ -27,7 +27,7 @@ from django.contrib.gis.db import models from django.contrib.postgres.indexes import GinIndex from django.core.cache import cache from django.core.validators import MinValueValidator, MaxValueValidator -from django.db.models import Max +from django.db.models import Q, Max from django.db.models.signals import post_save, m2m_changed, post_delete from django.urls import reverse @@ -41,6 +41,7 @@ from ishtar_common.utils import ( get_current_year, get_generated_id, m2m_historization_changed, + SearchAltName, ) from ishtar_common.models import ( @@ -53,7 +54,6 @@ from ishtar_common.models import ( Person, Organization, Town, - Dashboard, DashboardFormItem, HistoricalRecords, ValueGetter, @@ -62,11 +62,9 @@ from ishtar_common.models import ( post_save_cache, Document, HistoryModel, - SearchAltName, SearchVectorConfig, DocumentItem, CompleteIdentifierItem, - HierarchicalType, ) from archaeological_operations.models import ( @@ -1059,6 +1057,10 @@ class File( return cls @classmethod + def get_limit_to_area_query(cls, town_ids): + return Q(towns__pk__in=town_ids) | Q(main_town_id__in=town_ids) + + @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=False, no_auth_check=False, query=False diff --git a/archaeological_finds/models_finds.py b/archaeological_finds/models_finds.py index 9ba25cc83..ece7d08b8 100644 --- a/archaeological_finds/models_finds.py +++ b/archaeological_finds/models_finds.py @@ -37,6 +37,7 @@ from ishtar_common.utils import ( m2m_historization_changed, pgettext_lazy, post_save_geo, + SearchAltName, ugettext_lazy as _ ) @@ -67,7 +68,6 @@ from ishtar_common.models import ( Person, post_save_cache, QuickAction, - SearchAltName, SearchVectorConfig, ValueGetter, ) @@ -2006,6 +2006,12 @@ class Find( "excavation_ids", "weight_string", ] + UPPER_PERMISSIONS = [ + (Operation, "base_finds__context_record__operation_id"), + (ContextRecord, "base_finds__context_record_id"), + (("archaeological_warehouse", "Warehouse"), "container__location_id"), + (("archaeological_warehouse", "Warehouse"), "container_ref__responsibility_id"), + ] SHEET_ALTERNATIVES = [("museum", "museum_find")] objects = UUIDModelManager() @@ -2991,6 +2997,10 @@ class Find( return new @classmethod + def get_limit_to_area_query(cls, town_ids): + return Q(base_finds__context_record__operation__towns__pk__in=town_ids) + + @classmethod def _get_query_owns(cls, ishtaruser, prefix=""): q = ( cls._construct_query_own( diff --git a/archaeological_finds/models_treatments.py b/archaeological_finds/models_treatments.py index 5ba50728b..45dc26c16 100644 --- a/archaeological_finds/models_treatments.py +++ b/archaeological_finds/models_treatments.py @@ -49,7 +49,6 @@ from ishtar_common.models import ( document_attached_changed, MainItem, HistoryModel, - SearchAltName, SearchVectorConfig, DocumentItem, ) @@ -57,8 +56,9 @@ from ishtar_common.models_common import CompleteIdentifierItem, HistoricalRecord from ishtar_common.utils import ( cached_label_changed, get_current_year, - update_data, m2m_historization_changed, + SearchAltName, + update_data, ) @@ -91,6 +91,7 @@ class TreatmentState(GeneralType): 'available': True}) return treat_state + post_save.connect(post_save_cache, sender=TreatmentState) post_delete.connect(post_save_cache, sender=TreatmentState) diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py index 5df18cf64..e0532effc 100644 --- a/archaeological_finds/tests.py +++ b/archaeological_finds/tests.py @@ -95,7 +95,7 @@ from ishtar_common.tests import ( SearchText, ) from archaeological_operations.tests import ImportTest, create_operation, \ - create_administrativact + create_administrativact, TestPermissionRequest from archaeological_context_records.tests import ContextRecordInit from archaeological_operations.serializers import operation_serialization @@ -1918,11 +1918,12 @@ class FindAutocompleteTest(FindInit, TestCase): self.assertEqual(res[2]["id"], find4.pk) # 12 - contains -class FindPermissionTest(FindInit, TestCase): +class FindOldPermissionTest(FindInit, TestCase): fixtures = FIND_FIXTURES model = models.Find def setUp(self): + print("Theses tests should fail on v5") profile_type = ProfileType.objects.create( label="xxCollaborateur", txt_idx="xxcollaborator", @@ -2021,6 +2022,186 @@ class FindPermissionTest(FindInit, TestCase): self.assertEqual(json.loads(content)["recordsTotal"], 1) +class FindPermissionTest(FindInit, TestPermissionRequest, TestCase): + fixtures = FIND_FIXTURES + model = models.Find + + def setUp(self): + self.setup_permission_requests( + "find", + "find", + permissions=["view_own_find", "change_own_find"], + perm_requests=['id="new-*"', 'excavator="{USER}"'] + ) + + self.users = {} + username, password, user = create_superuser() + self.users["superuser"] = (username, password, user) + + upstream_username, upstream_password, upstream_user = create_user( + username="up", password="up" + ) + UserProfile.objects.create( + profile_type=self.profile_types["find_upstream"], + person=upstream_user.ishtaruser.person, + current=True, + ) + self.users["upstream"] = (upstream_username, upstream_password, upstream_user) + + # nosec: hard coded password for test purposes + areas_username, areas_password, areas_user = create_user( # nosec + username="luke", password="iamyourfather" + ) + profile = UserProfile.objects.create( + profile_type=self.profile_types["find_areas"], + person=areas_user.ishtaruser.person, + current=True, + ) + self.users["areas"] = ( + areas_username, areas_password, areas_user + ) + + town = Town.objects.create(name="Tatouine", numero_insee="66000") + area = Area.objects.create(label="Galaxie", txt_idx="galaxie") + area.towns.add(town) + profile.areas.add(area) + + self.orgas = self.create_orgas(user) + self.create_operation(user, self.orgas[0]) + self.create_operation(areas_user, self.orgas[0]) + + self.create_context_record( + user=user, data={"label": "CR 1", "operation": self.operations[0]} + ) + self.create_context_record( + user=areas_user, data={"label": "CR 2", "operation": self.operations[1]} + ) + self.cr_1 = self.context_records[-2] + self.cr_2 = self.context_records[-1] + + self.create_finds( + data_base={"context_record": self.cr_1}, user=user, force=True + ) + self.create_finds( + data_base={"context_record": self.cr_2}, user=areas_user, force=True + ) + + self.find_1 = self.finds[-2] + self.find_2 = self.finds[-1] + self.operations[-1].towns.add(town) + + self.operations[-1].context_record.all()[0].ishtar_users.add( + upstream_user.ishtaruser + ) + + associated_username, associated_password, associated_user = create_user( + username="as", password="as" + ) + UserProfile.objects.create( + profile_type=self.profile_types["find_associated_items"], + person=associated_user.ishtaruser.person, + current=True, + ) + self.users["associated"] = ( + associated_username, associated_password, associated_user + ) + + # read permission + self.basket = models.FindBasket.objects.create( + label="My basket", + user=IshtarUser.objects.get(pk=user.pk), + ) + self.basket.items.add(self.find_1) + self.basket.shared_with.add(associated_user.ishtaruser) + + upstream_user.ishtaruser.generate_permission() + areas_user.ishtaruser.generate_permission() + associated_user.ishtaruser.generate_permission() + + def test_own_search(self): + # no result when no authentification + c = Client() + response = c.get(reverse("get-find")) + self.assertTrue(not response.content or not json.loads(response.content)) + + url = reverse("get-find") + + # possession of associated operation + # only one "own" context record available + self._test_search( + url, + 'possession', + self.users["upstream"], + 1 + ) + + # area filter + # only one "own" operation available + self._test_search( + url, + 'areas filter', + self.users["areas"], + 1 + ) + + # filter associated by basket + self._test_search( + url, + 'associated basket filter', + self.users["associated"], + 1 + ) + + def test_own_modify(self): + # no result when no authentification + c = Client() + response = c.get(reverse("find_modify", args=[self.cr_2.pk])) + self.assertRedirects(response, "/") + + modif_url = "/find_modification/find-find_modification" + + # upstream + c = Client() + upstream_username, upstream_password, upstream_user = self.users["upstream"] + c.login(username=upstream_username, password=upstream_password) + response = c.get(reverse("find_modify", args=[self.find_2.pk]), follow=True) + self.assertRedirects(response, modif_url) + response = c.get(modif_url) + + self.assertEqual(response.status_code, 200) + response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True) + self.assertRedirects(response, "/") + + # area filter + c = Client() + areas_username, areas_password, areas_user = self.users["areas"] + c.login(username=areas_username, password=areas_password) + response = c.get(reverse("find_modify", args=[self.find_2.pk]), follow=True) + self.assertRedirects(response, modif_url) + response = c.get(modif_url) + self.assertEqual(response.status_code, 200) + response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True) + self.assertRedirects(response, "/") + + # basket filter + c = Client() + basket_username, basket_password, basket_user = self.users["associated"] + c.login(username=basket_username, password=basket_password) + response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True) + self.assertRedirects(response, "/") + + self.basket.shared_write_with.add(basket_user.ishtaruser) + basket_user.ishtaruser.generate_permission() + + response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True) + self.assertRedirects(response, modif_url) + response = c.get(modif_url) + self.assertEqual(response.status_code, 200) + + response = c.get(reverse("find_modify", args=[self.find_2.pk]), follow=True) + self.assertRedirects(response, "/") + + class FindQATest(FindInit, TestCase): fixtures = WAREHOUSE_FIXTURES model = models.Find diff --git a/archaeological_operations/models.py b/archaeological_operations/models.py index 2fafa56ed..9e43f264b 100644 --- a/archaeological_operations/models.py +++ b/archaeological_operations/models.py @@ -66,7 +66,6 @@ from ishtar_common.models import ( get_current_profile, document_attached_changed, HistoryModel, - SearchAltName, GeoItem, CompleteIdentifierItem, SearchVectorConfig, @@ -85,6 +84,7 @@ from ishtar_common.utils import ( mode, m2m_historization_changed, post_save_geo, + SearchAltName, SheetItem, ) @@ -851,6 +851,10 @@ class ArchaeologicalSite( return actions @classmethod + def get_limit_to_area_query(cls, town_ids): + return Q(towns__pk__in=town_ids) + + @classmethod def _get_query_owns_dicts(cls, ishtaruser, no_rel=False): profile = ishtaruser.current_profile town_ids = [] @@ -2309,6 +2313,10 @@ class Operation( return round(float(self.cost) / self.surface, 2) @classmethod + def get_limit_to_area_query(cls, town_ids): + return Q(towns__pk__in=town_ids) + + @classmethod def _get_query_owns_dicts(cls, ishtaruser, no_rel=False): profile = ishtaruser.current_profile town_ids = [] diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index fa2471bbd..10bc52967 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -69,10 +69,8 @@ from ishtar_common.models import ( Town, ImporterColumn, ImportColumnValue, + PermissionRequest, Person, - Author, - SourceType, - AuthorType, DocumentTemplate, PersonType, TargetKeyGroup, @@ -2014,14 +2012,14 @@ class OperationInitTest(object): self.parcels.pop(0) return self.create_operation()[-1] - def create_operation(self, user=None, orga=None): + def create_operation(self, user=None, orga=None, values=None): if not orga: self.get_default_orga(user) if not user: self.get_default_user() if not getattr(self, "operations", None): self.operations = [] - self.operations.append(create_operation(user, orga)) + self.operations.append(create_operation(user, orga, values=values)) return self.operations def get_default_operation(self, force=False, user=None): @@ -3472,10 +3470,94 @@ class OperationSearchTest(TestCase, OperationInitTest, SearchText): self.assertEqual(values["data"], expected_result) -class OperationPermissionTest(TestCase, OperationInitTest): +class TestPermissionRequest: + def setup_permission_requests(self, prefix, model_name, permissions, + perm_requests=None, create_profiles=True): + content_type = ContentType.objects.get(model=model_name) + if not hasattr(self, "permission_requests"): + self.permission_requests = {} + self.permission_requests.update({ + f"{prefix}_associated_items": PermissionRequest.objects.create( + model=content_type, + request="", + include_associated_items=True, + include_upstream_items=False, + slug=f'{prefix}-associated_items' + ), + f"{prefix}_upstream": PermissionRequest.objects.create( + model=content_type, + request="", + include_associated_items=False, + include_upstream_items=True, + slug=f"{prefix}-upstream" + ), + f"{prefix}_areas": PermissionRequest.objects.create( + model=content_type, + request="", + include_associated_items=False, + include_upstream_items=False, + limit_to_attached_areas=True, + slug=f"{prefix}-areas" + ), + }) + if perm_requests: + for idx, request in enumerate(perm_requests): + pr = PermissionRequest.objects.create( + model=content_type, + request=request, + include_associated_items=False, + include_upstream_items=False, + slug=f"{prefix}-req-{idx+1}" + ) + self.permission_requests[f"{prefix}_request_{idx + 1}"] = pr + pr = PermissionRequest.objects.create( + model=content_type, + request=request, + include_associated_items=False, + include_upstream_items=False, + limit_to_attached_areas=True, + slug=f"{prefix}-req-areas-{idx+1}" + ) + self.permission_requests[f"{prefix}_request_areas_{idx + 1}"] = pr + gp = Group.objects.create(name=f"{prefix}_xxx") + for permission in permissions: + for p in Permission.objects.filter(codename=permission).all(): + gp.permissions.add(p) + if not create_profiles: + return + if not hasattr(self, "profile_types"): + self.profile_types = {} + for key in self.permission_requests.keys(): + profile_type = ProfileType.objects.create( + label=key, + txt_idx=key, + ) + profile_type.groups.add(gp) + profile_type.permission_requests.add(self.permission_requests[key]) + self.profile_types[key] = profile_type + + def _test_search(self, url, label, user, expected): + c = Client() + username, password, __ = user + c.login(username=username, password=password) + response = c.get(url) + content = response.content.decode() + if expected: + self.assertTrue( + json.loads(content), + f"search own test for {label}" + ) + self.assertEqual( + json.loads(content)["recordsTotal"], expected, + f"search own test for {label} - {expected} expected" + ) + + +class OperationOldPermissionTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES def setUp(self): + print("Theses tests should fail on v5") IshtarSiteProfile.objects.get_or_create(slug="default", active=True) self.username, self.password, self.user = create_superuser() @@ -3638,6 +3720,200 @@ class OperationPermissionTest(TestCase, OperationInitTest): self.assertRedirects(response, "/") +class OperationPermissionTest(TestCase, TestPermissionRequest, OperationInitTest): + fixtures = FILE_FIXTURES + + def setUp(self): + IshtarSiteProfile.objects.get_or_create(slug="default", active=True) + + self.setup_permission_requests( + "ope", + "operation", + permissions=["view_own_operation", "change_own_operation"], + ) + + self.users = {} + username, password, user = create_superuser() + self.users["superuser"] = (username, password, user) + + # nosec: hard coded password for test purposes + associated_username, associated_password, associated_user = create_user( # nosec + username="vador", password="darth" + ) + profile = UserProfile.objects.create( + profile_type=self.profile_types["ope_associated_items"], + person=associated_user.ishtaruser.person, + current=True, + ) + self.users["associated"] = ( + associated_username, associated_password, associated_user + ) + + # nosec: hard coded password for test purposes + areas_username, areas_password, areas_user = create_user( # nosec + username="luke", password="iamyourfather" + ) + profile = UserProfile.objects.create( + profile_type=self.profile_types["ope_areas"], + person=areas_user.ishtaruser.person, + current=True, + ) + self.users["areas"] = ( + areas_username, areas_password, areas_user + ) + + town = Town.objects.create(name="Tatouine", numero_insee="66000") + area = Area.objects.create(label="Galaxie", txt_idx="galaxie") + area.towns.add(town) + profile.areas.add(area) + + self.orgas = self.create_orgas(user) + self.create_operation(user, self.orgas[0]) + self.operations = self.create_operation(user, self.orgas[0]) + self.operations[1].towns.add(town) + self.item = self.operations[0] + associated_user.ishtaruser.generate_permission() + + def test_permission_generation(self): + __, __, associated_user = self.users["associated"] + self.assertFalse( + associated_user.has_perm( + "archaeological_operations.change_own_operation", + self.operations[1] + ) + ) + self.assertFalse( + associated_user.has_perm( + "archaeological_operations.view_own_operation", + self.operations[1] + ) + ) + self.operations[1].ishtar_users.add(associated_user.ishtaruser) + associated_user.ishtaruser.generate_permission() + self.assertTrue( + associated_user.has_perm( + "archaeological_operations.view_own_operation", + self.operations[1] + ) + ) + self.assertTrue( + associated_user.has_perm( + "archaeological_operations.change_own_operation", + self.operations[1] + ) + ) + # general permission is assigned + self.assertTrue( + associated_user.has_perm( + "archaeological_operations.change_own_operation", + ) + ) + self.assertFalse( + associated_user.has_perm( + "archaeological_operations.change_own_operation", + self.operations[0] + ) + ) + + def test_own_search(self): + # no result when no authentification + c = Client() + response = c.get(reverse("get-operation"), {"year": "2010"}) + self.assertFalse(json.loads(response.content.decode())) + + associated_username, associated_password, associated_user = self.users["associated"] + # possession + c = Client() + c.login(username=associated_username, password=associated_password) + response = c.get(reverse("get-operation"), {"year": "2010"}) + content = json.loads(response.content.decode()) + self.assertTrue(content) + self.assertEqual(content["recordsTotal"], 0) + + self.operations[1].ishtar_users.add(associated_user.ishtaruser) + associated_user.ishtaruser.generate_permission() + + response = c.get(reverse("get-operation"), {"year": "2010"}) + # only one "own" operation available + content = json.loads(response.content.decode()) + self.assertTrue(content) + self.assertEqual(content["recordsTotal"], 1) + + operator_key = pgettext_lazy("key for text search", "operator") + response = c.get(reverse("get-operation"), {operator_key: self.orgas[0].name}) + self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) + response = c.get( + reverse("get-operation"), + {"search_vector": '{}="{}"'.format(operator_key, self.orgas[0].name)}, + ) + self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) + + # area filter + areas_username, areas_password, areas_user = self.users["areas"] + areas_user.ishtaruser.generate_permission() + c = Client() + c.login(username=areas_username, password=areas_password) + response = c.get(reverse("get-operation"), {"year": "2010"}) + # only one "own" operation available + self.assertTrue(json.loads(response.content.decode())) + self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) + response = c.get(reverse("get-operation"), {operator_key: self.orgas[0].name}) + self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) + response = c.get( + reverse("get-operation"), + {"search_vector": '{}="{}"'.format(operator_key, self.orgas[0].name)}, + ) + self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1) + + def test_own_modify(self): + operation_pk1 = self.operations[0].pk + operation_pk2 = self.operations[1].pk + + modif_url = "/operation_modification/general-operation_modification" + + # no result when no authentification + c = Client() + response = c.get(reverse("operation_modify", args=[operation_pk2])) + self.assertRedirects(response, "/") + + # possession + associated_username, associated_password, associated_user = self.users["associated"] + c = Client() + c.login(username=associated_username, password=associated_password) + associated_user.ishtaruser.generate_permission() + response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True) + self.assertRedirects(response, "/") + + self.operations[1].ishtar_users.add(associated_user.ishtaruser) + associated_user.ishtaruser.generate_permission() + response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True) + self.assertRedirects(response, modif_url) + + response = c.get(modif_url) + self.assertEqual(response.status_code, 200) + response = c.get(reverse("operation_modify", args=[operation_pk1]), follow=True) + self.assertRedirects(response, "/") + + profile_type = ProfileType.objects.get(txt_idx="collaborator") + profile_type.groups.add( + Group.objects.get( + name="Opérations rattachées : " "modification/suppression" + ) + ) + + # area filter + areas_username, areas_password, areas_user = self.users["areas"] + areas_user.ishtaruser.generate_permission() + c = Client() + c.login(username=areas_username, password=areas_password) + response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True) + self.assertRedirects(response, modif_url) + response = c.get(modif_url) + self.assertEqual(response.status_code, 200) + response = c.get(reverse("operation_modify", args=[operation_pk1]), follow=True) + self.assertRedirects(response, "/") + + class LabelTest(TestCase, OperationInitTest): fixtures = FILE_FIXTURES diff --git a/archaeological_warehouse/models.py b/archaeological_warehouse/models.py index 9aa5549db..5a854c079 100644 --- a/archaeological_warehouse/models.py +++ b/archaeological_warehouse/models.py @@ -45,7 +45,6 @@ from ishtar_common.models_common import ( post_save_cache, DashboardFormItem, document_attached_changed, - SearchAltName, GeoItem, CompleteIdentifierItem, SearchVectorConfig, @@ -60,6 +59,7 @@ from ishtar_common.utils import ( cached_label_changed, cached_label_and_geo_changed, get_generated_id, + SearchAltName, ) logger = logging.getLogger(__name__) @@ -1144,6 +1144,10 @@ class Container( "localisation_list": "localisation_list", } SERIALIZE_PROPERTIES = MainItem.SERIALIZE_PROPERTIES + ["short_label"] + UPPER_PERMISSIONS = [ + (Warehouse, "location_id"), + (Warehouse, "responsibility_id"), + ] objects = UUIDModelManager() diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py index 743d643a3..369821b45 100644 --- a/ishtar_common/admin.py +++ b/ishtar_common/admin.py @@ -72,7 +72,8 @@ from django import forms from ishtar_common import models, models_common, models_rest from ishtar_common.apps import admin_site from ishtar_common.model_merging import merge_model_objects -from ishtar_common.utils import get_cache, create_slug, get_person_gdpr_log +from ishtar_common.utils import API_MAIN_CONTENT_TYPES, get_cache, create_slug,\ + get_person_gdpr_log from ishtar_common import forms as common_forms, forms_common as other_common_forms from ishtar_common.serializers import restore_serialized, IMPORT_MODEL_LIST @@ -2561,7 +2562,7 @@ admin_site.register(models_rest.ApiUser, ApiUserAdmin) def get_api_choices(): pks = [] - for app_label, model_name in models_rest.MAIN_CONTENT_TYPES: + for app_label, model_name in API_MAIN_CONTENT_TYPES: try: ct = ContentType.objects.get(app_label=app_label, model=model_name) pks.append(ct.pk) diff --git a/ishtar_common/menu_base.py b/ishtar_common/menu_base.py index e0f206a57..feb05c6db 100644 --- a/ishtar_common/menu_base.py +++ b/ishtar_common/menu_base.py @@ -17,7 +17,7 @@ # See the file COPYING for details. -from ishtar_common.models import get_current_profile +from ishtar_common.utils import get_current_profile class SectionItem: diff --git a/ishtar_common/menus.py b/ishtar_common/menus.py index aae127a09..c455fa73a 100644 --- a/ishtar_common/menus.py +++ b/ishtar_common/menus.py @@ -30,33 +30,34 @@ from django.urls import reverse from django.contrib.auth.models import User -_extra_menus = [] -# collect menu from INSTALLED_APPS -for app in settings.INSTALLED_APPS: - mod = __import__(app, fromlist=["ishtar_menu"]) - if hasattr(mod, "ishtar_menu"): - menu = getattr(mod, "ishtar_menu") - _extra_menus += menu.MENU_SECTIONS - -# sort -__section_items = [mnu for order, mnu in sorted(_extra_menus, key=lambda x: x[0])] -# regroup menus -_section_items, __keys = [], [] -for section_item in __section_items: - if section_item.idx not in __keys: - __keys.append(section_item.idx) - _section_items.append(section_item) - continue - section_childs = _section_items[__keys.index(section_item.idx)].childs - childs_idx = [child.idx for child in section_childs] - for child in section_item.childs: - if child.idx not in childs_idx: - section_childs.append(child) +def get_section_items(): + _extra_menus = [] + # collect menu from INSTALLED_APPS + for app in settings.INSTALLED_APPS: + mod = __import__(app, fromlist=["ishtar_menu"]) + if hasattr(mod, "ishtar_menu"): + menu = getattr(mod, "ishtar_menu") + _extra_menus += menu.MENU_SECTIONS + + # sort + __section_items = [mnu for order, mnu in sorted(_extra_menus, key=lambda x: x[0])] + # regroup menus + _section_items, __keys = [], [] + for section_item in __section_items: + if section_item.idx not in __keys: + __keys.append(section_item.idx) + _section_items.append(section_item) + continue + section_childs = _section_items[__keys.index(section_item.idx)].childs + childs_idx = [child.idx for child in section_childs] + for child in section_item.childs: + if child.idx not in childs_idx: + section_childs.append(child) + return _section_items -class Menu: - ref_childs = _section_items +class Menu: def __init__(self, user, current_action=None, session=None): self.user = user self.initialized = False @@ -74,6 +75,7 @@ class Menu: self.selected_idx = None self.session = session self.items_by_idx = {} + self.ref_childs = get_section_items() def reinit_menu_for_all_user(self): """ diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 8e5b7f703..3e51f8cb1 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -97,6 +97,7 @@ from ishtar_common.utils import ( InlineClass ) from ishtar_common.utils_secretary import IshtarSecretaryRenderer +from ishtar_common.views_item import get_item from ishtar_common.alternative_configs import ( ALTERNATE_CONFIGS, @@ -140,7 +141,8 @@ from ishtar_common.utils import ( cached_label_changed, generate_relation_graph, max_size_help, - JSON_SERIALIZATION + JSON_SERIALIZATION, + SearchAltName, ) from ishtar_common.models_common import ( @@ -175,7 +177,6 @@ from ishtar_common.models_common import ( PermissionRequest, post_save_cache, QuickAction, - SearchAltName, SearchVectorConfig, SpatialReferenceSystem, TemplateItem, @@ -198,6 +199,7 @@ __all__ = [ "ImporterColumn", "ImporterDuplicateField", "Imported", + "PermissionRequest", "Regexp", "ImportTarget", "ItemKey", @@ -3466,7 +3468,8 @@ class ProfileTypeSummary(ProfileType): class UserProfile(models.Model): name = models.CharField(_("Name"), blank=True, default="", max_length=100) profile_type = models.ForeignKey( - ProfileType, verbose_name=_("Profile type"), on_delete=models.PROTECT + ProfileType, verbose_name=_("Profile type"), on_delete=models.PROTECT, + related_name="user_profiles" ) areas = models.ManyToManyField( "Area", verbose_name=_("Areas"), blank=True, related_name="profiles" @@ -3521,7 +3524,9 @@ class UserProfile(models.Model): def duplicate(self, **kwargs): areas = [area for area in self.areas.all()] - external_sources = [external_source for external_source in self.external_sources.all()] + external_sources = [ + external_source for external_source in self.external_sources.all() + ] new_item = self new_item.pk = None name = self.name @@ -3541,50 +3546,95 @@ class UserProfile(models.Model): new_item.external_sources.add(src) return new_item - def _generate_permission(self, ishtar_user, content_type, permission_request): + def _generate_permission(self, ishtar_user, content_type, permission_request, + permissions, permission_type): item_ids = [] model_class = content_type.model_class() - # TODO: gérer les paniers if permission_request.include_associated_items: - item_ids += model_class.filter( + item_ids += model_class.objects.filter( ishtar_users__pk=ishtar_user.pk ).values_list("pk", flat=True) + item_ids += model_class.objects.filter( + history_creator_id=ishtar_user.pk + ).values_list("pk", flat=True) + if content_type.model == "find" and \ + permission_type in ("view", "change"): + Find = apps.get_model("archaeological_finds", "Find") + k = "basket__shared_write_with" if permission_type == "change" \ + else "basket__shared_with" + item_ids += list( + Find.objects.filter(**{k: ishtar_user}).values_list("pk", flat=True) + ) + print("ishtar_common/models.py - 3561", item_ids, ishtar_user, content_type, permission_type) if permission_request.include_upstream_items: - # TODO.... - item_ids += model_class.get_ids_from_upper_permissions(ishtar_user.user_ptr.pk) + item_ids += model_class.get_ids_from_upper_permissions( + ishtar_user.user_ptr.pk, permissions + ) + print("ishtar_common/models.py - 3566", item_ids, ishtar_user, content_type, permission_type) if permission_request.request or permission_request.limit_to_attached_areas: - # TODO - pass - query = model_class.objects + _get_item = get_item( + content_type.model_class(), + "", "", no_permission_check=True, + ) + result = [] + query = permission_request.request + if query: + if "{USER}" in query: + query = query.replace("{USER}", f"id:{ishtar_user.person_id}") + query = {"search_vector": query} + q = _get_item(None, return_query=True, ishtaruser=ishtar_user, + query=query) + result = list(q.values_list("pk", flat=True)) + if permission_request.limit_to_attached_areas: + profile = ishtar_user.current_profile + if not profile: # no areas attached + return [] + town_ids = list(profile.query_towns.values_list("pk", flat=True)) + result_limit = [] + get_limit_to_area_query = getattr( + model_class, "get_limit_to_area_query", None + ) + q = get_limit_to_area_query(town_ids) if get_limit_to_area_query else None + if q: + result_limit = list( + model_class.objects.filter(q).values_list("pk", flat=True) + ) + if result: + result = [pk for pk in result if pk in result_limit] + else: + result = result_limit + item_ids += result + print("ishtar_common/models.py - 3600", item_ids, ishtar_user, content_type, permission_type) return item_ids - def generate_permission(self, content_type): + def generate_permission(self, content_type, permission_type): ishtar_user = self.person.ishtaruser # add base permissions for group in self.profile_type.groups.all(): - for perm in group.permissions.all(): + for perm in group.permissions.filter( + codename__startswith=permission_type).all(): ishtar_user.user_ptr.user_permissions.add(perm) q_has_perm = self.profile_type.groups.filter( permissions__content_type=content_type, - permissions__codename__contains="_own_" + permissions__codename__startswith=f"{permission_type}_own_", ) if not q_has_perm.count(): # no permission to generate return permissions = [] for group in q_has_perm.all(): - permissions += list(group.permissions.values_list("pk", flat=True)) + permissions += list(group.permissions.filter( + codename__contains=permission_type + ).all()) q_req = self.profile_type.permission_requests.filter( model=content_type, active=True ) item_ids = [] if not q_req.count(): # TODO v5: delete old behaviour - """ print(f"WARNING: no permission request for content {content_type.name} and profile {self}") print("Using old behaviour") - """ model_class = content_type.model_class() query = model_class.get_owns(user=ishtar_user, query=True, no_auth_check=True) if query: @@ -3594,13 +3644,15 @@ class UserProfile(models.Model): else: for perm_request in q_req.all(): item_ids += self._generate_permission( - ishtar_user, content_type, perm_request + ishtar_user, content_type, perm_request, permissions, + permission_type ) user_id = ishtar_user.user_ptr.pk object_permissions = [] item_ids = list(set(item_ids)) permissions = list(set(permissions)) - for permission_id in permissions: + for permission in permissions: + permission_id = permission.pk exclude = list(UserObjectPermission.objects.filter( content_type_id=content_type.pk, permission_id=permission_id, user_id=user_id @@ -3900,7 +3952,8 @@ class IshtarUser(FullSearch): for ct in content_types: for profile in self.person.profiles.all(): - profile.generate_permission(ct) + for permission_type in ("view", "change", "delete"): + profile.generate_permission(ct, permission_type) def full_label(self): return self.person.full_label() diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 28f5aba00..920b71584 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -17,11 +17,12 @@ import re import shutil import tempfile import time +from unidecode import unidecode from django import forms from django.apps import apps from django.conf import settings -from django.contrib.auth.models import User +from django.contrib.auth.models import Permission, User from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.gis.db import models @@ -49,6 +50,8 @@ from ishtar_common.utils import ( get_image_path, get_columns_from_class, human_date, + HistoryError, + SearchAltName, SheetItem ) from simple_history.models import HistoricalRecords as BaseHistoricalRecords @@ -56,7 +59,8 @@ from simple_history.signals import ( post_create_historical_record, pre_create_historical_record, ) -from unidecode import unidecode + +from guardian.models import UserObjectPermission from ishtar_common.data_importer import post_importer_action, ImporterError from ishtar_common.model_managers import TypeManager @@ -64,19 +68,20 @@ from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import Import from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils import ( - get_cache, + cached_label_changed, disable_for_loaddata, + duplicate_item, + external_id_changed, + GENERAL_TYPE_PREFIX, get_all_field_names, + get_cache, + get_current_profile, + get_generated_id, merge_tsvectors, - cached_label_changed, - external_id_changed, + OwnPerms, post_save_geo, post_save_geodata, task, - duplicate_item, - get_generated_id, - get_current_profile, - OwnPerms ) @@ -513,12 +518,6 @@ class GeneralType(Cached, models.Model): res[parent_id].append((item["id"], item["label"])) return res - PREFIX = "│ " - PREFIX_EMPTY = " " - PREFIX_MEDIUM = "├ " - PREFIX_LAST = "└ " - PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] - @classmethod def _get_childs( cls, @@ -559,20 +558,20 @@ class GeneralType(Cached, models.Model): cprefix -= 1 if not cprefix: if (idx + 1) == total: - p += cls.PREFIX_LAST + p += GENERAL_TYPE_PREFIX["prefix_last"] else: - p += cls.PREFIX_MEDIUM + p += GENERAL_TYPE_PREFIX["prefix_medium"] elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: - p += cls.PREFIX_EMPTY + p += GENERAL_TYPE_PREFIX["prefix_empty"] else: - p += cls.PREFIX + p += GENERAL_TYPE_PREFIX["prefix"] else: - p += cls.PREFIX_EMPTY + p += GENERAL_TYPE_PREFIX["prefix_empty"] else: - p += cls.PREFIX + p += GENERAL_TYPE_PREFIX["prefix"] lst.append((child[0], SafeText(p + child[1]))) clast_of = last_of[:] clast_of.append(idx + 1 == total) @@ -1149,17 +1148,6 @@ class FullSearch(models.Model): return changed -class SearchAltName(object): - def __init__( - self, search_key, search_query, extra_query=None, distinct_query=False, related_name=None - ): - self.search_key = search_key - self.search_query = search_query - self.extra_query = extra_query or {} - self.distinct_query = distinct_query - self.related_name = related_name - - class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s", @@ -1386,14 +1374,6 @@ class FixAssociated: setattr(item, subkey, new_value) -class HistoryError(Exception): - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - class HistoricalRecords(BaseHistoricalRecords): def get_extra_fields(self, model, fields): def get_history_m2m(attr): @@ -1561,6 +1541,7 @@ class BaseHistorizedItem( EXTERNAL_ID_KEY = "" EXTERNAL_ID_DEPENDENCIES = [] HISTORICAL_M2M = [] + UPPER_PERMISSIONS = [] history_modifier = models.ForeignKey( User, @@ -1634,8 +1615,50 @@ class BaseHistorizedItem( return cls._meta.verbose_name @classmethod - def get_ids_from_upper_permissions(cls, user_id): - return [] + def get_ids_from_upper_permissions(cls, user_id, base_permissions): + if not cls.UPPER_PERMISSIONS: + return [] + ProfileType = apps.get_model("ishtar_common", "ProfileType") + item_ids = [] + for model, attr in cls.UPPER_PERMISSIONS: + if isinstance(model, tuple): + app_label, model_name = model + model = apps.get_model(app_label, model_name) + permissions = list(set([ + "_".join(permission.codename.split("_")[:-1]) + + f"_{model._meta.model_name}" + for permission in base_permissions + ])) + q = ProfileType.objects.filter( + user_profiles__person__ishtaruser=user_id, + groups__permissions__codename__in=permissions + ) + lst = [] + if not q.count(): + # no permissions associated for upstream model get direct attachement + lst = model.objects.filter( + ishtar_users__pk=user_id + ).values_list("pk", flat=True) + else: + perms = [] + for codename in permissions: + perms += [ + perm + for perm in Permission.objects.filter( + codename=codename).all() + ] + lst = [] + for permission in perms: + lst += list( + UserObjectPermission.objects.filter( + permission=permission, + user_id=user_id + ).values_list("object_pk", flat=True) + ) + item_ids += cls.objects.filter( + **{f"{attr}__in": lst} + ).values_list("pk", flat=True) + return list(set(item_ids)) def is_locked(self, user=None): if not user: diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py index 05b1206d1..c47b04168 100644 --- a/ishtar_common/models_rest.py +++ b/ishtar_common/models_rest.py @@ -10,6 +10,12 @@ from django.contrib.postgres.fields import ArrayField from django.core.files import File from django.utils.text import slugify +from django.apps import apps +from django.template import loader + +from ishtar_common.models_common import SheetFilter +from ishtar_common.utils import ugettext_lazy as _ + UnoCalc = None ITALIC = None if settings.USE_LIBREOFFICE: @@ -19,30 +25,6 @@ if settings.USE_LIBREOFFICE: except ImportError: pass -from django.apps import apps -from django.template import loader - -from ishtar_common.models_common import SheetFilter -from ishtar_common.utils import ugettext_lazy as _ - - -APP_CONTENT_TYPES = [ - ("archaeological_operations", "operation"), - ("archaeological_context_records", "contextrecord"), - ("archaeological_finds", "find"), - ("archaeological_warehouse", "warehouse"), - ("archaeological_files", "file"), -] - -MAIN_CONTENT_TYPES = APP_CONTENT_TYPES + [ - ("archaeological_operations", "archaeologicalsite"), - ("archaeological_warehouse", "container"), -] - -MAIN_MODELS = dict( - [(model_name, app_name) for app_name, model_name in MAIN_CONTENT_TYPES] -) - class ApiUser(models.Model): user_ptr = models.OneToOneField( diff --git a/ishtar_common/rest.py b/ishtar_common/rest.py index 0a831634a..699440f15 100644 --- a/ishtar_common/rest.py +++ b/ishtar_common/rest.py @@ -1,10 +1,6 @@ -import datetime -import requests - from django.conf import settings from django.db.models import Q from django.http import HttpResponse -from django.shortcuts import reverse from django.utils.translation import activate, deactivate from rest_framework import authentication, permissions, generics @@ -13,7 +9,6 @@ from rest_framework.views import APIView from ishtar_common import models_rest from ishtar_common.models_common import GeneralType -from ishtar_common.models_imports import Importer from ishtar_common.views_item import get_item diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index 709f020a4..11ff45fa7 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -333,6 +333,83 @@ class SheetItem: return +def get_current_item_keys(): + return ( + ("file", apps.get_model("archaeological_files", "File")), + ("operation", apps.get_model("archaeological_operations", "Operation")), + ("site", apps.get_model("archaeological_operations", "ArchaeologicalSite")), + ("contextrecord", + apps.get_model("archaeological_context_records", "ContextRecord")), + ("warehouse", apps.get_model("archaeological_warehouse", "Warehouse")), + ("container", apps.get_model("archaeological_warehouse", "Container")), + ("find", apps.get_model("archaeological_finds", "Find")), + ("findbasket", apps.get_model("archaeological_finds", "FindBasket")), + ("treatmentfile", apps.get_model("archaeological_finds", "TreatmentFile")), + ("treatment", apps.get_model("archaeological_finds", "Treatment")), + ("administrativeact", + apps.get_model("archaeological_operations", "AdministrativeAct")), + ("administrativeactop", + apps.get_model("archaeological_operations", "AdministrativeAct")), + ("administrativeactfile", + apps.get_model("archaeological_operations", "AdministrativeAct")), + ("administrativeacttreatment", + apps.get_model("archaeological_operations", "AdministrativeAct")), + ("administrativeacttreatmentfile", + apps.get_model("archaeological_operations", "AdministrativeAct")), + ) + + +def get_current_item_keys_dict(): + return dict(get_current_item_keys()) + + +API_APP_CONTENT_TYPES = [ + ("archaeological_operations", "operation"), + ("archaeological_context_records", "contextrecord"), + ("archaeological_finds", "find"), + ("archaeological_warehouse", "warehouse"), + ("archaeological_files", "file"), +] + +API_MAIN_CONTENT_TYPES = API_APP_CONTENT_TYPES + [ + ("archaeological_operations", "archaeologicalsite"), + ("archaeological_warehouse", "container"), +] + +API_MAIN_MODELS = dict( + [(model_name, app_name) for app_name, model_name in API_MAIN_CONTENT_TYPES] +) + + +class HistoryError(Exception): + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + +class SearchAltName(object): + def __init__( + self, search_key, search_query, extra_query=None, distinct_query=False, + related_name=None + ): + self.search_key = search_key + self.search_query = search_query + self.extra_query = extra_query or {} + self.distinct_query = distinct_query + self.related_name = related_name + + +GENERAL_TYPE_PREFIX = { + "prefix": "│ ", + "prefix_empty": " ", + "prefix_medium": "├ ", + "prefix_last": "└ ", + "prefix_codes": ["\u2502", "\u251C", "\u2514"] +} + + class OwnPerms: """ Manage special permissions for object's owner diff --git a/ishtar_common/views.py b/ishtar_common/views.py index 6c209a848..f01e848a0 100644 --- a/ishtar_common/views.py +++ b/ishtar_common/views.py @@ -79,6 +79,8 @@ from ishtar_common.utils_migrations import HOMEPAGE_DEFAULT, HOMEPAGE_TITLE from ishtar_common.utils import ( clean_session_cache, CSV_OPTIONS, + get_current_item_keys, + get_current_item_keys_dict, get_field_labels_from_path, get_person_gdpr_log, get_random_item_image_link, @@ -92,13 +94,7 @@ from ishtar_common.utils import ( from ishtar_common.widgets import JQueryAutoComplete from ishtar_common import tasks -convert_document = None -if settings.USE_LIBREOFFICE: - from ishtar_common.libreoffice import convert_document - from .views_item import ( - CURRENT_ITEM_KEYS, - CURRENT_ITEM_KEYS_DICT, check_permission, display_item, get_item, @@ -108,6 +104,10 @@ from .views_item import ( get_short_html_detail, ) +convert_document = None +if settings.USE_LIBREOFFICE: + from ishtar_common.libreoffice import convert_document + logger = logging.getLogger(__name__) @@ -700,7 +700,8 @@ def shortcut_menu(request): def get_current_items(request): currents = {} - for key, model in CURRENT_ITEM_KEYS: + current_item_keys = get_current_item_keys() + for key, model in current_item_keys: currents[key] = None if key in request.session and request.session[key]: try: @@ -711,7 +712,8 @@ def get_current_items(request): def unpin(request, item_type, cascade=False): - if item_type not in CURRENT_ITEM_KEYS_DICT.keys(): + current_item_keys_dict = get_current_item_keys_dict() + if item_type not in current_item_keys_dict.keys(): logger.warning("unpin unknow type: {}".format(item_type)) return HttpResponse("nok") if "administrativeact" in item_type: diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py index 23752c8a9..b5c63cc65 100644 --- a/ishtar_common/views_item.py +++ b/ishtar_common/views_item.py @@ -13,6 +13,7 @@ import requests import subprocess # nosec from tempfile import NamedTemporaryFile +from django.apps import apps from django.conf import settings from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType @@ -46,6 +47,7 @@ from django.utils.translation import ( deactivate, pgettext_lazy, ) +from guardian.models import UserObjectPermission from tidylib import tidy_document as tidy from unidecode import unidecode from weasyprint import HTML, CSS @@ -54,50 +56,24 @@ from weasyprint.fonts import FontConfiguration from bootstrap_datepicker.widgets import DateField from ishtar_common.utils import ( + API_MAIN_MODELS, check_model_access_control, CSV_OPTIONS, + GENERAL_TYPE_PREFIX, get_all_field_names, - Round, + get_current_item_keys_dict, + get_current_profile, + HistoryError, PRIVATE_FIELDS, + SearchAltName, + Round, ) -from ishtar_common.models import get_current_profile, GeneralType, SearchAltName -from ishtar_common.models_common import HistoryError from .menus import Menu -from . import models, models_rest -from archaeological_files.models import File -from archaeological_operations.models import ( - Operation, - ArchaeologicalSite, - AdministrativeAct, -) -from archaeological_context_records.models import ContextRecord -from archaeological_finds.models import Find, FindBasket, Treatment, TreatmentFile -from archaeological_warehouse.models import Warehouse, Container - logger = logging.getLogger(__name__) ENCODING = settings.ENCODING or "utf-8" -CURRENT_ITEM_KEYS = ( - ("file", File), - ("operation", Operation), - ("site", ArchaeologicalSite), - ("contextrecord", ContextRecord), - ("warehouse", Warehouse), - ("container", Container), - ("find", Find), - ("findbasket", FindBasket), - ("treatmentfile", TreatmentFile), - ("treatment", Treatment), - ("administrativeact", AdministrativeAct), - ("administrativeactop", AdministrativeAct), - ("administrativeactfile", AdministrativeAct), - ("administrativeacttreatment", AdministrativeAct), - ("administrativeacttreatmentfile", AdministrativeAct), -) -CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS) - HIERARCHIC_LEVELS = 5 LIST_FIELDS = { # key: hierarchic depth @@ -354,11 +330,11 @@ def show_source_item(request, source_id, model, name, base_dct, extra_dct): source_id, external_id = int(source_id), int(external_id) except ValueError: raise Http404() - models_rest.ApiExternalSource.objects.get() + ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource") # TODO: check permissions try: - src = models_rest.ApiExternalSource.objects.get(pk=source_id) - except models_rest.ApiExternalSource.DoesNotExist: + src = ApiExternalSource.objects.get(pk=source_id) + except ApiExternalSource.DoesNotExist: return HttpResponse("{}", content_type="text/plain") url = src.url if not url.endswith("/"): @@ -618,6 +594,8 @@ def _get_values(request, val): else: vals = [val] new_vals = [] + Organization = apps.get_model("ishtar_common", "Organization") + Person = apps.get_model("ishtar_common", "Person") for v in vals: if callable(v): try: @@ -626,7 +604,7 @@ def _get_values(request, val): continue try: if ( - not isinstance(v, (models.Person, models.Organization)) + not isinstance(v, (Person, Organization)) and hasattr(v, "url") and v.url ): @@ -1144,7 +1122,7 @@ def _manage_dated_fields(dated_fields, dct): def _clean_type_val(val): - for prefix in GeneralType.PREFIX_CODES: + for prefix in GENERAL_TYPE_PREFIX["prefix_codes"]: val = val.replace(prefix, "") val = val.strip() if val.startswith('"') and val.endswith('"'): @@ -1210,6 +1188,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs): hierarchic_fields = HIERARCHIC_FIELDS[:] if hasattr(model, "hierarchic_fields"): hierarchic_fields += model.hierarchic_fields() + Town = apps.get_model("ishtar_common", "Town") for reqs in dct.copy(): if type(reqs) not in (list, tuple): reqs = [reqs] @@ -1257,6 +1236,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs): and_reqs.append(main_req) continue + Container = apps.get_model("archaeological_warehouse", "Container") for val in vals: attr = "cached_label__iexact" if val.endswith("*"): @@ -1277,7 +1257,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs): vals = [v.replace('"', "") for v in val.split(";")] town_ids = [] for val in vals: - q = models.Town.objects.filter(cached_label__iexact=val).values_list( + q = Town.objects.filter(cached_label__iexact=val).values_list( "id", flat=True) if not q.count(): continue @@ -1287,7 +1267,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs): for rel_query in ("parents__", "children__"): for idx in range(HIERARCHIC_LEVELS): k = rel_query * (idx + 1) + "pk" - q = models.Town.objects.filter( + q = Town.objects.filter( **{k: town_id}).values_list("id", flat=True) if not q.count(): break @@ -1595,6 +1575,7 @@ def _manage_default_search( dct, request, model, default_name, my_base_request, my_relative_session_names ): pinned_search = "" + current_item_keys_dict = get_current_item_keys_dict() pin_key = "pin-search-" + default_name base_request = my_base_request if isinstance(my_base_request, dict) else {} dct = {k: v for k, v in dct.items() if v} @@ -1606,6 +1587,7 @@ def _manage_default_search( ): # an item is pinned value = request.session[default_name] if "basket-" in value: + FindBasket = apps.get_model("archaeological_finds", "FindBasket") try: dct = {"basket__pk": request.session[default_name].split("-")[-1]} pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"])) @@ -1630,9 +1612,9 @@ def _manage_default_search( name in request.session and request.session[name] and "basket-" not in request.session[name] - and name in CURRENT_ITEM_KEYS_DICT + and name in current_item_keys_dict ): - up_model = CURRENT_ITEM_KEYS_DICT[name] + up_model = current_item_keys_dict[name] try: dct.update({key: request.session[name]}) up_item = up_model.objects.get(pk=dct[key]) @@ -2031,6 +2013,7 @@ def get_item( no_link=False, no_limit=False, return_query=False, + ishtaruser=None, # could be provided when request is None **dct, ): available_perms = [] @@ -2044,7 +2027,8 @@ def get_item( if "json" in data_type: EMPTY = "[]" - if data_type not in ("json", "csv", "json-image", "json-map", "json-stats"): + if not return_query and data_type not in ( + "json", "csv", "json-image", "json-map", "json-stats"): return HttpResponse(EMPTY, content_type="text/plain") if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2: @@ -2070,6 +2054,7 @@ def get_item( own = True if ( full == "shortcut" + and request and "SHORTCUT_SEARCH" in request.session and request.session["SHORTCUT_SEARCH"] == "own" ): @@ -2077,13 +2062,23 @@ def get_item( query_own = None if own: - q = models.IshtarUser.objects.filter(user_ptr=request.user) - if not q.count(): - return HttpResponse(EMPTY, content_type="text/plain") + # TODO: verify alt_query_own + """ if alt_query_own: query_own = getattr(model, alt_query_own)(q.all()[0]) else: query_own = model.get_query_owns(q.all()[0]) + print(query_own) # TODO - get old request to transform them + """ + user_pk = request.user.pk if request else ishtaruser.pk + q = UserObjectPermission.objects.filter( + user_id=user_pk, + permission__codename=f"view_own_{model._meta.model_name}", + content_type=ContentType.objects.get_for_model(model) + ) + query_own = Q( + pk__in=[int(pk) for pk in q.values_list("object_pk", flat=True)] + ) query_parameters = {} @@ -2191,14 +2186,10 @@ def get_item( request_keys.update(my_extra_request_keys) # manage search on json fields and excluded fields - if ( - search_form - and request - and request.user - and getattr(request.user, "ishtaruser", None) - ): + if search_form: + ishtaruser = request.user.ishtaruser if request else ishtaruser available, __, excluded_fields, json_fields = search_form.check_custom_form( - request.user.ishtaruser + ishtaruser ) # for now no manage on excluded_fields: should we prevent search on # some fields regarding the user concerned? @@ -2218,10 +2209,12 @@ def get_item( if "query" in dct: request_items = dct["query"] request_items["submited"] = True - elif request.method == "POST": + elif request and request.method == "POST": request_items = request.POST - else: + elif request: request_items = request.GET + else: + return HttpResponse(EMPTY, content_type="text/plain") count = dct.get("count", False) @@ -2271,7 +2264,7 @@ def get_item( key = "name__icontains" else: key = "cached_label__icontains" - dct[key] = request.GET.get("term", None) + dct[key] = (request and request.GET.get("term", None)) or None try: old = "old" in request_items and int(request_items["old"]) @@ -2332,11 +2325,12 @@ def get_item( # manage default and pinned search and not bookmark if ( not has_a_search + and request and not request_items.get("search_vector", "") and not request_items.get("submited", "") and full != "shortcut" ): - if data_type == "csv" and func_name in request.session: + if data_type == "csv" and func_name and func_name in request.session: dct = request.session[func_name] else: # default search @@ -2451,6 +2445,10 @@ def get_item( # manage hierarchic in shortcut menu if full == "shortcut": + File = apps.get_model("archaeological_files", "File") + Operation = apps.get_model("archaeological_operations", "Operation") + ContextRecord = apps.get_model("archaeological_context_records", "ContextRecord") + Find = apps.get_model("archaeological_finds", "Find") ASSOCIATED_ITEMS = { Operation: (File, "associated_file__pk"), ContextRecord: (Operation, "operation__pk"), @@ -2463,6 +2461,7 @@ def get_item( if current: dct = {upper_key: current} query &= Q(**dct) + # print("ishtar_common/views_item.py - 2455") # print(query, distinct_queries, base_query, exc_query, extras) items = model.objects.filter(query) for d_q in distinct_queries: @@ -2540,6 +2539,7 @@ def get_item( for col in cols: query_table_cols += col.split("|") + Document = apps.get_model("ishtar_common", "Document") # contextual (full, simple, etc.) col contxt = full and "full" or "simple" if ( @@ -2559,7 +2559,7 @@ def get_item( table_cols.append("cached_label") if data_type == "json-image": prefix = "" - if model != models.Document: + if model != Document: prefix = "main_image__" query_table_cols.append(prefix + "thumbnail") table_cols.append(prefix + "thumbnail") @@ -2584,7 +2584,7 @@ def get_item( for k in request_items: if not k.startswith("order["): continue - num = int(k.split("]")[0][len("order[") :]) + num = int(k.split("]")[0][len("order["):]) if num not in sorts: sorts[num] = ["", ""] # sign, col_num if k.endswith("[dir]"): @@ -2907,9 +2907,10 @@ def adapt_distant_search(params, src, model): search_vector = params["search_vector"][0] match = RE_FACET.search(search_vector) final_search_vector = "" + ApiKeyMatch = apps.get_model("ishtar_common", "ApiKeyMatch") while match: key, value, __ = match.groups() - q = models_rest.ApiKeyMatch.objects.filter( + q = ApiKeyMatch.objects.filter( source=src, search_model__model__iexact=model, search_keys__contains=[key], @@ -2929,10 +2930,11 @@ def adapt_distant_search(params, src, model): def get_distant_item(request, model, external_source_id, data_type=None): - # TODO: check permissions + # TODO: verify/test check permissions + ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource") try: - src = models_rest.ApiExternalSource.objects.get(pk=external_source_id) - except (models_rest.ApiExternalSource.DoesNotExist, ValueError): + src = ApiExternalSource.objects.get(pk=external_source_id) + except (ApiExternalSource.DoesNotExist, ValueError): return HttpResponse("{}", content_type="text/plain") url = src.url url += reverse(f"api-search-{model}") @@ -2954,7 +2956,7 @@ def get_distant_item(request, model, external_source_id, data_type=None): "submited", "data_type", ] - app = models_rest.MAIN_MODELS[model] + app = API_MAIN_MODELS[model] model_class = ContentType.objects.get(app_label=app, model=model).model_class() bool_fields = model_class.REVERSED_BOOL_FIELDS + model_class.BOOL_FIELDS + model_class.CALLABLE_BOOL_FIELDS is_empty_params = not any( @@ -3010,9 +3012,10 @@ def external_export(request, source_id, model_name, slug): if not url: return HttpResponse('Unauthorized', status=401) + ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource") try: - src = models_rest.ApiExternalSource.objects.get(pk=source_id) - except (models_rest.ApiExternalSource.DoesNotExist, ValueError): + src = ApiExternalSource.objects.get(pk=source_id) + except (ApiExternalSource.DoesNotExist, ValueError): return HttpResponse('Unauthorized', status=401) url = src.url + url |