summaryrefslogtreecommitdiff
path: root/ishtar_common/rest.py
blob: 90f61f2aabc6ccdcb14779b0591521dd2718ee8a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import json

from django.conf import settings
from django.db.models import Q
from django.utils.translation import activate, deactivate

from rest_framework import authentication, permissions, generics
from rest_framework.response import Response
from rest_framework.views import APIView

from ishtar_common import models_rest
from ishtar_common.views_item import get_item
from ishtar_common.serializers_utils import generic_get_results


class IpModelPermission(permissions.BasePermission):
    def has_permission(self, request, view):
        if not request.user or not getattr(request.user, "apiuser", None):
            return False
        ip_addr = request.META["REMOTE_ADDR"]
        q = view.search_model_query(request).filter(user__ip=ip_addr)
        return bool(q.count())


class SearchAPIView(APIView):
    model = None
    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)

    def __init__(self, **kwargs):
        assert self.model is not None
        super(SearchAPIView, self).__init__(**kwargs)

    def search_model_query(self, request):
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser,
            content_type__app_label=self.model._meta.app_label,
            content_type__model=self.model._meta.model_name,
        )

    def get(self, request, format=None):
        _get_item = get_item(
            self.model,
            "get_" + self.model.SLUG,
            self.model.SLUG,
            no_permission_check=True
            # TODO: own_table_cols=get_table_cols_for_ope() - adapt columns
        )
        search_model = self.search_model_query(request).all()[0]
        if search_model.limit_query:
            query = search_model.limit_query
            if request.GET.get("search_vector", None):
                query += " " + request.GET.get("search_vector")
            request.GET._mutable = True
            request.GET["search_vector"] = query
            request.GET._mutable = False
        data_type = "json"
        if request.GET.get("data_type", None):
            data_type = request.GET.get("data_type")
        response = _get_item(request, data_type=data_type)
        return response


class FacetAPIView(APIView):
    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)
    # keep order for models and select_forms: first model match first select form, ...
    models = []
    select_forms = []

    def __init__(self, **kwargs):
        assert self.models
        assert self.select_forms
        assert len(self.models) == len(self.select_forms)
        super().__init__(**kwargs)

    def get(self, request, format=None):
        values = {}
        base_queries = self._get_base_search_model_queries()
        for idx, select_form in enumerate(self.select_forms):
            # only send types matching permissions
            model, q = base_queries[idx]
            if (
                not models_rest.ApiSearchModel.objects.filter(user=request.user.apiuser)
                .filter(q)
                .count()
            ):
                continue
            ct_model = f"{model._meta.app_label}.{model._meta.model_name}"
            values[ct_model] = []
            for type in select_form.TYPES:
                values_ct = []
                for item in type.model.objects.filter(available=True).all():
                    key = item.slug if hasattr(item, "slug") else item.txt_idx
                    values_ct.append((key, str(item)))
                search_key = model.ALT_NAMES[type.key].search_key
                search_keys = []
                for language_code, language_lbl in settings.LANGUAGES:
                    activate(language_code)
                    search_keys.append(str(search_key).lower())
                    deactivate()
                values[ct_model].append(
                    [
                        f"{type.model._meta.app_label}.{type.model._meta.model_name}",
                        search_keys,
                        values_ct,
                    ]
                )
        return Response(values, content_type="json")

    def _get_base_search_model_queries(self):
        """
        Get list of (model, query) to match content_types
        """
        return [
            (
                model,
                Q(
                    content_type__app_label=model._meta.app_label,
                    content_type__model=model._meta.model_name,
                ),
            )
            for model in self.models
        ]

    def search_model_query(self, request):
        q = None
        for __, base_query in self._get_base_search_model_queries():
            if not q:
                q = base_query
            else:
                q |= base_query
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser
        ).filter(q)


class GetAPIView(generics.RetrieveAPIView):
    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)
    model = None

    def __init__(self, **kwargs):
        assert self.model is not None
        super().__init__(**kwargs)

    def search_model_query(self, request):
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser,
            content_type__app_label=self.model._meta.app_label,
            content_type__model=self.model._meta.model_name,
        )

    def get(self, request, *args, **kwargs):
        _get_item = get_item(
            self.model,
            "get_" + self.model.SLUG,
            self.model.SLUG,
            no_permission_check=True
            # TODO: own_table_cols=get_table_cols_for_ope() - adapt columns
        )
        search_model = self.search_model_query(request).all()[0]
        if search_model.limit_query:
            request.GET._mutable = True
            query = search_model.limit_query
            request.GET["search_vector"] = query
            request.GET._mutable = False
        q = _get_item(request, data_type="json", return_query=True).filter(
            pk=self.kwargs["pk"]
        )
        if not q.count():
            return Response({}, content_type="json")
        obj = q.all()[0]
        result = obj.full_serialize()
        return Response(result, content_type="json")