"""REST API views: search, facet listing and single item retrieval.

All views authenticate with a token and are restricted by
``IpModelPermission`` to API users that have a matching
``ApiSearchModel`` entry.
"""
import json

from django.conf import settings
from django.db.models import Q
from django.utils.translation import activate, deactivate, override

from rest_framework import authentication, permissions, generics
from rest_framework.response import Response
from rest_framework.views import APIView

from ishtar_common import models_rest
from ishtar_common.models_common import GeneralType
from ishtar_common.views_item import get_item
from ishtar_common.serializers_utils import generic_get_results


def _force_search_vector(request, query):
    """Overwrite the ``search_vector`` GET parameter of ``request``.

    ``request.GET`` is temporarily switched to mutable for the assignment
    and is left immutable afterwards.
    """
    request.GET._mutable = True
    request.GET["search_vector"] = query
    request.GET._mutable = False


class IpModelPermission(permissions.BasePermission):
    """Allow access only when the caller's IP matches the API user's.

    The view is expected to provide ``search_model_query(request)``; the
    permission is granted when at least one row of that query has a
    ``user__ip`` equal to the request's ``REMOTE_ADDR``.
    """

    def has_permission(self, request, view):
        """Return True if the requesting API user is allowed on ``view``."""
        if not request.user or not getattr(request.user, "apiuser", None):
            return False
        ip_addr = request.META.get("REMOTE_ADDR")
        if not ip_addr:
            # No usable client address: deny instead of failing with a
            # KeyError, and never query with ``None`` (which Django would
            # translate into an ``IS NULL`` test).
            return False
        return view.search_model_query(request).filter(user__ip=ip_addr).exists()


class SearchAPIView(APIView):
    """Search endpoint for ``model``; subclasses must set ``model``."""

    model = None
    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)

    def __init__(self, **kwargs):
        assert self.model is not None
        super().__init__(**kwargs)

    def search_model_query(self, request):
        """Return the ``ApiSearchModel`` rows of the API user for ``model``."""
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser,
            content_type__app_label=self.model._meta.app_label,
            content_type__model=self.model._meta.model_name,
        )

    def get(self, request, format=None):
        """Run the search and return the response built by ``get_item``.

        When the matching search model defines a ``limit_query`` it is
        prepended to the ``search_vector`` sent by the client. The output
        format comes from the ``data_type`` GET parameter ("json" when
        absent or empty).
        """
        _get_item = get_item(
            self.model,
            "get_" + self.model.SLUG,
            self.model.SLUG,
            no_permission_check=True
            # TODO: own_table_cols=get_table_cols_for_ope() - adapt columns
        )
        search_model = self.search_model_query(request).all()[0]
        if search_model.limit_query:
            query = search_model.limit_query
            client_vector = request.GET.get("search_vector", None)
            if client_vector:
                query += " " + client_vector
            _force_search_vector(request, query)
        data_type = request.GET.get("data_type", None) or "json"
        return _get_item(request, data_type=data_type)


class FacetAPIView(APIView):
    """List the available type values (facets) for each configured model.

    Subclasses must set ``models`` and ``select_forms`` with the same
    length.
    """

    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)
    # keep order for models and select_forms: first model match first select
    # form, ...
    models = []
    select_forms = []

    def __init__(self, **kwargs):
        assert self.models
        assert self.select_forms
        assert len(self.models) == len(self.select_forms)
        super().__init__(**kwargs)

    def get(self, request, format=None):
        """Return, per permitted model, its type facets.

        The payload maps "app_label.model_name" to a list of
        ``[type "app_label.model_name", search keys, [(key, label), ...]]``
        entries.
        """
        values = {}
        user_search_models = models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser
        )
        base_queries = self._get_base_search_model_queries()
        for (model, q), select_form in zip(base_queries, self.select_forms):
            # only send types matching permissions
            if not user_search_models.filter(q).exists():
                continue
            ct_model = f"{model._meta.app_label}.{model._meta.model_name}"
            facets = values[ct_model] = []
            for associated_key, associated_model in self._get_search_types(
                select_form
            ):
                type_label = (
                    f"{associated_model._meta.app_label}."
                    f"{associated_model._meta.model_name}"
                )
                facets.append(
                    [
                        type_label,
                        self._get_search_keys(
                            model.ALT_NAMES[associated_key].search_key
                        ),
                        self._get_type_values(associated_model),
                    ]
                )
        # NOTE(review): "json" is sent as-is as the content type; it may be
        # meant to be "application/json" - confirm before changing.
        return Response(values, content_type="json")

    @staticmethod
    def _get_search_types(select_form):
        """List (key, model) pairs: declared TYPES then GeneralType fields."""
        search_types = [
            (search_type.key, search_type.model)
            for search_type in select_form.TYPES
        ]
        for key, field in select_form.base_fields.items():
            associated_model = getattr(field.widget, "associated_model", None)
            if (
                associated_model
                and issubclass(associated_model, GeneralType)
                and (key, associated_model) not in search_types
            ):
                search_types.append((key, associated_model))
        return search_types

    @staticmethod
    def _get_type_values(associated_model):
        """List (slug or txt_idx, label) for every available item."""
        return [
            (item.slug if hasattr(item, "slug") else item.txt_idx, str(item))
            for item in associated_model.objects.filter(available=True).all()
        ]

    @staticmethod
    def _get_search_keys(search_key):
        """Render ``search_key`` lower-cased in each of settings.LANGUAGES."""
        search_keys = []
        for language_code, __ in settings.LANGUAGES:
            # override() restores the previously active language on exit, so
            # the labels rendered afterwards keep the request's language.
            with override(language_code):
                search_keys.append(str(search_key).lower())
        return search_keys

    def _get_base_search_model_queries(self):
        """
        Get list of (model, query) to match content_types
        """
        return [
            (
                model,
                Q(
                    content_type__app_label=model._meta.app_label,
                    content_type__model=model._meta.model_name,
                ),
            )
            for model in self.models
        ]

    def search_model_query(self, request):
        """Return the API user's ``ApiSearchModel`` rows for any of ``models``."""
        q = None
        for __, base_query in self._get_base_search_model_queries():
            q = base_query if q is None else q | base_query
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser
        ).filter(q)


class GetAPIView(generics.RetrieveAPIView):
    """Retrieve one serialized item of ``model``; subclasses set ``model``."""

    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)
    model = None

    def __init__(self, **kwargs):
        assert self.model is not None
        super().__init__(**kwargs)

    def search_model_query(self, request):
        """Return the ``ApiSearchModel`` rows of the API user for ``model``."""
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser,
            content_type__app_label=self.model._meta.app_label,
            content_type__model=self.model._meta.model_name,
        )

    def get(self, request, *args, **kwargs):
        """Return the full serialization of the item with the URL ``pk``.

        The item is looked up inside the query built by ``get_item`` (with
        the search model's ``limit_query`` forced as ``search_vector`` when
        set); an empty object is returned when nothing matches.
        """
        _get_item = get_item(
            self.model,
            "get_" + self.model.SLUG,
            self.model.SLUG,
            no_permission_check=True
            # TODO: own_table_cols=get_table_cols_for_ope() - adapt columns
        )
        search_model = self.search_model_query(request).all()[0]
        if search_model.limit_query:
            _force_search_vector(request, search_model.limit_query)
        q = _get_item(request, data_type="json", return_query=True).filter(
            pk=self.kwargs["pk"]
        )
        try:
            # single query instead of a count() followed by a fetch
            obj = q[0]
        except IndexError:
            return Response({}, content_type="json")
        result = obj.full_serialize(search_model)
        return Response(result, content_type="json")