diff options
-rw-r--r-- | archaeological_operations/tests.py | 108 | ||||
-rw-r--r-- | ishtar_common/views_item.py | 46 |
2 files changed, 108 insertions, 46 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index b13bf99ff..cc0a8cb1b 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -24,6 +24,7 @@ from io import StringIO, BytesIO import tempfile import locale from unittest.mock import patch +from urllib.parse import quote as url_quote import zipfile import requests @@ -35,7 +36,7 @@ from django.core.files import File as DjangoFile from django.core.files.uploadedfile import SimpleUploadedFile from django.db.models import Q from django.test import tag -from django.test.client import Client +from django.test.client import Client, RequestFactory from django.urls import reverse from django.utils.text import slugify @@ -50,6 +51,7 @@ from . import models from ishtar_common import models_rest from ishtar_common.admin import update_types_from_source from ishtar_common.views import document_deletion_steps +from ishtar_common.views_item import get_item, adapt_distant_search from ishtar_common.serializers import document_serialization from archaeological_operations import views, serializers @@ -4442,15 +4444,25 @@ class ApiTest(OperationInitTest, APITestCase): self.create_operation(self.user, self.orgas[0]) self.create_operation(self.user, self.orgas[0]) self.create_operation(self.user, self.orgas[0]) + + keys = ("old-neolithic", "middle-neolithic", "recent-neolithic", + "final-neolithic") + old_neo = models.Period.objects.get(txt_idx='old-neolithic') + middle_neo = models.Period.objects.get(txt_idx='middle-neolithic') + recent_neo = models.Period.objects.get(txt_idx='recent-neolithic') + final_neo = models.Period.objects.get(txt_idx='final-neolithic') self.operation_0 = models.Operation.objects.get(pk=self.operations[0].pk) self.operation_0.code_patriarche = "28123" self.operation_0.save() + self.operation_0.periods.add(old_neo) self.operation_1 = models.Operation.objects.get(pk=self.operations[1].pk) self.operation_1.code_patriarche = "28124" self.operation_1.save() + self.operation_1.periods.add(middle_neo) self.operation_2 = models.Operation.objects.get(pk=self.operations[2].pk) self.operation_2.code_patriarche = "29123" self.operation_2.save() + self.operation_2.periods.add(recent_neo) self.operation_3 = models.Operation.objects.get(pk=self.operations[3].pk) self.operation_3.code_patriarche = "29124" self.operation_3.save() @@ -4755,36 +4767,83 @@ class ApiTest(OperationInitTest, APITestCase): self.assertIn(msg, response) # TODO: mark with no update + def _mock_search(self, mock_get, model, url): + _get_item = get_item( + model, + "get_" + model.SLUG, + model.SLUG, + no_permission_check=True + ) + mock_get.__call__ = _get_item + + def __json(): + return json.loads(mock_get.return_value.text) + + mock_get.return_value.json = __json + mock_get.return_value.status_code = 200 + + factory = RequestFactory() + request = factory.get(url) + request.user = self.user + mock_get.return_value.text = _get_item(request).content + + def __construct_search(self, source): + params = {"submited": "1", "search_vector": ['periode="Néolithique final"']} + adapt_distant_search(params, source, "operation") + attr = "?" + for idx, k in enumerate(params.keys()): + if idx: + attr += "&" + value = params[k] + if isinstance(value, list): + value = value[0] + attr += f"{k}={url_quote(value)}" + return attr + @patch("requests.get") def test_query_transformation(self, mock_get): - # POV: local + # POV: external # change query terms from a source Ishtar to match distant Ishtar - self._mock_request(mock_get) - self.client.login(username=self.username, password=self.password) + api_search_model = self.create_api_search_model() source = self._get_source() - url = "/admin/{}/{}/".format( - "ishtar_common", models_rest.ApiExternalSource.__name__.lower() + content_type = ContentType.objects.get_for_model(models.Period) + api_key, __ = models_rest.ApiKeyMatch.objects.get_or_create( + source=source, + associated_type=content_type, + search_model=api_search_model.content_type, + search_keys=["period", "periode"], + distant_slug="middle-neolithic", + distant_label="Néolithique moyen", + local_slug="middle-neolithic", + local_label="Néolithique moyen", + ) + + base_url = reverse("search-external", args=["operation", source.pk]) + params = self.__construct_search(source) + url = base_url + params + self._mock_search(mock_get, models.Operation, "/get-operation/" + params) + response = self.client.get( + url, format="json", HTTP_AUTHORIZATION=self.auth_token, ) - params = { - "action": "update_types_from_source", - "_selected_action": [source.pk], - } - self.client.post(url, params, follow=True) + self.assertEqual(response.status_code, 200) + j = json.loads(response.content.decode()) + self.assertEqual(j["recordsTotal"], 0, "error on classic search") + # change a key match - content_type = ContentType.objects.get_for_model(models.Period) - # no republic for local - models_rest.ApiKeyMatch.objects.get( - source=source, associated_type=content_type, - distant_key="republic" - ).delete() - # local have only neolithic period no detail - keys = ("old-neolithic", "middle-neolithic", "recent-neolithic", - "final-neolithic") - models_rest.ApiKeyMatch.objects.filter( - source=source, associated_type=content_type, - distant_key__in=keys).update( - local_slug="neolithic", local_label="Néolithique") + # final neolithic -> middle-neolitic + api_key.local_slug = "final-neolithic" + api_key.local_label = "Néolithique final" + api_key.save() + params = self.__construct_search(source) + url = base_url + params + self._mock_search(mock_get, models.Operation, "/get-operation/" + params) + response = self.client.get( + url, format="json", HTTP_AUTHORIZATION=self.auth_token, + ) + self.assertEqual(response.status_code, 200) + j = json.loads(response.content.decode()) + self.assertEqual(j["recordsTotal"], 1, "error on query transformation") def test_external_source_query(self): # POV: local @@ -4808,4 +4867,3 @@ class ApiTest(OperationInitTest, APITestCase): ) self.assertEqual(response.status_code, 200) j = json.loads(response.content.decode()) - print(j) diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py index 242e57b2a..88f20020b 100644 --- a/ishtar_common/views_item.py +++ b/ishtar_common/views_item.py @@ -1886,7 +1886,7 @@ def get_item( my_base_request, my_relative_session_names, ) - elif func_name and request: + elif func_name and request and hasattr(request, "session"): request.session[func_name] = dct for k in request_keys: @@ -2394,26 +2394,7 @@ def get_item( return func -def get_distant_item( - request, - model, - external_source_id, - data_type=None - ): - # TODO: check permissions - try: - src = models_rest.ApiExternalSource.objects.get( - pk=external_source_id) - except (models_rest.ApiExternalSource.DoesNotExist, ValueError): - return HttpResponse("{}", content_type="text/plain") - url = src.url - url += reverse(f"api-search-{model}") - - params = {k: v for k, v in dict(request.GET).items() if not k.startswith("columns")} - params["submited"] = 1 - params["data_type"] = "json" - if data_type: - params["data_type"] = data_type +def adapt_distant_search(params, src, model): if "search_vector" in params and params["search_vector"]: search_vector = params["search_vector"][0] match = RE_FACET.search(search_vector) @@ -2437,6 +2418,29 @@ def get_distant_item( match = RE_FACET.search(search_vector) final_search_vector += search_vector params["search_vector"] = [final_search_vector] + + +def get_distant_item( + request, + model, + external_source_id, + data_type=None + ): + # TODO: check permissions + try: + src = models_rest.ApiExternalSource.objects.get( + pk=external_source_id) + except (models_rest.ApiExternalSource.DoesNotExist, ValueError): + return HttpResponse("{}", content_type="text/plain") + url = src.url + url += reverse(f"api-search-{model}") + + params = {k: v for k, v in dict(request.GET).items() if not k.startswith("columns")} + params["submited"] = 1 + params["data_type"] = "json" + if data_type: + params["data_type"] = data_type + adapt_distant_search(params, src, model) try: response = requests.get( url, |