"""Token-authenticated REST views: search, export, facet description, item detail."""
import datetime

import requests
from django.conf import settings
from django.db.models import Q
from django.http import HttpResponse
from django.shortcuts import reverse
# `activate` / `deactivate` are no longer used below but stay importable from
# this module in case other code imports them from here.
from django.utils.translation import activate, deactivate, override
from rest_framework import authentication, permissions, generics
from rest_framework.response import Response
from rest_framework.views import APIView

from ishtar_common import models_rest
from ishtar_common.models_common import GeneralType
from ishtar_common.models_imports import Importer
from ishtar_common.views_item import get_item


def _api_search_models(request, model):
    """Return the ApiSearchModel rows of the requesting API user for ``model``."""
    return models_rest.ApiSearchModel.objects.filter(
        user=request.user.apiuser,
        content_type__app_label=model._meta.app_label,
        content_type__model=model._meta.model_name,
    )


def _first_or_none(queryset):
    """Return the first row of ``queryset`` or None, with a single LIMIT 1 query."""
    for obj in queryset[:1]:
        return obj
    return None


def _set_query_param(request, key, value):
    """
    Overwrite one GET parameter of ``request``.

    The QueryDict is temporarily flagged mutable and always flagged immutable
    again afterwards, even if the assignment raises.
    """
    params = request.GET
    params._mutable = True
    try:
        params[key] = value
    finally:
        params._mutable = False


class IpModelPermission(permissions.BasePermission):
    """
    Allow the request only when the user has an ``apiuser`` and the view's
    search-model query has at least one row whose user IP is the client IP.
    """

    def has_permission(self, request, view):
        if not request.user or not getattr(request.user, "apiuser", None):
            return False
        ip_addr = request.META.get("REMOTE_ADDR")
        if not ip_addr:
            # no known client address: deny instead of failing with a KeyError
            return False
        return view.search_model_query(request).filter(user__ip=ip_addr).exists()


class SearchAPIView(APIView):
    """
    Search endpoint for one model. Subclasses must set ``model``.

    Raises NotImplementedError at instantiation when ``model`` is not set.
    """

    model = None
    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)

    def __init__(self, **kwargs):
        if self.model is None:
            raise NotImplementedError("model attribute is not defined")
        super().__init__(**kwargs)

    def search_model_query(self, request):
        """Return the ApiSearchModel queryset for the current user and ``model``."""
        return _api_search_models(request, self.model)

    def get(self, request, slug=None, format=None, data_type="json"):
        """
        Run the item search and return the response produced by ``get_item``.

        :param slug: optional slug of an export attached to the search model;
            an unknown slug gives a 404 response
        :param data_type: default output type, overridden by a non-empty
            ``data_type`` GET parameter
        """
        search_model = self.search_model_query(request).all()[0]
        # columns come from the named export when a slug is given, otherwise
        # from the model itself
        if slug:
            export = _first_or_none(search_model.export.filter(slug=slug))
            if export is None:
                return HttpResponse('Page not found', status=404)
            table_cols, col_names = export.get_columns()
        else:
            table_cols, col_names = self.model.get_columns(dict_col_labels=False)
        _get_item = get_item(
            self.model,
            "get_" + self.model.SLUG,
            self.model.SLUG,
            no_permission_check=True,
            own_table_cols=table_cols,
        )
        if search_model.limit_query:
            # the configured limit query is always prepended to the user search
            query = search_model.limit_query
            user_query = request.GET.get("search_vector", None)
            if user_query:
                query += " " + user_query
            _set_query_param(request, "search_vector", query)
        if "selected_ids" in request.GET:
            # keep every third dash-separated token:
            # source-1-689-source-1-1853 -> 689-1853
            tokens = request.GET["selected_ids"].split("-")
            _set_query_param(request, "selected_ids", "-".join(tokens[2::3]))
        data_type = request.GET.get("data_type", None) or data_type
        return _get_item(
            request, col_names=col_names, data_type=data_type, type=data_type
        )


class ExportAPIView(SearchAPIView):
    """Same as SearchAPIView with "csv" as the default ``data_type``."""

    def get(self, request, slug=None, format=None, data_type="csv"):
        return super().get(request, slug=slug, format=format, data_type=data_type)


class FacetAPIView(APIView):
    """
    Describe, for each configured model, the search columns, the exports and
    the typed values available to the requesting API user.

    Raises NotImplementedError at instantiation when ``models`` or
    ``select_forms`` is empty or when their lengths differ.
    """

    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)
    # order matters: the first model matches the first select form, and so on
    models = []
    select_forms = []

    def __init__(self, **kwargs):
        if not self.models or not self.select_forms:
            raise NotImplementedError("models or select_forms attribute is not defined")
        if len(self.models) != len(self.select_forms):
            raise NotImplementedError("len of models and select_forms must be equals")
        super().__init__(**kwargs)

    def get(self, request, format=None):
        """
        Return a mapping with a "config" entry plus one "<app_label>.<model>"
        entry for each model the user has a search model for.
        """
        values = {"config": self._get_config(request)}
        user_search_models = models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser
        )
        pairs = zip(self.select_forms, self._get_base_search_model_queries())
        for select_form, (model, base_query) in pairs:
            # only send types matching permissions
            if not user_search_models.filter(base_query).exists():
                continue
            ct_model = f"{model._meta.app_label}.{model._meta.model_name}"
            values[ct_model] = self._get_type_facets(model, select_form)
        # NOTE(review): "json" is not a registered media type
        # ("application/json" is); left unchanged because API clients may rely
        # on the current header - confirm before changing.
        return Response(values, content_type="json")

    def _get_config(self, request):
        """
        Build the "config" entry: "||"-separated search columns, their labels,
        export slugs and export names, each prefixed by "<app_label>-<model>-".
        """
        columns, column_labels, exports, export_labels = [], [], [], []
        for model in self.models:
            search_model = _first_or_none(_api_search_models(request, model))
            if search_model is None:
                continue
            prefix = f"{model._meta.app_label}-{model._meta.model_name}-"
            cols, col_names = model.get_columns(dict_col_labels=False)
            cols = [c for c in cols if c]
            col_names = [c for c in col_names if c]
            if cols and col_names:
                columns.extend(
                    prefix + (col[0] if isinstance(col, list) else col)
                    for col in cols
                )
                column_labels.extend(prefix + col for col in col_names)
            for export in search_model.export.all():
                exports.append(prefix + export.slug)
                export_labels.append(prefix + export.name)
        return {
            "search_columns": "||".join(columns),
            "search_columns_label": "||".join(column_labels),
            "exports": "||".join(exports),
            "exports_label": "||".join(export_labels),
        }

    def _get_type_facets(self, model, select_form):
        """
        Return, for each type searchable through ``select_form``, a list
        ["<app_label>.<model>", search keys (one per language), values] where
        values are (slug or txt_idx, label) pairs of the available items.
        """
        search_types = [
            (search_type.key, search_type.model) for search_type in select_form.TYPES
        ]
        # add GeneralType models attached to form widgets and not declared in TYPES
        for field_name, field in select_form.base_fields.items():
            associated_model = getattr(field.widget, "associated_model", None)
            if (
                associated_model
                and issubclass(associated_model, GeneralType)
                and (field_name, associated_model) not in search_types
            ):
                search_types.append((field_name, associated_model))
        facets = []
        for associated_key, associated_model in search_types:
            values_ct = [
                (item.slug if hasattr(item, "slug") else item.txt_idx, str(item))
                for item in associated_model.objects.filter(available=True)
            ]
            search_key = model.ALT_NAMES[associated_key].search_key
            search_keys = []
            for language_code, __ in settings.LANGUAGES:
                # override() restores the previously active language on exit,
                # so labels rendered afterwards keep the request language
                with override(language_code):
                    search_keys.append(str(search_key).lower())
            facets.append(
                [
                    f"{associated_model._meta.app_label}.{associated_model._meta.model_name}",
                    search_keys,
                    values_ct,
                ]
            )
        return facets

    def _get_base_search_model_queries(self):
        """
        Get list of (model, query) to match content_types
        """
        return [
            (
                model,
                Q(
                    content_type__app_label=model._meta.app_label,
                    content_type__model=model._meta.model_name,
                ),
            )
            for model in self.models
        ]

    def search_model_query(self, request):
        """Return the user's ApiSearchModel rows matching any of ``models``."""
        combined = None
        for __, base_query in self._get_base_search_model_queries():
            combined = base_query if combined is None else combined | base_query
        return models_rest.ApiSearchModel.objects.filter(
            user=request.user.apiuser
        ).filter(combined)


class GetAPIView(generics.RetrieveAPIView):
    """
    Detail endpoint for one model. Subclasses must set ``model``.

    Raises NotImplementedError at instantiation when ``model`` is not set.
    """

    authentication_classes = (authentication.TokenAuthentication,)
    permission_classes = (permissions.IsAuthenticated, IpModelPermission)
    model = None

    def __init__(self, **kwargs):
        if self.model is None:
            raise NotImplementedError("model attribute is not defined")
        super().__init__(**kwargs)

    def search_model_query(self, request):
        """Return the ApiSearchModel queryset for the current user and ``model``."""
        return _api_search_models(request, self.model)

    def get(self, request, *args, **kwargs):
        """
        Return the full serialization of the item whose pk is in the URL, or an
        empty mapping when the pk is not in the query returned by ``get_item``.
        """
        _get_item = get_item(
            self.model,
            "get_" + self.model.SLUG,
            self.model.SLUG,
            no_permission_check=True
            # TODO: own_table_cols=get_table_cols_for_ope() - adapt columns
        )
        search_model = self.search_model_query(request).all()[0]
        if search_model.limit_query:
            # the limit query replaces any search vector sent by the client
            _set_query_param(request, "search_vector", search_model.limit_query)
        q = _get_item(request, data_type="json", return_query=True).filter(
            pk=self.kwargs["pk"]
        )
        obj = _first_or_none(q)
        if obj is None:
            return Response({}, content_type="json")
        return Response(obj.full_serialize(search_model), content_type="json")