summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--archaeological_context_records/models.py10
-rw-r--r--archaeological_context_records/tests.py272
-rw-r--r--archaeological_files/models.py10
-rw-r--r--archaeological_finds/models_finds.py12
-rw-r--r--archaeological_finds/models_treatments.py5
-rw-r--r--archaeological_finds/tests.py185
-rw-r--r--archaeological_operations/models.py10
-rw-r--r--archaeological_operations/tests.py288
-rw-r--r--archaeological_warehouse/models.py6
-rw-r--r--ishtar_common/admin.py5
-rw-r--r--ishtar_common/menu_base.py2
-rw-r--r--ishtar_common/menus.py50
-rw-r--r--ishtar_common/models.py95
-rw-r--r--ishtar_common/models_common.py107
-rw-r--r--ishtar_common/models_rest.py30
-rw-r--r--ishtar_common/rest.py5
-rw-r--r--ishtar_common/utils.py77
-rw-r--r--ishtar_common/views.py18
-rw-r--r--ishtar_common/views_item.py135
19 files changed, 1107 insertions, 215 deletions
diff --git a/archaeological_context_records/models.py b/archaeological_context_records/models.py
index a88a7d161..1bb09717d 100644
--- a/archaeological_context_records/models.py
+++ b/archaeological_context_records/models.py
@@ -25,7 +25,7 @@ from django.conf import settings
from django.contrib.gis.db import models
from django.contrib.postgres.indexes import GinIndex
from django.contrib.sites.models import Site
-from django.db import connection, transaction, OperationalError, IntegrityError
+from django.db import transaction, OperationalError, IntegrityError
from django.db.models import Q
from django.db.models.signals import post_delete, post_save, m2m_changed
from django.urls import reverse
@@ -37,7 +37,7 @@ from ishtar_common.utils import (
cached_label_changed,
m2m_historization_changed,
post_save_geo,
- task,
+ SearchAltName,
)
from ishtar_common.models import (
@@ -58,7 +58,6 @@ from ishtar_common.models import (
get_current_profile,
document_attached_changed,
HistoryModel,
- SearchAltName,
GeoItem,
CompleteIdentifierItem,
SearchVectorConfig,
@@ -826,6 +825,7 @@ class ContextRecord(
"town_label_with_areas",
]
UPPER_GEO = ["operation", "archaeological_site"]
+ UPPER_PERMISSIONS = [(Operation, "operation_id")]
history = HistoricalRecords(bases=[HistoryModel])
objects = UUIDModelManager()
@@ -1214,6 +1214,10 @@ class ContextRecord(
return actions
@classmethod
+ def get_limit_to_area_query(cls, town_ids):
+ return Q(operation__towns__pk__in=town_ids)
+
+ @classmethod
def get_query_owns(cls, ishtaruser):
return (
cls._construct_query_own(
diff --git a/archaeological_context_records/tests.py b/archaeological_context_records/tests.py
index 8a16facf5..f550d23ce 100644
--- a/archaeological_context_records/tests.py
+++ b/archaeological_context_records/tests.py
@@ -44,7 +44,8 @@ from ishtar_common.models import (
)
from ishtar_common import forms_common
-from archaeological_operations.tests import OperationInitTest, ImportTest
+from archaeological_operations.tests import OperationInitTest, ImportTest, \
+ TestPermissionRequest
from archaeological_operations import models as models_ope
from archaeological_operations.views import RELATION_FORMSET_EXTRA_FORM
from archaeological_context_records import models
@@ -942,10 +943,11 @@ class ContextRecordSearchTest(ContextRecordInit, TestCase, SearchText):
self._test_search(c, result, context="Text period search")
-class ContextRecordPermissionTest(ContextRecordInit, TestCase):
+class ContextRecordOldPermissionTest(ContextRecordInit, TestCase):
fixtures = CONTEXT_RECORD_TOWNS_FIXTURES
def setUp(self):
+ print("Theses tests should fail on v5")
profile_type = ProfileType.objects.create(
label="xxCollaborateur",
txt_idx="xxcollaborator",
@@ -1056,6 +1058,272 @@ class ContextRecordPermissionTest(ContextRecordInit, TestCase):
self.assertRedirects(response, "/")
+class ContextRecordPermissionTest(ContextRecordInit, TestPermissionRequest,
+ TestCase):
+ fixtures = CONTEXT_RECORD_TOWNS_FIXTURES
+
+ def setUp(self):
+ IshtarSiteProfile.objects.create()
+
+ self.setup_permission_requests(
+ "ope",
+ "operation",
+ permissions=["view_own_operation", "change_own_operation"],
+ create_profiles=False
+ )
+ self.setup_permission_requests(
+ "cr",
+ "contextrecord",
+ permissions=["view_own_contextrecord", "change_own_contextrecord"],
+ perm_requests=['id="new-*"', 'excavator="{USER}"']
+ )
+
+ self.users = {}
+ username, password, user = create_superuser()
+ self.users["superuser"] = (username, password, user)
+
+ upstream_username, upstream_password, upstream_user = create_user()
+ UserProfile.objects.create(
+ profile_type=self.profile_types["cr_upstream"],
+ person=upstream_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["upstream"] = (upstream_username, upstream_password, upstream_user)
+
+ # nosec: hard coded password for test purposes
+ associated_username, associated_password, associated_user = create_user( # nosec
+ username="vador", password="darth"
+ )
+ profile = UserProfile.objects.create(
+ profile_type=self.profile_types["cr_associated_items"],
+ person=associated_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["associated"] = (
+ associated_username, associated_password, associated_user
+ )
+
+ # nosec: hard coded password for test purposes
+ areas_username, areas_password, areas_user = create_user( # nosec
+ username="luke", password="iamyourfather"
+ )
+ profile = UserProfile.objects.create(
+ profile_type=self.profile_types["cr_areas"],
+ person=areas_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["areas"] = (
+ areas_username, areas_password, areas_user
+ )
+
+ town = Town.objects.create(name="Tatouine", numero_insee="66000")
+ area = Area.objects.create(label="Galaxie", txt_idx="galaxie")
+ area.towns.add(town)
+ profile.areas.add(area)
+
+ # nosec: hard coded password for test purposes
+ simple_up_username, simple_up_password, simple_up_user = create_user( # nosec
+ username="r2d2", password="bipbip"
+ )
+ UserProfile.objects.create(
+ profile_type=self.profile_types["cr_upstream"],
+ person=simple_up_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["simple_upstream"] = (
+ simple_up_username, simple_up_password, simple_up_user
+ )
+
+ # nosec: hard coded password for test purposes
+ request_username, request_password, request_user = create_user( # nosec
+ username="c6po", password="bopbop"
+ )
+ UserProfile.objects.create(
+ profile_type=self.profile_types["cr_request_1"],
+ person=request_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["request"] = (
+ request_username, request_password, request_user
+ )
+
+ # nosec: hard coded password for test purposes
+ request2_username, request2_password, request2_user = create_user( # nosec
+ username="cowboy", password="bebop"
+ )
+ UserProfile.objects.create(
+ profile_type=self.profile_types["cr_request_2"],
+ person=request2_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["request2"] = (
+ request2_username, request2_password, request2_user
+ )
+
+ # nosec: hard coded password for test purposes
+ req_area_username, req_area_password, req_area_user = create_user( # nosec
+ username="chewee", password="bwawa"
+ )
+ profile = UserProfile.objects.create(
+ profile_type=self.profile_types["cr_request_areas_1"],
+ person=req_area_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["request_areas"] = (
+ req_area_username, req_area_password, req_area_user
+ )
+ profile.areas.add(area)
+
+ self.orgas = self.create_orgas(user)
+ self.operations = self.create_operation(user, self.orgas[0],
+ values={"code_patriarche": "OPE01"})
+ self.operations += self.create_operation(upstream_user, self.orgas[0],
+ values={"code_patriarche": "OPE02"})
+ self.operations[0].towns.add(town)
+
+ self.operations[1].ishtar_users.add(simple_up_user.ishtaruser)
+
+ self.create_context_record(
+ user=user, data={"label": "new-CR1", "operation": self.operations[0]}
+ )
+ self.create_context_record(
+ user=user, data={"label": "new-CR2", "operation": self.operations[1]}
+ )
+ self.create_context_record(
+ user=associated_user,
+ data={"label": "old-CR3", "operation": self.operations[0]}
+ )
+ self.cr_1 = self.context_records[0]
+ self.cr_2 = self.context_records[1]
+ self.cr_2.ishtar_users.add(associated_user.ishtaruser)
+
+ associated_user.ishtaruser.generate_permission()
+ areas_user.ishtaruser.generate_permission()
+ simple_up_user.ishtaruser.generate_permission()
+ request_user.ishtaruser.generate_permission()
+ req_area_user.ishtaruser.generate_permission()
+ request2_user.ishtaruser.generate_permission()
+
+ # upstream with associated request for operation
+ gp = Group.objects.get(name="ope_xxx")
+ self.profile_types["cr_upstream"].groups.add(gp)
+ self.profile_types["cr_upstream"].permission_requests.add(
+ self.permission_requests["ope_associated_items"]
+ )
+ upstream_user.ishtaruser.generate_permission()
+
+ def test_own_search(self):
+ # no result when no authentification
+ c = Client()
+ response = c.get(reverse("get-contextrecord"))
+ self.assertTrue(not json.loads(response.content.decode()))
+
+ url = reverse("get-contextrecord")
+ # possession + direct "history_creator_id"
+ # two context record available via operation
+ self._test_search(
+ url,
+ 'possession + direct "history_creator_id"',
+ self.users["associated"],
+ 2
+ )
+
+ # upstream + direct "history_creator_id"
+ # one context record available via operation - can view operation
+ self._test_search(
+ url,
+ 'upstream + direct "history_creator_id"',
+ self.users["upstream"],
+ 1
+ )
+
+ # upstream: no upstream permission only via direct association
+ # one context record available via operation
+ self._test_search(
+ url,
+ 'upstream: no upstream permission only via direct association',
+ self.users["simple_upstream"],
+ 1
+ )
+
+ # area filter
+ # only one "own" operation available
+ self._test_search(
+ url,
+ 'areas filter',
+ self.users["areas"],
+ 2
+ )
+
+ # request
+ # 2 context record available via request
+ self._test_search(
+ url,
+ 'request',
+ self.users["request"],
+ 2
+ )
+
+ # request limited by area
+ # 2 context record available via request but only one match the area
+ self._test_search(
+ url,
+ 'request limited by area',
+ self.users["request_areas"],
+ 1
+ )
+ # request with {USER}
+ # one context record available via request
+ self._test_search(
+ url,
+ 'request2',
+ self.users["request2"],
+ 0
+ )
+ __, __, request2_user = self.users["request2"]
+ cr2 = self.context_records[2]
+ cr2.excavator = request2_user.ishtaruser.person
+ cr2.save()
+ request2_user.ishtaruser.generate_permission()
+ self._test_search(
+ url,
+ 'request2',
+ self.users["request2"],
+ 1
+ )
+
+ def test_own_modify(self):
+ # no result when no authentification
+ c = Client()
+ response = c.get(reverse("record_modify", args=[self.cr_2.pk]))
+ self.assertRedirects(response, "/")
+
+ modif_url = "/record_modification/operation-record_modification"
+
+ # upstream
+ c = Client()
+ upstream_username, upstream_password, upstream_user = self.users["upstream"]
+ c.login(username=upstream_username, password=upstream_password)
+ response = c.get(reverse("record_modify", args=[self.cr_2.pk]), follow=True)
+ self.assertRedirects(response, modif_url)
+ response = c.get(modif_url)
+
+ self.assertEqual(response.status_code, 200)
+ response = c.get(reverse("record_modify", args=[self.cr_1.pk]), follow=True)
+ self.assertRedirects(response, "/")
+
+ # area filter
+ c = Client()
+ areas_username, areas_password, areas_user = self.users["areas"]
+ c.login(username=areas_username, password=areas_password)
+ response = c.get(reverse("record_modify", args=[self.cr_1.pk]), follow=True)
+ self.assertRedirects(response, modif_url)
+ response = c.get(modif_url)
+ self.assertEqual(response.status_code, 200)
+ response = c.get(reverse("record_modify", args=[self.cr_2.pk]), follow=True)
+ self.assertRedirects(response, "/")
+
+
class RecordRelationsTest(ContextRecordInit, TestCase):
fixtures = OPERATION_TOWNS_FIXTURES
model = models.ContextRecord
diff --git a/archaeological_files/models.py b/archaeological_files/models.py
index 778b1c251..b3815c95d 100644
--- a/archaeological_files/models.py
+++ b/archaeological_files/models.py
@@ -27,7 +27,7 @@ from django.contrib.gis.db import models
from django.contrib.postgres.indexes import GinIndex
from django.core.cache import cache
from django.core.validators import MinValueValidator, MaxValueValidator
-from django.db.models import Max
+from django.db.models import Q, Max
from django.db.models.signals import post_save, m2m_changed, post_delete
from django.urls import reverse
@@ -41,6 +41,7 @@ from ishtar_common.utils import (
get_current_year,
get_generated_id,
m2m_historization_changed,
+ SearchAltName,
)
from ishtar_common.models import (
@@ -53,7 +54,6 @@ from ishtar_common.models import (
Person,
Organization,
Town,
- Dashboard,
DashboardFormItem,
HistoricalRecords,
ValueGetter,
@@ -62,11 +62,9 @@ from ishtar_common.models import (
post_save_cache,
Document,
HistoryModel,
- SearchAltName,
SearchVectorConfig,
DocumentItem,
CompleteIdentifierItem,
- HierarchicalType,
)
from archaeological_operations.models import (
@@ -1059,6 +1057,10 @@ class File(
return cls
@classmethod
+ def get_limit_to_area_query(cls, town_ids):
+ return Q(towns__pk__in=town_ids) | Q(main_town_id__in=town_ids)
+
+ @classmethod
def get_owns(
cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=False,
no_auth_check=False, query=False
diff --git a/archaeological_finds/models_finds.py b/archaeological_finds/models_finds.py
index 9ba25cc83..ece7d08b8 100644
--- a/archaeological_finds/models_finds.py
+++ b/archaeological_finds/models_finds.py
@@ -37,6 +37,7 @@ from ishtar_common.utils import (
m2m_historization_changed,
pgettext_lazy,
post_save_geo,
+ SearchAltName,
ugettext_lazy as _
)
@@ -67,7 +68,6 @@ from ishtar_common.models import (
Person,
post_save_cache,
QuickAction,
- SearchAltName,
SearchVectorConfig,
ValueGetter,
)
@@ -2006,6 +2006,12 @@ class Find(
"excavation_ids",
"weight_string",
]
+ UPPER_PERMISSIONS = [
+ (Operation, "base_finds__context_record__operation_id"),
+ (ContextRecord, "base_finds__context_record_id"),
+ (("archaeological_warehouse", "Warehouse"), "container__location_id"),
+ (("archaeological_warehouse", "Warehouse"), "container_ref__responsibility_id"),
+ ]
SHEET_ALTERNATIVES = [("museum", "museum_find")]
objects = UUIDModelManager()
@@ -2991,6 +2997,10 @@ class Find(
return new
@classmethod
+ def get_limit_to_area_query(cls, town_ids):
+ return Q(base_finds__context_record__operation__towns__pk__in=town_ids)
+
+ @classmethod
def _get_query_owns(cls, ishtaruser, prefix=""):
q = (
cls._construct_query_own(
diff --git a/archaeological_finds/models_treatments.py b/archaeological_finds/models_treatments.py
index 5ba50728b..45dc26c16 100644
--- a/archaeological_finds/models_treatments.py
+++ b/archaeological_finds/models_treatments.py
@@ -49,7 +49,6 @@ from ishtar_common.models import (
document_attached_changed,
MainItem,
HistoryModel,
- SearchAltName,
SearchVectorConfig,
DocumentItem,
)
@@ -57,8 +56,9 @@ from ishtar_common.models_common import CompleteIdentifierItem, HistoricalRecord
from ishtar_common.utils import (
cached_label_changed,
get_current_year,
- update_data,
m2m_historization_changed,
+ SearchAltName,
+ update_data,
)
@@ -91,6 +91,7 @@ class TreatmentState(GeneralType):
'available': True})
return treat_state
+
post_save.connect(post_save_cache, sender=TreatmentState)
post_delete.connect(post_save_cache, sender=TreatmentState)
diff --git a/archaeological_finds/tests.py b/archaeological_finds/tests.py
index 5df18cf64..e0532effc 100644
--- a/archaeological_finds/tests.py
+++ b/archaeological_finds/tests.py
@@ -95,7 +95,7 @@ from ishtar_common.tests import (
SearchText,
)
from archaeological_operations.tests import ImportTest, create_operation, \
- create_administrativact
+ create_administrativact, TestPermissionRequest
from archaeological_context_records.tests import ContextRecordInit
from archaeological_operations.serializers import operation_serialization
@@ -1918,11 +1918,12 @@ class FindAutocompleteTest(FindInit, TestCase):
self.assertEqual(res[2]["id"], find4.pk) # 12 - contains
-class FindPermissionTest(FindInit, TestCase):
+class FindOldPermissionTest(FindInit, TestCase):
fixtures = FIND_FIXTURES
model = models.Find
def setUp(self):
+ print("Theses tests should fail on v5")
profile_type = ProfileType.objects.create(
label="xxCollaborateur",
txt_idx="xxcollaborator",
@@ -2021,6 +2022,186 @@ class FindPermissionTest(FindInit, TestCase):
self.assertEqual(json.loads(content)["recordsTotal"], 1)
+class FindPermissionTest(FindInit, TestPermissionRequest, TestCase):
+ fixtures = FIND_FIXTURES
+ model = models.Find
+
+ def setUp(self):
+ self.setup_permission_requests(
+ "find",
+ "find",
+ permissions=["view_own_find", "change_own_find"],
+ perm_requests=['id="new-*"', 'excavator="{USER}"']
+ )
+
+ self.users = {}
+ username, password, user = create_superuser()
+ self.users["superuser"] = (username, password, user)
+
+ upstream_username, upstream_password, upstream_user = create_user(
+ username="up", password="up"
+ )
+ UserProfile.objects.create(
+ profile_type=self.profile_types["find_upstream"],
+ person=upstream_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["upstream"] = (upstream_username, upstream_password, upstream_user)
+
+ # nosec: hard coded password for test purposes
+ areas_username, areas_password, areas_user = create_user( # nosec
+ username="luke", password="iamyourfather"
+ )
+ profile = UserProfile.objects.create(
+ profile_type=self.profile_types["find_areas"],
+ person=areas_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["areas"] = (
+ areas_username, areas_password, areas_user
+ )
+
+ town = Town.objects.create(name="Tatouine", numero_insee="66000")
+ area = Area.objects.create(label="Galaxie", txt_idx="galaxie")
+ area.towns.add(town)
+ profile.areas.add(area)
+
+ self.orgas = self.create_orgas(user)
+ self.create_operation(user, self.orgas[0])
+ self.create_operation(areas_user, self.orgas[0])
+
+ self.create_context_record(
+ user=user, data={"label": "CR 1", "operation": self.operations[0]}
+ )
+ self.create_context_record(
+ user=areas_user, data={"label": "CR 2", "operation": self.operations[1]}
+ )
+ self.cr_1 = self.context_records[-2]
+ self.cr_2 = self.context_records[-1]
+
+ self.create_finds(
+ data_base={"context_record": self.cr_1}, user=user, force=True
+ )
+ self.create_finds(
+ data_base={"context_record": self.cr_2}, user=areas_user, force=True
+ )
+
+ self.find_1 = self.finds[-2]
+ self.find_2 = self.finds[-1]
+ self.operations[-1].towns.add(town)
+
+ self.operations[-1].context_record.all()[0].ishtar_users.add(
+ upstream_user.ishtaruser
+ )
+
+ associated_username, associated_password, associated_user = create_user(
+ username="as", password="as"
+ )
+ UserProfile.objects.create(
+ profile_type=self.profile_types["find_associated_items"],
+ person=associated_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["associated"] = (
+ associated_username, associated_password, associated_user
+ )
+
+ # read permission
+ self.basket = models.FindBasket.objects.create(
+ label="My basket",
+ user=IshtarUser.objects.get(pk=user.pk),
+ )
+ self.basket.items.add(self.find_1)
+ self.basket.shared_with.add(associated_user.ishtaruser)
+
+ upstream_user.ishtaruser.generate_permission()
+ areas_user.ishtaruser.generate_permission()
+ associated_user.ishtaruser.generate_permission()
+
+ def test_own_search(self):
+ # no result when no authentification
+ c = Client()
+ response = c.get(reverse("get-find"))
+ self.assertTrue(not response.content or not json.loads(response.content))
+
+ url = reverse("get-find")
+
+ # possession of associated operation
+ # only one "own" context record available
+ self._test_search(
+ url,
+ 'possession',
+ self.users["upstream"],
+ 1
+ )
+
+ # area filter
+ # only one "own" operation available
+ self._test_search(
+ url,
+ 'areas filter',
+ self.users["areas"],
+ 1
+ )
+
+ # filter associated by basket
+ self._test_search(
+ url,
+ 'associated basket filter',
+ self.users["associated"],
+ 1
+ )
+
+ def test_own_modify(self):
+ # no result when no authentification
+ c = Client()
+ response = c.get(reverse("find_modify", args=[self.cr_2.pk]))
+ self.assertRedirects(response, "/")
+
+ modif_url = "/find_modification/find-find_modification"
+
+ # upstream
+ c = Client()
+ upstream_username, upstream_password, upstream_user = self.users["upstream"]
+ c.login(username=upstream_username, password=upstream_password)
+ response = c.get(reverse("find_modify", args=[self.find_2.pk]), follow=True)
+ self.assertRedirects(response, modif_url)
+ response = c.get(modif_url)
+
+ self.assertEqual(response.status_code, 200)
+ response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True)
+ self.assertRedirects(response, "/")
+
+ # area filter
+ c = Client()
+ areas_username, areas_password, areas_user = self.users["areas"]
+ c.login(username=areas_username, password=areas_password)
+ response = c.get(reverse("find_modify", args=[self.find_2.pk]), follow=True)
+ self.assertRedirects(response, modif_url)
+ response = c.get(modif_url)
+ self.assertEqual(response.status_code, 200)
+ response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True)
+ self.assertRedirects(response, "/")
+
+ # basket filter
+ c = Client()
+ basket_username, basket_password, basket_user = self.users["associated"]
+ c.login(username=basket_username, password=basket_password)
+ response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True)
+ self.assertRedirects(response, "/")
+
+ self.basket.shared_write_with.add(basket_user.ishtaruser)
+ basket_user.ishtaruser.generate_permission()
+
+ response = c.get(reverse("find_modify", args=[self.find_1.pk]), follow=True)
+ self.assertRedirects(response, modif_url)
+ response = c.get(modif_url)
+ self.assertEqual(response.status_code, 200)
+
+ response = c.get(reverse("find_modify", args=[self.find_2.pk]), follow=True)
+ self.assertRedirects(response, "/")
+
+
class FindQATest(FindInit, TestCase):
fixtures = WAREHOUSE_FIXTURES
model = models.Find
diff --git a/archaeological_operations/models.py b/archaeological_operations/models.py
index 2fafa56ed..9e43f264b 100644
--- a/archaeological_operations/models.py
+++ b/archaeological_operations/models.py
@@ -66,7 +66,6 @@ from ishtar_common.models import (
get_current_profile,
document_attached_changed,
HistoryModel,
- SearchAltName,
GeoItem,
CompleteIdentifierItem,
SearchVectorConfig,
@@ -85,6 +84,7 @@ from ishtar_common.utils import (
mode,
m2m_historization_changed,
post_save_geo,
+ SearchAltName,
SheetItem,
)
@@ -851,6 +851,10 @@ class ArchaeologicalSite(
return actions
@classmethod
+ def get_limit_to_area_query(cls, town_ids):
+ return Q(towns__pk__in=town_ids)
+
+ @classmethod
def _get_query_owns_dicts(cls, ishtaruser, no_rel=False):
profile = ishtaruser.current_profile
town_ids = []
@@ -2309,6 +2313,10 @@ class Operation(
return round(float(self.cost) / self.surface, 2)
@classmethod
+ def get_limit_to_area_query(cls, town_ids):
+ return Q(towns__pk__in=town_ids)
+
+ @classmethod
def _get_query_owns_dicts(cls, ishtaruser, no_rel=False):
profile = ishtaruser.current_profile
town_ids = []
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py
index fa2471bbd..10bc52967 100644
--- a/archaeological_operations/tests.py
+++ b/archaeological_operations/tests.py
@@ -69,10 +69,8 @@ from ishtar_common.models import (
Town,
ImporterColumn,
ImportColumnValue,
+ PermissionRequest,
Person,
- Author,
- SourceType,
- AuthorType,
DocumentTemplate,
PersonType,
TargetKeyGroup,
@@ -2014,14 +2012,14 @@ class OperationInitTest(object):
self.parcels.pop(0)
return self.create_operation()[-1]
- def create_operation(self, user=None, orga=None):
+ def create_operation(self, user=None, orga=None, values=None):
if not orga:
self.get_default_orga(user)
if not user:
self.get_default_user()
if not getattr(self, "operations", None):
self.operations = []
- self.operations.append(create_operation(user, orga))
+ self.operations.append(create_operation(user, orga, values=values))
return self.operations
def get_default_operation(self, force=False, user=None):
@@ -3472,10 +3470,94 @@ class OperationSearchTest(TestCase, OperationInitTest, SearchText):
self.assertEqual(values["data"], expected_result)
-class OperationPermissionTest(TestCase, OperationInitTest):
+class TestPermissionRequest:
+ def setup_permission_requests(self, prefix, model_name, permissions,
+ perm_requests=None, create_profiles=True):
+ content_type = ContentType.objects.get(model=model_name)
+ if not hasattr(self, "permission_requests"):
+ self.permission_requests = {}
+ self.permission_requests.update({
+ f"{prefix}_associated_items": PermissionRequest.objects.create(
+ model=content_type,
+ request="",
+ include_associated_items=True,
+ include_upstream_items=False,
+ slug=f'{prefix}-associated_items'
+ ),
+ f"{prefix}_upstream": PermissionRequest.objects.create(
+ model=content_type,
+ request="",
+ include_associated_items=False,
+ include_upstream_items=True,
+ slug=f"{prefix}-upstream"
+ ),
+ f"{prefix}_areas": PermissionRequest.objects.create(
+ model=content_type,
+ request="",
+ include_associated_items=False,
+ include_upstream_items=False,
+ limit_to_attached_areas=True,
+ slug=f"{prefix}-areas"
+ ),
+ })
+ if perm_requests:
+ for idx, request in enumerate(perm_requests):
+ pr = PermissionRequest.objects.create(
+ model=content_type,
+ request=request,
+ include_associated_items=False,
+ include_upstream_items=False,
+ slug=f"{prefix}-req-{idx+1}"
+ )
+ self.permission_requests[f"{prefix}_request_{idx + 1}"] = pr
+ pr = PermissionRequest.objects.create(
+ model=content_type,
+ request=request,
+ include_associated_items=False,
+ include_upstream_items=False,
+ limit_to_attached_areas=True,
+ slug=f"{prefix}-req-areas-{idx+1}"
+ )
+ self.permission_requests[f"{prefix}_request_areas_{idx + 1}"] = pr
+ gp = Group.objects.create(name=f"{prefix}_xxx")
+ for permission in permissions:
+ for p in Permission.objects.filter(codename=permission).all():
+ gp.permissions.add(p)
+ if not create_profiles:
+ return
+ if not hasattr(self, "profile_types"):
+ self.profile_types = {}
+ for key in self.permission_requests.keys():
+ profile_type = ProfileType.objects.create(
+ label=key,
+ txt_idx=key,
+ )
+ profile_type.groups.add(gp)
+ profile_type.permission_requests.add(self.permission_requests[key])
+ self.profile_types[key] = profile_type
+
+ def _test_search(self, url, label, user, expected):
+ c = Client()
+ username, password, __ = user
+ c.login(username=username, password=password)
+ response = c.get(url)
+ content = response.content.decode()
+ if expected:
+ self.assertTrue(
+ json.loads(content),
+ f"search own test for {label}"
+ )
+ self.assertEqual(
+ json.loads(content)["recordsTotal"], expected,
+ f"search own test for {label} - {expected} expected"
+ )
+
+
+class OperationOldPermissionTest(TestCase, OperationInitTest):
fixtures = FILE_FIXTURES
def setUp(self):
+ print("Theses tests should fail on v5")
IshtarSiteProfile.objects.get_or_create(slug="default", active=True)
self.username, self.password, self.user = create_superuser()
@@ -3638,6 +3720,200 @@ class OperationPermissionTest(TestCase, OperationInitTest):
self.assertRedirects(response, "/")
+class OperationPermissionTest(TestCase, TestPermissionRequest, OperationInitTest):
+ fixtures = FILE_FIXTURES
+
+ def setUp(self):
+ IshtarSiteProfile.objects.get_or_create(slug="default", active=True)
+
+ self.setup_permission_requests(
+ "ope",
+ "operation",
+ permissions=["view_own_operation", "change_own_operation"],
+ )
+
+ self.users = {}
+ username, password, user = create_superuser()
+ self.users["superuser"] = (username, password, user)
+
+ # nosec: hard coded password for test purposes
+ associated_username, associated_password, associated_user = create_user( # nosec
+ username="vador", password="darth"
+ )
+ profile = UserProfile.objects.create(
+ profile_type=self.profile_types["ope_associated_items"],
+ person=associated_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["associated"] = (
+ associated_username, associated_password, associated_user
+ )
+
+ # nosec: hard coded password for test purposes
+ areas_username, areas_password, areas_user = create_user( # nosec
+ username="luke", password="iamyourfather"
+ )
+ profile = UserProfile.objects.create(
+ profile_type=self.profile_types["ope_areas"],
+ person=areas_user.ishtaruser.person,
+ current=True,
+ )
+ self.users["areas"] = (
+ areas_username, areas_password, areas_user
+ )
+
+ town = Town.objects.create(name="Tatouine", numero_insee="66000")
+ area = Area.objects.create(label="Galaxie", txt_idx="galaxie")
+ area.towns.add(town)
+ profile.areas.add(area)
+
+ self.orgas = self.create_orgas(user)
+ self.create_operation(user, self.orgas[0])
+ self.operations = self.create_operation(user, self.orgas[0])
+ self.operations[1].towns.add(town)
+ self.item = self.operations[0]
+ associated_user.ishtaruser.generate_permission()
+
+ def test_permission_generation(self):
+ __, __, associated_user = self.users["associated"]
+ self.assertFalse(
+ associated_user.has_perm(
+ "archaeological_operations.change_own_operation",
+ self.operations[1]
+ )
+ )
+ self.assertFalse(
+ associated_user.has_perm(
+ "archaeological_operations.view_own_operation",
+ self.operations[1]
+ )
+ )
+ self.operations[1].ishtar_users.add(associated_user.ishtaruser)
+ associated_user.ishtaruser.generate_permission()
+ self.assertTrue(
+ associated_user.has_perm(
+ "archaeological_operations.view_own_operation",
+ self.operations[1]
+ )
+ )
+ self.assertTrue(
+ associated_user.has_perm(
+ "archaeological_operations.change_own_operation",
+ self.operations[1]
+ )
+ )
+ # general permission is assigned
+ self.assertTrue(
+ associated_user.has_perm(
+ "archaeological_operations.change_own_operation",
+ )
+ )
+ self.assertFalse(
+ associated_user.has_perm(
+ "archaeological_operations.change_own_operation",
+ self.operations[0]
+ )
+ )
+
+ def test_own_search(self):
+ # no result when no authentification
+ c = Client()
+ response = c.get(reverse("get-operation"), {"year": "2010"})
+ self.assertFalse(json.loads(response.content.decode()))
+
+ associated_username, associated_password, associated_user = self.users["associated"]
+ # possession
+ c = Client()
+ c.login(username=associated_username, password=associated_password)
+ response = c.get(reverse("get-operation"), {"year": "2010"})
+ content = json.loads(response.content.decode())
+ self.assertTrue(content)
+ self.assertEqual(content["recordsTotal"], 0)
+
+ self.operations[1].ishtar_users.add(associated_user.ishtaruser)
+ associated_user.ishtaruser.generate_permission()
+
+ response = c.get(reverse("get-operation"), {"year": "2010"})
+ # only one "own" operation available
+ content = json.loads(response.content.decode())
+ self.assertTrue(content)
+ self.assertEqual(content["recordsTotal"], 1)
+
+ operator_key = pgettext_lazy("key for text search", "operator")
+ response = c.get(reverse("get-operation"), {operator_key: self.orgas[0].name})
+ self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1)
+ response = c.get(
+ reverse("get-operation"),
+ {"search_vector": '{}="{}"'.format(operator_key, self.orgas[0].name)},
+ )
+ self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1)
+
+ # area filter
+ areas_username, areas_password, areas_user = self.users["areas"]
+ areas_user.ishtaruser.generate_permission()
+ c = Client()
+ c.login(username=areas_username, password=areas_password)
+ response = c.get(reverse("get-operation"), {"year": "2010"})
+ # only one "own" operation available
+ self.assertTrue(json.loads(response.content.decode()))
+ self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1)
+ response = c.get(reverse("get-operation"), {operator_key: self.orgas[0].name})
+ self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1)
+ response = c.get(
+ reverse("get-operation"),
+ {"search_vector": '{}="{}"'.format(operator_key, self.orgas[0].name)},
+ )
+ self.assertEqual(json.loads(response.content.decode())["recordsTotal"], 1)
+
+ def test_own_modify(self):
+ operation_pk1 = self.operations[0].pk
+ operation_pk2 = self.operations[1].pk
+
+ modif_url = "/operation_modification/general-operation_modification"
+
+ # no result when no authentification
+ c = Client()
+ response = c.get(reverse("operation_modify", args=[operation_pk2]))
+ self.assertRedirects(response, "/")
+
+ # possession
+ associated_username, associated_password, associated_user = self.users["associated"]
+ c = Client()
+ c.login(username=associated_username, password=associated_password)
+ associated_user.ishtaruser.generate_permission()
+ response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True)
+ self.assertRedirects(response, "/")
+
+ self.operations[1].ishtar_users.add(associated_user.ishtaruser)
+ associated_user.ishtaruser.generate_permission()
+ response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True)
+ self.assertRedirects(response, modif_url)
+
+ response = c.get(modif_url)
+ self.assertEqual(response.status_code, 200)
+ response = c.get(reverse("operation_modify", args=[operation_pk1]), follow=True)
+ self.assertRedirects(response, "/")
+
+ profile_type = ProfileType.objects.get(txt_idx="collaborator")
+ profile_type.groups.add(
+ Group.objects.get(
+ name="Opérations rattachées : " "modification/suppression"
+ )
+ )
+
+ # area filter
+ areas_username, areas_password, areas_user = self.users["areas"]
+ areas_user.ishtaruser.generate_permission()
+ c = Client()
+ c.login(username=areas_username, password=areas_password)
+ response = c.get(reverse("operation_modify", args=[operation_pk2]), follow=True)
+ self.assertRedirects(response, modif_url)
+ response = c.get(modif_url)
+ self.assertEqual(response.status_code, 200)
+ response = c.get(reverse("operation_modify", args=[operation_pk1]), follow=True)
+ self.assertRedirects(response, "/")
+
+
class LabelTest(TestCase, OperationInitTest):
fixtures = FILE_FIXTURES
diff --git a/archaeological_warehouse/models.py b/archaeological_warehouse/models.py
index 9aa5549db..5a854c079 100644
--- a/archaeological_warehouse/models.py
+++ b/archaeological_warehouse/models.py
@@ -45,7 +45,6 @@ from ishtar_common.models_common import (
post_save_cache,
DashboardFormItem,
document_attached_changed,
- SearchAltName,
GeoItem,
CompleteIdentifierItem,
SearchVectorConfig,
@@ -60,6 +59,7 @@ from ishtar_common.utils import (
cached_label_changed,
cached_label_and_geo_changed,
get_generated_id,
+ SearchAltName,
)
logger = logging.getLogger(__name__)
@@ -1144,6 +1144,10 @@ class Container(
"localisation_list": "localisation_list",
}
SERIALIZE_PROPERTIES = MainItem.SERIALIZE_PROPERTIES + ["short_label"]
+ UPPER_PERMISSIONS = [
+ (Warehouse, "location_id"),
+ (Warehouse, "responsibility_id"),
+ ]
objects = UUIDModelManager()
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py
index 743d643a3..369821b45 100644
--- a/ishtar_common/admin.py
+++ b/ishtar_common/admin.py
@@ -72,7 +72,8 @@ from django import forms
from ishtar_common import models, models_common, models_rest
from ishtar_common.apps import admin_site
from ishtar_common.model_merging import merge_model_objects
-from ishtar_common.utils import get_cache, create_slug, get_person_gdpr_log
+from ishtar_common.utils import API_MAIN_CONTENT_TYPES, get_cache, create_slug,\
+ get_person_gdpr_log
from ishtar_common import forms as common_forms, forms_common as other_common_forms
from ishtar_common.serializers import restore_serialized, IMPORT_MODEL_LIST
@@ -2561,7 +2562,7 @@ admin_site.register(models_rest.ApiUser, ApiUserAdmin)
def get_api_choices():
pks = []
- for app_label, model_name in models_rest.MAIN_CONTENT_TYPES:
+ for app_label, model_name in API_MAIN_CONTENT_TYPES:
try:
ct = ContentType.objects.get(app_label=app_label, model=model_name)
pks.append(ct.pk)
diff --git a/ishtar_common/menu_base.py b/ishtar_common/menu_base.py
index e0f206a57..feb05c6db 100644
--- a/ishtar_common/menu_base.py
+++ b/ishtar_common/menu_base.py
@@ -17,7 +17,7 @@
# See the file COPYING for details.
-from ishtar_common.models import get_current_profile
+from ishtar_common.utils import get_current_profile
class SectionItem:
diff --git a/ishtar_common/menus.py b/ishtar_common/menus.py
index aae127a09..c455fa73a 100644
--- a/ishtar_common/menus.py
+++ b/ishtar_common/menus.py
@@ -30,33 +30,34 @@ from django.urls import reverse
from django.contrib.auth.models import User
-_extra_menus = []
-# collect menu from INSTALLED_APPS
-for app in settings.INSTALLED_APPS:
- mod = __import__(app, fromlist=["ishtar_menu"])
- if hasattr(mod, "ishtar_menu"):
- menu = getattr(mod, "ishtar_menu")
- _extra_menus += menu.MENU_SECTIONS
-
-# sort
-__section_items = [mnu for order, mnu in sorted(_extra_menus, key=lambda x: x[0])]
-# regroup menus
-_section_items, __keys = [], []
-for section_item in __section_items:
- if section_item.idx not in __keys:
- __keys.append(section_item.idx)
- _section_items.append(section_item)
- continue
- section_childs = _section_items[__keys.index(section_item.idx)].childs
- childs_idx = [child.idx for child in section_childs]
- for child in section_item.childs:
- if child.idx not in childs_idx:
- section_childs.append(child)
+def get_section_items():
+ _extra_menus = []
+ # collect menu from INSTALLED_APPS
+ for app in settings.INSTALLED_APPS:
+ mod = __import__(app, fromlist=["ishtar_menu"])
+ if hasattr(mod, "ishtar_menu"):
+ menu = getattr(mod, "ishtar_menu")
+ _extra_menus += menu.MENU_SECTIONS
+
+ # sort
+ __section_items = [mnu for order, mnu in sorted(_extra_menus, key=lambda x: x[0])]
+ # regroup menus
+ _section_items, __keys = [], []
+ for section_item in __section_items:
+ if section_item.idx not in __keys:
+ __keys.append(section_item.idx)
+ _section_items.append(section_item)
+ continue
+ section_childs = _section_items[__keys.index(section_item.idx)].childs
+ childs_idx = [child.idx for child in section_childs]
+ for child in section_item.childs:
+ if child.idx not in childs_idx:
+ section_childs.append(child)
+ return _section_items
-class Menu:
- ref_childs = _section_items
+class Menu:
def __init__(self, user, current_action=None, session=None):
self.user = user
self.initialized = False
@@ -74,6 +75,7 @@ class Menu:
self.selected_idx = None
self.session = session
self.items_by_idx = {}
+ self.ref_childs = get_section_items()
def reinit_menu_for_all_user(self):
"""
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index 8e5b7f703..3e51f8cb1 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -97,6 +97,7 @@ from ishtar_common.utils import (
InlineClass
)
from ishtar_common.utils_secretary import IshtarSecretaryRenderer
+from ishtar_common.views_item import get_item
from ishtar_common.alternative_configs import (
ALTERNATE_CONFIGS,
@@ -140,7 +141,8 @@ from ishtar_common.utils import (
cached_label_changed,
generate_relation_graph,
max_size_help,
- JSON_SERIALIZATION
+ JSON_SERIALIZATION,
+ SearchAltName,
)
from ishtar_common.models_common import (
@@ -175,7 +177,6 @@ from ishtar_common.models_common import (
PermissionRequest,
post_save_cache,
QuickAction,
- SearchAltName,
SearchVectorConfig,
SpatialReferenceSystem,
TemplateItem,
@@ -198,6 +199,7 @@ __all__ = [
"ImporterColumn",
"ImporterDuplicateField",
"Imported",
+ "PermissionRequest",
"Regexp",
"ImportTarget",
"ItemKey",
@@ -3466,7 +3468,8 @@ class ProfileTypeSummary(ProfileType):
class UserProfile(models.Model):
name = models.CharField(_("Name"), blank=True, default="", max_length=100)
profile_type = models.ForeignKey(
- ProfileType, verbose_name=_("Profile type"), on_delete=models.PROTECT
+ ProfileType, verbose_name=_("Profile type"), on_delete=models.PROTECT,
+ related_name="user_profiles"
)
areas = models.ManyToManyField(
"Area", verbose_name=_("Areas"), blank=True, related_name="profiles"
@@ -3521,7 +3524,9 @@ class UserProfile(models.Model):
def duplicate(self, **kwargs):
areas = [area for area in self.areas.all()]
- external_sources = [external_source for external_source in self.external_sources.all()]
+ external_sources = [
+ external_source for external_source in self.external_sources.all()
+ ]
new_item = self
new_item.pk = None
name = self.name
@@ -3541,50 +3546,95 @@ class UserProfile(models.Model):
new_item.external_sources.add(src)
return new_item
- def _generate_permission(self, ishtar_user, content_type, permission_request):
+ def _generate_permission(self, ishtar_user, content_type, permission_request,
+ permissions, permission_type):
item_ids = []
model_class = content_type.model_class()
- # TODO: gérer les paniers
if permission_request.include_associated_items:
- item_ids += model_class.filter(
+ item_ids += model_class.objects.filter(
ishtar_users__pk=ishtar_user.pk
).values_list("pk", flat=True)
+ item_ids += model_class.objects.filter(
+ history_creator_id=ishtar_user.pk
+ ).values_list("pk", flat=True)
+ if content_type.model == "find" and \
+ permission_type in ("view", "change"):
+ Find = apps.get_model("archaeological_finds", "Find")
+ k = "basket__shared_write_with" if permission_type == "change" \
+ else "basket__shared_with"
+ item_ids += list(
+ Find.objects.filter(**{k: ishtar_user}).values_list("pk", flat=True)
+ )
+ print("ishtar_common/models.py - 3561", item_ids, ishtar_user, content_type, permission_type)
if permission_request.include_upstream_items:
- # TODO....
- item_ids += model_class.get_ids_from_upper_permissions(ishtar_user.user_ptr.pk)
+ item_ids += model_class.get_ids_from_upper_permissions(
+ ishtar_user.user_ptr.pk, permissions
+ )
+ print("ishtar_common/models.py - 3566", item_ids, ishtar_user, content_type, permission_type)
if permission_request.request or permission_request.limit_to_attached_areas:
- # TODO
- pass
- query = model_class.objects
+ _get_item = get_item(
+ content_type.model_class(),
+ "", "", no_permission_check=True,
+ )
+ result = []
+ query = permission_request.request
+ if query:
+ if "{USER}" in query:
+ query = query.replace("{USER}", f"id:{ishtar_user.person_id}")
+ query = {"search_vector": query}
+ q = _get_item(None, return_query=True, ishtaruser=ishtar_user,
+ query=query)
+ result = list(q.values_list("pk", flat=True))
+ if permission_request.limit_to_attached_areas:
+ profile = ishtar_user.current_profile
+ if not profile: # no areas attached
+ return []
+ town_ids = list(profile.query_towns.values_list("pk", flat=True))
+ result_limit = []
+ get_limit_to_area_query = getattr(
+ model_class, "get_limit_to_area_query", None
+ )
+ q = get_limit_to_area_query(town_ids) if get_limit_to_area_query else None
+ if q:
+ result_limit = list(
+ model_class.objects.filter(q).values_list("pk", flat=True)
+ )
+ if result:
+ result = [pk for pk in result if pk in result_limit]
+ else:
+ result = result_limit
+ item_ids += result
+ print("ishtar_common/models.py - 3600", item_ids, ishtar_user, content_type, permission_type)
return item_ids
- def generate_permission(self, content_type):
+ def generate_permission(self, content_type, permission_type):
ishtar_user = self.person.ishtaruser
# add base permissions
for group in self.profile_type.groups.all():
- for perm in group.permissions.all():
+ for perm in group.permissions.filter(
+ codename__startswith=permission_type).all():
ishtar_user.user_ptr.user_permissions.add(perm)
q_has_perm = self.profile_type.groups.filter(
permissions__content_type=content_type,
- permissions__codename__contains="_own_"
+ permissions__codename__startswith=f"{permission_type}_own_",
)
if not q_has_perm.count(): # no permission to generate
return
permissions = []
for group in q_has_perm.all():
- permissions += list(group.permissions.values_list("pk", flat=True))
+ permissions += list(group.permissions.filter(
+ codename__contains=permission_type
+ ).all())
q_req = self.profile_type.permission_requests.filter(
model=content_type, active=True
)
item_ids = []
if not q_req.count():
# TODO v5: delete old behaviour
- """
print(f"WARNING: no permission request for content {content_type.name} and profile {self}")
print("Using old behaviour")
- """
model_class = content_type.model_class()
query = model_class.get_owns(user=ishtar_user, query=True, no_auth_check=True)
if query:
@@ -3594,13 +3644,15 @@ class UserProfile(models.Model):
else:
for perm_request in q_req.all():
item_ids += self._generate_permission(
- ishtar_user, content_type, perm_request
+ ishtar_user, content_type, perm_request, permissions,
+ permission_type
)
user_id = ishtar_user.user_ptr.pk
object_permissions = []
item_ids = list(set(item_ids))
permissions = list(set(permissions))
- for permission_id in permissions:
+ for permission in permissions:
+ permission_id = permission.pk
exclude = list(UserObjectPermission.objects.filter(
content_type_id=content_type.pk, permission_id=permission_id,
user_id=user_id
@@ -3900,7 +3952,8 @@ class IshtarUser(FullSearch):
for ct in content_types:
for profile in self.person.profiles.all():
- profile.generate_permission(ct)
+ for permission_type in ("view", "change", "delete"):
+ profile.generate_permission(ct, permission_type)
def full_label(self):
return self.person.full_label()
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
index 28f5aba00..920b71584 100644
--- a/ishtar_common/models_common.py
+++ b/ishtar_common/models_common.py
@@ -17,11 +17,12 @@ import re
import shutil
import tempfile
import time
+from unidecode import unidecode
from django import forms
from django.apps import apps
from django.conf import settings
-from django.contrib.auth.models import User
+from django.contrib.auth.models import Permission, User
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.gis.db import models
@@ -49,6 +50,8 @@ from ishtar_common.utils import (
get_image_path,
get_columns_from_class,
human_date,
+ HistoryError,
+ SearchAltName,
SheetItem
)
from simple_history.models import HistoricalRecords as BaseHistoricalRecords
@@ -56,7 +59,8 @@ from simple_history.signals import (
post_create_historical_record,
pre_create_historical_record,
)
-from unidecode import unidecode
+
+from guardian.models import UserObjectPermission
from ishtar_common.data_importer import post_importer_action, ImporterError
from ishtar_common.model_managers import TypeManager
@@ -64,19 +68,20 @@ from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import Import
from ishtar_common.templatetags.link_to_window import simple_link_to_window
from ishtar_common.utils import (
- get_cache,
+ cached_label_changed,
disable_for_loaddata,
+ duplicate_item,
+ external_id_changed,
+ GENERAL_TYPE_PREFIX,
get_all_field_names,
+ get_cache,
+ get_current_profile,
+ get_generated_id,
merge_tsvectors,
- cached_label_changed,
- external_id_changed,
+ OwnPerms,
post_save_geo,
post_save_geodata,
task,
- duplicate_item,
- get_generated_id,
- get_current_profile,
- OwnPerms
)
@@ -513,12 +518,6 @@ class GeneralType(Cached, models.Model):
res[parent_id].append((item["id"], item["label"]))
return res
- PREFIX = "│ "
- PREFIX_EMPTY = "  "
- PREFIX_MEDIUM = "├ "
- PREFIX_LAST = "└ "
- PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
-
@classmethod
def _get_childs(
cls,
@@ -559,20 +558,20 @@ class GeneralType(Cached, models.Model):
cprefix -= 1
if not cprefix:
if (idx + 1) == total:
- p += cls.PREFIX_LAST
+ p += GENERAL_TYPE_PREFIX["prefix_last"]
else:
- p += cls.PREFIX_MEDIUM
+ p += GENERAL_TYPE_PREFIX["prefix_medium"]
elif is_last:
if mylast_of:
clast = mylast_of.pop(0)
if clast:
- p += cls.PREFIX_EMPTY
+ p += GENERAL_TYPE_PREFIX["prefix_empty"]
else:
- p += cls.PREFIX
+ p += GENERAL_TYPE_PREFIX["prefix"]
else:
- p += cls.PREFIX_EMPTY
+ p += GENERAL_TYPE_PREFIX["prefix_empty"]
else:
- p += cls.PREFIX
+ p += GENERAL_TYPE_PREFIX["prefix"]
lst.append((child[0], SafeText(p + child[1])))
clast_of = last_of[:]
clast_of.append(idx + 1 == total)
@@ -1149,17 +1148,6 @@ class FullSearch(models.Model):
return changed
-class SearchAltName(object):
- def __init__(
- self, search_key, search_query, extra_query=None, distinct_query=False, related_name=None
- ):
- self.search_key = search_key
- self.search_query = search_query
- self.extra_query = extra_query or {}
- self.distinct_query = distinct_query
- self.related_name = related_name
-
-
class Imported(models.Model):
imports = models.ManyToManyField(
Import, blank=True, related_name="imported_%(app_label)s_%(class)s",
@@ -1386,14 +1374,6 @@ class FixAssociated:
setattr(item, subkey, new_value)
-class HistoryError(Exception):
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return repr(self.value)
-
-
class HistoricalRecords(BaseHistoricalRecords):
def get_extra_fields(self, model, fields):
def get_history_m2m(attr):
@@ -1561,6 +1541,7 @@ class BaseHistorizedItem(
EXTERNAL_ID_KEY = ""
EXTERNAL_ID_DEPENDENCIES = []
HISTORICAL_M2M = []
+ UPPER_PERMISSIONS = []
history_modifier = models.ForeignKey(
User,
@@ -1634,8 +1615,50 @@ class BaseHistorizedItem(
return cls._meta.verbose_name
@classmethod
- def get_ids_from_upper_permissions(cls, user_id):
- return []
+ def get_ids_from_upper_permissions(cls, user_id, base_permissions):
+ if not cls.UPPER_PERMISSIONS:
+ return []
+ ProfileType = apps.get_model("ishtar_common", "ProfileType")
+ item_ids = []
+ for model, attr in cls.UPPER_PERMISSIONS:
+ if isinstance(model, tuple):
+ app_label, model_name = model
+ model = apps.get_model(app_label, model_name)
+ permissions = list(set([
+ "_".join(permission.codename.split("_")[:-1])
+ + f"_{model._meta.model_name}"
+ for permission in base_permissions
+ ]))
+ q = ProfileType.objects.filter(
+ user_profiles__person__ishtaruser=user_id,
+ groups__permissions__codename__in=permissions
+ )
+ lst = []
+ if not q.count():
+ # no permissions associated for upstream model get direct attachement
+ lst = model.objects.filter(
+ ishtar_users__pk=user_id
+ ).values_list("pk", flat=True)
+ else:
+ perms = []
+ for codename in permissions:
+ perms += [
+ perm
+ for perm in Permission.objects.filter(
+ codename=codename).all()
+ ]
+ lst = []
+ for permission in perms:
+ lst += list(
+ UserObjectPermission.objects.filter(
+ permission=permission,
+ user_id=user_id
+ ).values_list("object_pk", flat=True)
+ )
+ item_ids += cls.objects.filter(
+ **{f"{attr}__in": lst}
+ ).values_list("pk", flat=True)
+ return list(set(item_ids))
def is_locked(self, user=None):
if not user:
diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py
index 05b1206d1..c47b04168 100644
--- a/ishtar_common/models_rest.py
+++ b/ishtar_common/models_rest.py
@@ -10,6 +10,12 @@ from django.contrib.postgres.fields import ArrayField
from django.core.files import File
from django.utils.text import slugify
+from django.apps import apps
+from django.template import loader
+
+from ishtar_common.models_common import SheetFilter
+from ishtar_common.utils import ugettext_lazy as _
+
UnoCalc = None
ITALIC = None
if settings.USE_LIBREOFFICE:
@@ -19,30 +25,6 @@ if settings.USE_LIBREOFFICE:
except ImportError:
pass
-from django.apps import apps
-from django.template import loader
-
-from ishtar_common.models_common import SheetFilter
-from ishtar_common.utils import ugettext_lazy as _
-
-
-APP_CONTENT_TYPES = [
- ("archaeological_operations", "operation"),
- ("archaeological_context_records", "contextrecord"),
- ("archaeological_finds", "find"),
- ("archaeological_warehouse", "warehouse"),
- ("archaeological_files", "file"),
-]
-
-MAIN_CONTENT_TYPES = APP_CONTENT_TYPES + [
- ("archaeological_operations", "archaeologicalsite"),
- ("archaeological_warehouse", "container"),
-]
-
-MAIN_MODELS = dict(
- [(model_name, app_name) for app_name, model_name in MAIN_CONTENT_TYPES]
-)
-
class ApiUser(models.Model):
user_ptr = models.OneToOneField(
diff --git a/ishtar_common/rest.py b/ishtar_common/rest.py
index 0a831634a..699440f15 100644
--- a/ishtar_common/rest.py
+++ b/ishtar_common/rest.py
@@ -1,10 +1,6 @@
-import datetime
-import requests
-
from django.conf import settings
from django.db.models import Q
from django.http import HttpResponse
-from django.shortcuts import reverse
from django.utils.translation import activate, deactivate
from rest_framework import authentication, permissions, generics
@@ -13,7 +9,6 @@ from rest_framework.views import APIView
from ishtar_common import models_rest
from ishtar_common.models_common import GeneralType
-from ishtar_common.models_imports import Importer
from ishtar_common.views_item import get_item
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 709f020a4..11ff45fa7 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -333,6 +333,83 @@ class SheetItem:
return
+def get_current_item_keys():
+ return (
+ ("file", apps.get_model("archaeological_files", "File")),
+ ("operation", apps.get_model("archaeological_operations", "Operation")),
+ ("site", apps.get_model("archaeological_operations", "ArchaeologicalSite")),
+ ("contextrecord",
+ apps.get_model("archaeological_context_records", "ContextRecord")),
+ ("warehouse", apps.get_model("archaeological_warehouse", "Warehouse")),
+ ("container", apps.get_model("archaeological_warehouse", "Container")),
+ ("find", apps.get_model("archaeological_finds", "Find")),
+ ("findbasket", apps.get_model("archaeological_finds", "FindBasket")),
+ ("treatmentfile", apps.get_model("archaeological_finds", "TreatmentFile")),
+ ("treatment", apps.get_model("archaeological_finds", "Treatment")),
+ ("administrativeact",
+ apps.get_model("archaeological_operations", "AdministrativeAct")),
+ ("administrativeactop",
+ apps.get_model("archaeological_operations", "AdministrativeAct")),
+ ("administrativeactfile",
+ apps.get_model("archaeological_operations", "AdministrativeAct")),
+ ("administrativeacttreatment",
+ apps.get_model("archaeological_operations", "AdministrativeAct")),
+ ("administrativeacttreatmentfile",
+ apps.get_model("archaeological_operations", "AdministrativeAct")),
+ )
+
+
+def get_current_item_keys_dict():
+ return dict(get_current_item_keys())
+
+
+API_APP_CONTENT_TYPES = [
+ ("archaeological_operations", "operation"),
+ ("archaeological_context_records", "contextrecord"),
+ ("archaeological_finds", "find"),
+ ("archaeological_warehouse", "warehouse"),
+ ("archaeological_files", "file"),
+]
+
+API_MAIN_CONTENT_TYPES = API_APP_CONTENT_TYPES + [
+ ("archaeological_operations", "archaeologicalsite"),
+ ("archaeological_warehouse", "container"),
+]
+
+API_MAIN_MODELS = dict(
+ [(model_name, app_name) for app_name, model_name in API_MAIN_CONTENT_TYPES]
+)
+
+
+class HistoryError(Exception):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+
+class SearchAltName(object):
+ def __init__(
+ self, search_key, search_query, extra_query=None, distinct_query=False,
+ related_name=None
+ ):
+ self.search_key = search_key
+ self.search_query = search_query
+ self.extra_query = extra_query or {}
+ self.distinct_query = distinct_query
+ self.related_name = related_name
+
+
+GENERAL_TYPE_PREFIX = {
+ "prefix": "│ ",
+ "prefix_empty": "  ",
+ "prefix_medium": "├ ",
+ "prefix_last": "└ ",
+ "prefix_codes": ["\u2502", "\u251C", "\u2514"]
+}
+
+
class OwnPerms:
"""
Manage special permissions for object's owner
diff --git a/ishtar_common/views.py b/ishtar_common/views.py
index 6c209a848..f01e848a0 100644
--- a/ishtar_common/views.py
+++ b/ishtar_common/views.py
@@ -79,6 +79,8 @@ from ishtar_common.utils_migrations import HOMEPAGE_DEFAULT, HOMEPAGE_TITLE
from ishtar_common.utils import (
clean_session_cache,
CSV_OPTIONS,
+ get_current_item_keys,
+ get_current_item_keys_dict,
get_field_labels_from_path,
get_person_gdpr_log,
get_random_item_image_link,
@@ -92,13 +94,7 @@ from ishtar_common.utils import (
from ishtar_common.widgets import JQueryAutoComplete
from ishtar_common import tasks
-convert_document = None
-if settings.USE_LIBREOFFICE:
- from ishtar_common.libreoffice import convert_document
-
from .views_item import (
- CURRENT_ITEM_KEYS,
- CURRENT_ITEM_KEYS_DICT,
check_permission,
display_item,
get_item,
@@ -108,6 +104,10 @@ from .views_item import (
get_short_html_detail,
)
+convert_document = None
+if settings.USE_LIBREOFFICE:
+ from ishtar_common.libreoffice import convert_document
+
logger = logging.getLogger(__name__)
@@ -700,7 +700,8 @@ def shortcut_menu(request):
def get_current_items(request):
currents = {}
- for key, model in CURRENT_ITEM_KEYS:
+ current_item_keys = get_current_item_keys()
+ for key, model in current_item_keys:
currents[key] = None
if key in request.session and request.session[key]:
try:
@@ -711,7 +712,8 @@ def get_current_items(request):
def unpin(request, item_type, cascade=False):
- if item_type not in CURRENT_ITEM_KEYS_DICT.keys():
+ current_item_keys_dict = get_current_item_keys_dict()
+ if item_type not in current_item_keys_dict.keys():
logger.warning("unpin unknow type: {}".format(item_type))
return HttpResponse("nok")
if "administrativeact" in item_type:
diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py
index 23752c8a9..b5c63cc65 100644
--- a/ishtar_common/views_item.py
+++ b/ishtar_common/views_item.py
@@ -13,6 +13,7 @@ import requests
import subprocess # nosec
from tempfile import NamedTemporaryFile
+from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
@@ -46,6 +47,7 @@ from django.utils.translation import (
deactivate,
pgettext_lazy,
)
+from guardian.models import UserObjectPermission
from tidylib import tidy_document as tidy
from unidecode import unidecode
from weasyprint import HTML, CSS
@@ -54,50 +56,24 @@ from weasyprint.fonts import FontConfiguration
from bootstrap_datepicker.widgets import DateField
from ishtar_common.utils import (
+ API_MAIN_MODELS,
check_model_access_control,
CSV_OPTIONS,
+ GENERAL_TYPE_PREFIX,
get_all_field_names,
- Round,
+ get_current_item_keys_dict,
+ get_current_profile,
+ HistoryError,
PRIVATE_FIELDS,
+ SearchAltName,
+ Round,
)
-from ishtar_common.models import get_current_profile, GeneralType, SearchAltName
-from ishtar_common.models_common import HistoryError
from .menus import Menu
-from . import models, models_rest
-from archaeological_files.models import File
-from archaeological_operations.models import (
- Operation,
- ArchaeologicalSite,
- AdministrativeAct,
-)
-from archaeological_context_records.models import ContextRecord
-from archaeological_finds.models import Find, FindBasket, Treatment, TreatmentFile
-from archaeological_warehouse.models import Warehouse, Container
-
logger = logging.getLogger(__name__)
ENCODING = settings.ENCODING or "utf-8"
-CURRENT_ITEM_KEYS = (
- ("file", File),
- ("operation", Operation),
- ("site", ArchaeologicalSite),
- ("contextrecord", ContextRecord),
- ("warehouse", Warehouse),
- ("container", Container),
- ("find", Find),
- ("findbasket", FindBasket),
- ("treatmentfile", TreatmentFile),
- ("treatment", Treatment),
- ("administrativeact", AdministrativeAct),
- ("administrativeactop", AdministrativeAct),
- ("administrativeactfile", AdministrativeAct),
- ("administrativeacttreatment", AdministrativeAct),
- ("administrativeacttreatmentfile", AdministrativeAct),
-)
-CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS)
-
HIERARCHIC_LEVELS = 5
LIST_FIELDS = { # key: hierarchic depth
@@ -354,11 +330,11 @@ def show_source_item(request, source_id, model, name, base_dct, extra_dct):
source_id, external_id = int(source_id), int(external_id)
except ValueError:
raise Http404()
- models_rest.ApiExternalSource.objects.get()
+ ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource")
# TODO: check permissions
try:
- src = models_rest.ApiExternalSource.objects.get(pk=source_id)
- except models_rest.ApiExternalSource.DoesNotExist:
+ src = ApiExternalSource.objects.get(pk=source_id)
+ except ApiExternalSource.DoesNotExist:
return HttpResponse("{}", content_type="text/plain")
url = src.url
if not url.endswith("/"):
@@ -618,6 +594,8 @@ def _get_values(request, val):
else:
vals = [val]
new_vals = []
+ Organization = apps.get_model("ishtar_common", "Organization")
+ Person = apps.get_model("ishtar_common", "Person")
for v in vals:
if callable(v):
try:
@@ -626,7 +604,7 @@ def _get_values(request, val):
continue
try:
if (
- not isinstance(v, (models.Person, models.Organization))
+ not isinstance(v, (Person, Organization))
and hasattr(v, "url")
and v.url
):
@@ -1144,7 +1122,7 @@ def _manage_dated_fields(dated_fields, dct):
def _clean_type_val(val):
- for prefix in GeneralType.PREFIX_CODES:
+ for prefix in GENERAL_TYPE_PREFIX["prefix_codes"]:
val = val.replace(prefix, "")
val = val.strip()
if val.startswith('"') and val.endswith('"'):
@@ -1210,6 +1188,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
hierarchic_fields = HIERARCHIC_FIELDS[:]
if hasattr(model, "hierarchic_fields"):
hierarchic_fields += model.hierarchic_fields()
+ Town = apps.get_model("ishtar_common", "Town")
for reqs in dct.copy():
if type(reqs) not in (list, tuple):
reqs = [reqs]
@@ -1257,6 +1236,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
and_reqs.append(main_req)
continue
+ Container = apps.get_model("archaeological_warehouse", "Container")
for val in vals:
attr = "cached_label__iexact"
if val.endswith("*"):
@@ -1277,7 +1257,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
vals = [v.replace('"', "") for v in val.split(";")]
town_ids = []
for val in vals:
- q = models.Town.objects.filter(cached_label__iexact=val).values_list(
+ q = Town.objects.filter(cached_label__iexact=val).values_list(
"id", flat=True)
if not q.count():
continue
@@ -1287,7 +1267,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
for rel_query in ("parents__", "children__"):
for idx in range(HIERARCHIC_LEVELS):
k = rel_query * (idx + 1) + "pk"
- q = models.Town.objects.filter(
+ q = Town.objects.filter(
**{k: town_id}).values_list("id", flat=True)
if not q.count():
break
@@ -1595,6 +1575,7 @@ def _manage_default_search(
dct, request, model, default_name, my_base_request, my_relative_session_names
):
pinned_search = ""
+ current_item_keys_dict = get_current_item_keys_dict()
pin_key = "pin-search-" + default_name
base_request = my_base_request if isinstance(my_base_request, dict) else {}
dct = {k: v for k, v in dct.items() if v}
@@ -1606,6 +1587,7 @@ def _manage_default_search(
): # an item is pinned
value = request.session[default_name]
if "basket-" in value:
+ FindBasket = apps.get_model("archaeological_finds", "FindBasket")
try:
dct = {"basket__pk": request.session[default_name].split("-")[-1]}
pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"]))
@@ -1630,9 +1612,9 @@ def _manage_default_search(
name in request.session
and request.session[name]
and "basket-" not in request.session[name]
- and name in CURRENT_ITEM_KEYS_DICT
+ and name in current_item_keys_dict
):
- up_model = CURRENT_ITEM_KEYS_DICT[name]
+ up_model = current_item_keys_dict[name]
try:
dct.update({key: request.session[name]})
up_item = up_model.objects.get(pk=dct[key])
@@ -2031,6 +2013,7 @@ def get_item(
no_link=False,
no_limit=False,
return_query=False,
+ ishtaruser=None, # could be provided when request is None
**dct,
):
available_perms = []
@@ -2044,7 +2027,8 @@ def get_item(
if "json" in data_type:
EMPTY = "[]"
- if data_type not in ("json", "csv", "json-image", "json-map", "json-stats"):
+ if not return_query and data_type not in (
+ "json", "csv", "json-image", "json-map", "json-stats"):
return HttpResponse(EMPTY, content_type="text/plain")
if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2:
@@ -2070,6 +2054,7 @@ def get_item(
own = True
if (
full == "shortcut"
+ and request
and "SHORTCUT_SEARCH" in request.session
and request.session["SHORTCUT_SEARCH"] == "own"
):
@@ -2077,13 +2062,23 @@ def get_item(
query_own = None
if own:
- q = models.IshtarUser.objects.filter(user_ptr=request.user)
- if not q.count():
- return HttpResponse(EMPTY, content_type="text/plain")
+ # TODO: verify alt_query_own
+ """
if alt_query_own:
query_own = getattr(model, alt_query_own)(q.all()[0])
else:
query_own = model.get_query_owns(q.all()[0])
+ print(query_own) # TODO - get old request to transform them
+ """
+ user_pk = request.user.pk if request else ishtaruser.pk
+ q = UserObjectPermission.objects.filter(
+ user_id=user_pk,
+ permission__codename=f"view_own_{model._meta.model_name}",
+ content_type=ContentType.objects.get_for_model(model)
+ )
+ query_own = Q(
+ pk__in=[int(pk) for pk in q.values_list("object_pk", flat=True)]
+ )
query_parameters = {}
@@ -2191,14 +2186,10 @@ def get_item(
request_keys.update(my_extra_request_keys)
# manage search on json fields and excluded fields
- if (
- search_form
- and request
- and request.user
- and getattr(request.user, "ishtaruser", None)
- ):
+ if search_form:
+ ishtaruser = request.user.ishtaruser if request else ishtaruser
available, __, excluded_fields, json_fields = search_form.check_custom_form(
- request.user.ishtaruser
+ ishtaruser
)
# for now no manage on excluded_fields: should we prevent search on
# some fields regarding the user concerned?
@@ -2218,10 +2209,12 @@ def get_item(
if "query" in dct:
request_items = dct["query"]
request_items["submited"] = True
- elif request.method == "POST":
+ elif request and request.method == "POST":
request_items = request.POST
- else:
+ elif request:
request_items = request.GET
+ else:
+ return HttpResponse(EMPTY, content_type="text/plain")
count = dct.get("count", False)
@@ -2271,7 +2264,7 @@ def get_item(
key = "name__icontains"
else:
key = "cached_label__icontains"
- dct[key] = request.GET.get("term", None)
+ dct[key] = (request and request.GET.get("term", None)) or None
try:
old = "old" in request_items and int(request_items["old"])
@@ -2332,11 +2325,12 @@ def get_item(
# manage default and pinned search and not bookmark
if (
not has_a_search
+ and request
and not request_items.get("search_vector", "")
and not request_items.get("submited", "")
and full != "shortcut"
):
- if data_type == "csv" and func_name in request.session:
+ if data_type == "csv" and func_name and func_name in request.session:
dct = request.session[func_name]
else:
# default search
@@ -2451,6 +2445,10 @@ def get_item(
# manage hierarchic in shortcut menu
if full == "shortcut":
+ File = apps.get_model("archaeological_files", "File")
+ Operation = apps.get_model("archaeological_operations", "Operation")
+ ContextRecord = apps.get_model("archaeological_context_records", "ContextRecord")
+ Find = apps.get_model("archaeological_finds", "Find")
ASSOCIATED_ITEMS = {
Operation: (File, "associated_file__pk"),
ContextRecord: (Operation, "operation__pk"),
@@ -2463,6 +2461,7 @@ def get_item(
if current:
dct = {upper_key: current}
query &= Q(**dct)
+ # print("ishtar_common/views_item.py - 2455")
# print(query, distinct_queries, base_query, exc_query, extras)
items = model.objects.filter(query)
for d_q in distinct_queries:
@@ -2540,6 +2539,7 @@ def get_item(
for col in cols:
query_table_cols += col.split("|")
+ Document = apps.get_model("ishtar_common", "Document")
# contextual (full, simple, etc.) col
contxt = full and "full" or "simple"
if (
@@ -2559,7 +2559,7 @@ def get_item(
table_cols.append("cached_label")
if data_type == "json-image":
prefix = ""
- if model != models.Document:
+ if model != Document:
prefix = "main_image__"
query_table_cols.append(prefix + "thumbnail")
table_cols.append(prefix + "thumbnail")
@@ -2584,7 +2584,7 @@ def get_item(
for k in request_items:
if not k.startswith("order["):
continue
- num = int(k.split("]")[0][len("order[") :])
+ num = int(k.split("]")[0][len("order["):])
if num not in sorts:
sorts[num] = ["", ""] # sign, col_num
if k.endswith("[dir]"):
@@ -2907,9 +2907,10 @@ def adapt_distant_search(params, src, model):
search_vector = params["search_vector"][0]
match = RE_FACET.search(search_vector)
final_search_vector = ""
+ ApiKeyMatch = apps.get_model("ishtar_common", "ApiKeyMatch")
while match:
key, value, __ = match.groups()
- q = models_rest.ApiKeyMatch.objects.filter(
+ q = ApiKeyMatch.objects.filter(
source=src,
search_model__model__iexact=model,
search_keys__contains=[key],
@@ -2929,10 +2930,11 @@ def adapt_distant_search(params, src, model):
def get_distant_item(request, model, external_source_id, data_type=None):
- # TODO: check permissions
+ # TODO: verify/test check permissions
+ ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource")
try:
- src = models_rest.ApiExternalSource.objects.get(pk=external_source_id)
- except (models_rest.ApiExternalSource.DoesNotExist, ValueError):
+ src = ApiExternalSource.objects.get(pk=external_source_id)
+ except (ApiExternalSource.DoesNotExist, ValueError):
return HttpResponse("{}", content_type="text/plain")
url = src.url
url += reverse(f"api-search-{model}")
@@ -2954,7 +2956,7 @@ def get_distant_item(request, model, external_source_id, data_type=None):
"submited",
"data_type",
]
- app = models_rest.MAIN_MODELS[model]
+ app = API_MAIN_MODELS[model]
model_class = ContentType.objects.get(app_label=app, model=model).model_class()
bool_fields = model_class.REVERSED_BOOL_FIELDS + model_class.BOOL_FIELDS + model_class.CALLABLE_BOOL_FIELDS
is_empty_params = not any(
@@ -3010,9 +3012,10 @@ def external_export(request, source_id, model_name, slug):
if not url:
return HttpResponse('Unauthorized', status=401)
+ ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource")
try:
- src = models_rest.ApiExternalSource.objects.get(pk=source_id)
- except (models_rest.ApiExternalSource.DoesNotExist, ValueError):
+ src = ApiExternalSource.objects.get(pk=source_id)
+ except (ApiExternalSource.DoesNotExist, ValueError):
return HttpResponse('Unauthorized', status=401)
url = src.url + url