#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2018-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. from collections import OrderedDict from copy import copy, deepcopy import csv import datetime import json import logging import re import requests # nosec: no user input used import subprocess # nosec from tempfile import NamedTemporaryFile from django.apps import apps from django.conf import settings from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import GEOSException from django.templatetags.static import static from django.core.cache import cache from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist, PermissionDenied from django.db.models import ( F, Q, Count, Sum, ImageField, ExpressionWrapper, FloatField, FileField, ) from django.db.models.functions import ExtractYear from django.db.utils import ProgrammingError from django import forms from django.forms.models import model_to_dict from django.http import HttpResponse, Http404 from django.shortcuts import render from django.template import loader from django.urls import reverse, NoReverseMatch from django.utils import timezone from django.utils.translation import ( activate, deactivate, get_language, pgettext_lazy, gettext, gettext_lazy as _, ) from guardian.models import UserObjectPermission from tidylib import tidy_document as tidy from unidecode import unidecode from bootstrap_datepicker.widgets import DateField from ishtar_common.urls_converters import DateTimeConverter from ishtar_common.utils import ( API_MAIN_MODELS, check_model_access_control, CSV_OPTIONS, GENERAL_TYPE_PREFIX, get_all_field_names, get_current_item_keys_dict, get_current_profile, HistoryError, PRIVATE_FIELDS, SearchAltName, Round, ) from .menus import Menu logger = logging.getLogger(__name__) ENCODING = settings.ENCODING or "utf-8" HIERARCHIC_LEVELS = 5 LIST_FIELDS = { # key: hierarchic depth "conservatory_states": HIERARCHIC_LEVELS, "identifications": HIERARCHIC_LEVELS, "material_types": HIERARCHIC_LEVELS, "material_type": HIERARCHIC_LEVELS, "object_types": HIERARCHIC_LEVELS, "period": HIERARCHIC_LEVELS, "periods": HIERARCHIC_LEVELS, "source_type": HIERARCHIC_LEVELS, "unit": HIERARCHIC_LEVELS, "museum_collection_entry_mode": HIERARCHIC_LEVELS, "shooting_angle": HIERARCHIC_LEVELS, "technical_processes": HIERARCHIC_LEVELS, "structures": HIERARCHIC_LEVELS, "textures": HIERARCHIC_LEVELS, "inclusions": HIERARCHIC_LEVELS, "colors": HIERARCHIC_LEVELS, "development_type": HIERARCHIC_LEVELS, "monitoring_justification": HIERARCHIC_LEVELS, "documentations": HIERARCHIC_LEVELS, "excavation_technics": HIERARCHIC_LEVELS, "treatment_types": HIERARCHIC_LEVELS, "discovery_method": 0, "discovery_status": 0, "current_status": 0, "nature_of_site": 0, "interpretation_level": 0, "museum_inventory_marking_presence": 0, "museum_marking_type": 0, "museum_collection": 0, "batch": 0, "preservation_to_considers": 0, "integrities": 0, "remarkabilities": 0, "checked_type": 0, "material_type_quality": 0, "object_type_quality": 0, "communicabilities": 0, "alterations": 0, "alteration_causes": 0, "treatment_emergency": 0, "cultural_attributions": 0, "remains": 0, "dating_type": 0, "quality": 0, "operation_type": 0, "report_processing": 0, "record_quality_type": 0, "data_type": 0, "origin": 0, "provider": 0, "activity": 0, "person_types": 0, "relation_types": 0, "types": HIERARCHIC_LEVELS, # keep it at the end to not mess with other types } HIERARCHIC_FIELDS = list(LIST_FIELDS.keys()) def get_autocomplete_query(request, app, model_name): ishtaruser = getattr(request.user, "ishtaruser", None) if not ishtaruser or not request.GET.get("term"): return if ishtaruser.has_permission(f"{app}.view_{model_name}"): return Q() if not ishtaruser.has_permission(f"{app}.view_own_{model_name}"): return permission_id = Permission.objects.get(codename=f"view_own_{model_name}").id object_ids = [ int(pk) for pk in UserObjectPermission.objects.filter( permission_id=permission_id, user_id=request.user.id ).values_list("object_pk", flat=True) ] return Q(pk__in=object_ids) def get_autocomplete_queries(request, label_attributes, extra=None): if not label_attributes: return [Q(pk__isnull=True)] base_q = request.GET.get("term") or "" queries = [] splited_q = base_q.split(" ") for value_prefix, query_suffix, query_endswith in ( ("", "__startswith", True), # starts with (" ", "__icontains", True), # contain a word which starts with ("", "__endswith", False), # ends with ("", "__icontains", False), ): # contains alt_queries = [None] if len(splited_q) == 1 and query_endswith: alt_queries = ["__endswith", None] for alt_query in alt_queries: query = Q() if extra: query = Q(**extra) for q in splited_q: if not q: continue sub_q = Q(**{label_attributes[0] + query_suffix: value_prefix + q}) if alt_query: sub_q &= Q(**{label_attributes[0] + alt_query: q}) for other_label in label_attributes[1:]: sub_q = sub_q | Q(**{other_label + query_suffix: value_prefix + q}) query = query & sub_q queries.append(query) return queries def get_autocomplete_item(model, extra=None): if not extra: extra = {} def func(request, current_right=None, limit=20): meta = model._meta model_name = meta.model_name.lower() if model_name == "basefind": model_name = "find" base_query = get_autocomplete_query(request, meta.app_label, model_name) if base_query is None: return HttpResponse(content_type="text/plain") result = OrderedDict() base_query = model.objects.filter(base_query) for query in get_autocomplete_queries(request, ["cached_label"], extra=extra): objects = base_query.filter(query).values("cached_label", "id")[:limit] for obj in objects: if obj["id"] not in list(result.keys()): result[obj["id"]] = obj["cached_label"] limit -= 1 if not limit: break if not limit: break data = json.dumps( [{"id": obj[0], "value": obj[1]} for obj in list(result.items())] ) return HttpResponse(data, content_type="text/plain") return func def check_permission(request, action_slug, obj=None): if not request.user.ishtaruser: return False main_menu = Menu(request.user) main_menu.init() if action_slug not in main_menu.items: # not an action -> a classic permission if "." not in action_slug: return False if request.user.ishtaruser.has_permission(action_slug): return True if not obj: return False app, model_name = action_slug.split(".") parts = model_name.split("_") action_slug = f"{app}.{parts[0]}_own_{'_'.join(parts[1:])}" return request.user.ishtaruser.has_permission(action_slug, obj) if obj: return main_menu.items[action_slug].is_available( request.user, obj ) return main_menu.items[action_slug].can_be_available(request.user) def new_qa_item( model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="", callback=None ): def func(request, parent_name=None, limits=""): not_permitted_msg = gettext("Operation not permitted.") meta = model._meta permission = f"{meta.app_label}.add_{meta.model_name}" if not check_permission(request, permission): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": slug = "archaeologicalsite" url_slug = "new-" + slug current_page_name = page_name[:] model_name = model._meta.object_name if not current_page_name: current_page_name = _("New %s" % model_name.lower()) dct = { "page_name": str(current_page_name), "url": reverse(url_slug, args=[parent_name]), "slug": slug, "parent_name": parent_name, "many": many, } if request.method == "POST": dct["form"] = frm(request.POST, limits=limits) if dct["form"].is_valid(): new_item = dct["form"].save(request.user) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() dct["new_item_label"] = lbl dct["new_item_pk"] = new_item.pk dct["parent_pk"] = parent_name if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) if callback: callback("new_qa_item", request, None, model.objects.filter(pk=new_item.pk)) return render(request, template, dct) else: dct["form"] = frm(limits=limits) return render(request, template, dct) return func def get_short_html_detail(model): def func(request, pk): not_permitted_msg = gettext("Operation not permitted.") try: item = model.objects.get(pk=pk) except model.DoesNotExist: return HttpResponse(not_permitted_msg) meta = model._meta permission = f"{meta.app_label}.view_{meta.model_name}" if not check_permission(request, permission, item): return HttpResponse(not_permitted_msg) html = item.get_short_html_detail() return HttpResponse(html) return func def modify_qa_item(model, frm, callback=None): def func(request, parent_name="", pk=None): template = "ishtar/forms/qa_new_item.html" model_name = model._meta.object_name not_permitted_msg = gettext("Operation not permitted.") try: item = model.objects.get(pk=pk) except model.DoesNotExist: return HttpResponse(not_permitted_msg) meta = model._meta permission = f"{meta.app_label}.change_{meta.model_name}" if not check_permission(request, permission, item): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": slug = "archaeologicalsite" url_slug = "modify-" + slug dct = { "page_name": str(_("Modify a %s" % model_name.lower())), "url": reverse(url_slug, args=[parent_name, pk]), "slug": slug, "modify": True, "parent_name": parent_name, } if request.method == "POST": dct["form"] = frm(request.POST) if dct["form"].is_valid(): new_item = dct["form"].save(request.user, item) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() dct["new_item_label"] = lbl dct["new_item_pk"] = new_item.pk dct["parent_pk"] = parent_name if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) if callback: callback("modify_qa_item", request, None, model.objects.filter(pk=new_item.pk)) return render(request, template, dct) else: data = model_to_dict(item) for k in list(data.keys()): if data[k] and isinstance(data[k], list) and hasattr(data[k][0], "pk"): data[k] = [i.pk for i in data[k]] dct["form"] = frm(initial=data) return render(request, template, dct) return func def display_item(model, extra_dct=None, show_url=None): def func(request, pk, **dct): if show_url: dct["show_url"] = "/{}{}/".format(show_url, pk) else: dct["show_url"] = "/show-{}/{}/".format(model.SLUG, pk) return render(request, "ishtar/display_item.html", dct) return func def show_source_item(request, source_id, model, name, base_dct, extra_dct): try: __, source_id, external_id = source_id.split("-") source_id, external_id = int(source_id), int(external_id) except ValueError: raise Http404() ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource") # TODO: check permissions try: src = ApiExternalSource.objects.get(pk=source_id) except ApiExternalSource.DoesNotExist: return HttpResponse("{}", content_type="text/plain") url = src.url if not url.endswith("/"): url += "/" url += f"api/get/{model.SLUG}/{external_id}/" try: response = requests.get( url, timeout=20, headers={"Authorization": f"Token {src.key}"}, ) except requests.exceptions.Timeout: return HttpResponse("{}", content_type="text/plain") item = response.json() dct = deepcopy(base_dct) if not item: return HttpResponse("{}", content_type="text/plain") item["is_external"] = True item["current_source"] = src.name dct["item"], dct["item_name"] = item, name dct["is_external"] = True dct["SHOW_GEO"] = False if extra_dct: dct.update(extra_dct(request, dct)) for perm in Permission.objects.filter( codename__startswith='view_').values_list("codename", flat=True).all(): dct["permission_" + perm] = False permissions = ["permission_view_document"] for p in permissions: dct[p] = True for perm in ["document", "findbasket"]: dct[f"permission_change_own_{perm}"] = False dct[f"permission_change_{perm}"] = False tpl = loader.get_template(f"ishtar/sheet_{name}_window.html") content = tpl.render(dct, request) return HttpResponse(content, content_type="application/xhtml") def filter_sheet(ishtar_user, item): if not ishtar_user or ishtar_user.is_ishtaradmin: return item profile = ishtar_user.current_profile if not profile: return item exclude, keys = profile.profile_type.get_filters(item.__class__) if exclude is None: return item base_keys = [ "SLUG", "APP", "MODEL", "DELETE_URL" "HAS_QR_CODE", "id", "pk", "app_label", "model_name", "locked", "is_locked", "get_absolute_url", "get_extra_actions", "get_extra_templates", "can_edit", "can_delete" ] base_keys += getattr(item, "SHEET_BASE_KEYS", []) empty_keys = getattr(item, "SHEET_EMPTY_KEYS", []) if exclude: # cannot exclude base keys len_keys = len(keys) for idx, key in enumerate(reversed(keys)): if key in base_keys: keys.pop(len_keys - 1 - idx) else: keys += base_keys if exclude: for key in keys: param = getattr(item, key, None) if hasattr(param, "count"): # set a key_not_available variable in order to filter on the # sheet non setable variable such as queryset or properties setattr(item, key + "_not_available", True) try: setattr(item, key, None) except (TypeError, AttributeError): pass return item new_item = type("BaseObject", (object,), {}) for empty_key in empty_keys: setattr(new_item, empty_key, None) for key in keys: setattr(new_item, key, getattr(item, key, None)) return new_item PUNCTUATION = { "fr": { "colon": " :", "exclamation": " !", "question": " ?", }, "en": { "colon": ":", "exclamation": "!", "question": "?", }, } def show_item(model, name, extra_dct=None, model_for_perms=None, callback=None): def func(request, pk, **dct): check_model = model if model_for_perms: check_model = model_for_perms allowed, own = check_model_access_control(request, check_model) if not allowed: raise PermissionDenied() q = model.objects doc_type = "type" in dct and dct.pop("type") try: url = reverse("show-" + name, args=["0", ""]) except NoReverseMatch: url = reverse("show-" + name, args=[0]) url_name = ( "/".join(url.split("/")[:-2]) + "/" ) profile = get_current_profile() sheet_name = name for profile_key, alt_name in model.SHEET_ALTERNATIVES: if getattr(profile, profile_key): sheet_name = alt_name break dct["PROFILE"] = profile dct["CURRENCY"] = profile.currency dct["ENCODING"] = settings.ENCODING dct["DOT_GENERATION"] = settings.DOT_BINARY and profile.relation_graph dct["SHOW_GEO"] = profile.mapping current_language = get_language() if current_language not in PUNCTUATION: current_language = "en" dct["PUNCTUATION"] = PUNCTUATION[current_language] dct["model_slug"] = model.SLUG dct["current_window_url"] = url_name date = None if "date" in dct: date = dct.pop("date") dct["sheet_id"] = f"{name}-{pk}" dct["window_id"] = f"{name}-{pk}-{datetime.datetime.now().strftime('%M%s')}" dct["window_id_underscore"] = dct["window_id"].replace("-", "_") if str(pk).startswith("source-"): return show_source_item(request, pk, model, name, dct, extra_dct) q = q.filter(pk=pk) if not q.count(): raise PermissionDenied() item = q.all()[0] if own: meta = model._meta if not request.user.has_perm( f"{meta.app_label}.view_own_{meta.model_name}", item): raise PermissionDenied() if callback: callback("show_item", request, doc_type, q) # list current perms for perm in Permission.objects.filter( codename__startswith='view_').values_list("codename", flat=True).all(): dct["permission_" + perm] = False for perm in ["document", "findbasket"]: dct[f"permission_change_own_{perm}"] = False dct[f"permission_change_{perm}"] = False if hasattr(request.user, "ishtaruser") and request.user.ishtaruser: cache_key = "{}-{}-{}".format( settings.PROJECT_SLUG, "current-perms", request.session.session_key, ) permissions = cache.get(cache_key) if permissions is None: permissions = [] profile = request.user.ishtaruser.person.current_profile if profile: for group in profile.profile_type.groups.all(): for permission in group.permissions.all(): permissions.append(permission.codename) cache.set(cache_key, permissions, settings.CACHE_TIMEOUT) for perm in permissions: dct["permission_" + perm] = True if hasattr(item, "get_imports"): dct["get_import_list"] = item.get_imports(request.user, limit=5) dct["get_import_updated"] = item.get_imports_updated(request.user, limit=5) if hasattr(item, "history") and request.user.is_superuser: if date: try: if not isinstance(date, datetime.datetime): date = datetime.datetime.strptime( date, DateTimeConverter.date_format) dct["IS_HISTORY"] = True if item.get_last_history_date() != date: item = item.get_previous(date=date) if item is None: raise ValueError("No previous history entry.") dct["previous"] = item._previous dct["next"] = item._next else: date = None except ValueError: return HttpResponse("", content_type="text/plain") if not date: historized = item.history.all() if historized: item.history_date = historized[0].history_date if len(historized) > 1: dct["previous"] = historized[1].history_date if ( doc_type in ("odt", "pdf") and hasattr(item, "qrcode") and (not item.qrcode or not item.qrcode.name) ): item.generate_qrcode(request=request) ishtaruser = hasattr(request.user, "ishtaruser") and request.user.ishtaruser dct["item"], dct["item_name"] = filter_sheet(ishtaruser, item), name # add context if extra_dct: dct.update(extra_dct(request, item)) context_instance = deepcopy(dct) context_instance["output"] = "html" if hasattr(item, "associated_filename"): if hasattr(item, "history_object"): filename = item.history_object.associated_filename else: filename = item.associated_filename if doc_type == "odt" and settings.ODT_TEMPLATE: tpl = loader.get_template(f"ishtar/sheet_{sheet_name}.html") context_instance["output"] = "ODT" content = tpl.render(context_instance, request) tidy_options = { "output-xhtml": 1, "indent": 1, "tidy-mark": 0, "doctype": "auto", "add-xml-decl": 1, "wrap": 1, } html, errors = tidy(content, options=tidy_options) html = html.replace(" ", " ") html = re.sub("]*)>\n", "", html) odt = NamedTemporaryFile() html_source = NamedTemporaryFile() with open(html_source.name, "w") as html_file: html_file.write(html) pandoc_args = [ "pandoc", "-f", "html", "-t", "odt", "-o", odt.name, html_source.name, ] try: # nosec: no user input subprocess.check_call( # nosec pandoc_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) except subprocess.CalledProcessError: return HttpResponse(content, content_type="application/xhtml") response = HttpResponse( content_type="application/vnd.oasis.opendocument.text" ) response["Content-Disposition"] = "attachment; filename={}.odt".format( filename ) with open(odt.name, "rb") as odt_file: response.write(odt_file.read()) return response tpl = loader.get_template(f"ishtar/sheet_{sheet_name}_window.html") content = tpl.render(context_instance, request) return HttpResponse(content, content_type="application/xhtml") return func def revert_item(model): def func(request, pk, date, **dct): try: item = model.objects.get(pk=pk) if not isinstance(date, datetime.datetime): date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") item.rollback(date) except (ObjectDoesNotExist, ValueError, HistoryError): return HttpResponse(None, content_type="text/plain") return HttpResponse("True", content_type="text/plain") return func def _get_values(request, val): if hasattr(val, "all"): # manage related objects vals = list(val.all()) else: vals = [val] new_vals = [] Organization = apps.get_model("ishtar_common", "Organization") Person = apps.get_model("ishtar_common", "Person") for v in vals: if callable(v): try: v = v() except TypeError: continue try: if ( not isinstance(v, (Person, Organization)) and hasattr(v, "url") and v.url ): v = ( (request.is_secure() and "https" or "http") + "://" + request.get_host() + v.url ) except ValueError: pass new_vals.append(v) return new_vals def _push_to_list(obj, current_group, depth): """ parse_parentheses helper function """ try: while depth > 0: current_group = current_group[-1] depth -= 1 except IndexError: # tolerant to parentheses mismatch pass if ( current_group and type(obj) in (str, str) and type(current_group[-1]) in (str, str) ): current_group[-1] += obj else: current_group.append(obj) true_strings = ["1", "true"] for language_code, language_lbl in settings.LANGUAGES: activate(language_code) true_strings.append(str(_("Yes")).lower()) true_strings.append(str(_("True")).lower()) deactivate() def is_true_string(val): val = str(val).lower().replace('"', "") if val in true_strings: return True def _parse_parentheses(s): """ Parse parentheses into list. (OA01 & (pierre | ciseau)) -> ["0A01 &", ["pierre | ciseau"]] """ groups = [] depth = 0 inside_quote = False for char in s: if char == '"': inside_quote = not inside_quote if not inside_quote: if char == "(": _push_to_list([], groups, depth) depth += 1 elif char == ")": if depth > 0: depth -= 1 else: _push_to_list(char, groups, depth) continue _push_to_list(char, groups, depth) # for non tolerant to parentheses mismatch check depth is equal to 0 return groups FORBIDDEN_CHAR = [":"] RESERVED_CHAR = ["|", "&"] RE_FACET = re.compile('([-a-zA-Z]+)="([^"]+)"(?:;"([^"]+)")*') def _parse_query_string( string, query_parameters, current_dct, exc_dct, extra_distinct_q ): string = string.strip().lower() match = RE_FACET.search(string) if match or "=" in string: base_term, queries = "", [] if match: for idx, gp in enumerate(match.groups()): if not idx: base_term = gp elif gp: queries.append('"{}"'.format(gp)) else: splited = string.split("=") if len(splited) == 2: base_term = splited[0] queries.append(splited[1]) if queries: excluded = base_term.startswith("-") if excluded: base_term = base_term[1:] if base_term in query_parameters: dct = current_dct term = query_parameters[base_term].search_query for query in queries: # callable request key for complex queries if callable(term): # !! distinct_queries not managed is_true = is_true_string(query) if excluded: is_true = not is_true cfltr, cexclude, cextra = term(is_true=is_true) if cfltr: if "and_reqs" not in dct: dct["and_reqs"] = [] dct["and_reqs"].append(cfltr) if cexclude: if "exc_and_reqs" not in dct: dct["exc_and_reqs"] = [] dct["exc_and_reqs"].append(cexclude) if cextra: dct["extras"].append(cextra) else: if query_parameters[base_term].distinct_query: extra_distinct_q.append({}) dct = extra_distinct_q[-1] if not query_parameters[base_term].distinct_query and excluded: dct = exc_dct if query_parameters[base_term].extra_query: dct.update(query_parameters[base_term].extra_query) if query_parameters[base_term].related_name and query == '"*"': term = query_parameters[base_term].related_name if term in dct: dct[term] += ";" + query else: dct[term] = query if query_parameters[base_term].distinct_query: for k in dct: # clean " dct[k] = dct[k].replace('"', "") # distinct query wait for a query _manage_clean_search_field(dct) extra_distinct_q[-1] = ~Q(**dct) if excluded else Q(**dct) return "" for reserved_char in FORBIDDEN_CHAR: string = string.replace(reserved_char, "") if len(string) != 1: for reserved_char in RESERVED_CHAR: string = string.replace(reserved_char, "") if not string: return "" if string.endswith("*"): if len(string.strip()) == 1: return "" string = string[:-1] + ":*" elif string not in ("&", "|", "!", "-"): # like search by default string = string + ":*" if string.startswith("-"): if len(string.strip()) == 1: return "" string = "!" + string[1:] return string def _parse_parentheses_groups( groups, query_parameters, current_dct=None, exc_dct=None, extra_distinct_q=None ): """ Transform parentheses groups to query :param groups: groups to transform (list) :param query_parameters: query keys for facet search :param current_dct: query dict :param exc_dct: exclude query dict :param exc_dct: exclude query dict :return: query string, query dict, excluded query dict """ if not current_dct: current_dct = {} if not exc_dct: exc_dct = {} if not extra_distinct_q: extra_distinct_q = [] if not isinstance(groups, list): string = groups.strip() if string.startswith('"') and string.endswith('"') and string.count('"') == 2: string = string[1:-1] # split into many groups if spaces # do not split inside quotes current_index = 0 found = string.find('"', current_index) SEP = "?ç;?" # replace spaces inside quote with this characters previous_quote = None while found != -1: if previous_quote is not None: string = ( string[0:previous_quote] + string[previous_quote:found].replace(" ", SEP) + string[found:] ) previous_quote = None # SEP is larger than a space found = string.find('"', current_index) else: previous_quote = found current_index = found + 1 found = string.find('"', current_index) string_groups = [gp.replace(SEP, " ") for gp in string.split(" ")] if len(string_groups) == 1: return ( _parse_query_string( string_groups[0], query_parameters, current_dct, exc_dct, extra_distinct_q, ), current_dct, exc_dct, extra_distinct_q, ) return _parse_parentheses_groups( string_groups, query_parameters, current_dct, exc_dct, extra_distinct_q ) if not groups: # empty list return "", current_dct, exc_dct, extra_distinct_q query = "(" previous_sep, has_item = None, False for item in groups: q, current_dct, exc_dct, extra_distinct_q = _parse_parentheses_groups( item, query_parameters, current_dct, exc_dct, extra_distinct_q ) q = q.strip() if not q: continue if q in ("|", "&"): if previous_sep or not has_item: continue # multiple sep is not relevant previous_sep = q continue if has_item: query += previous_sep or " & " query += q has_item = True previous_sep = None query += ")" if query == "()": query = "" unaccent_query = unidecode(query) if unaccent_query != query: query = f"({query} | {unaccent_query})" return query, current_dct, exc_dct, extra_distinct_q def _search_manage_search_vector( model, dct, exc_dct, distinct_queries, query_parameters ): if ( "search_vector" not in dct or not model._meta.managed ): # is a view - no search_vector return dct, exc_dct, distinct_queries search_vector = dct["search_vector"] parentheses_groups = _parse_parentheses(search_vector) ( search_query, extra_dct, extra_exc_dct, extra_distinct_q, ) = _parse_parentheses_groups(parentheses_groups, query_parameters) dct.update(extra_dct) distinct_queries += extra_distinct_q exc_dct.update(extra_exc_dct) if not search_query: return dct, exc_dct, distinct_queries # remove inside parenthesis search_query = search_query.replace("(", "").replace(")", "").strip() if search_query: if "extras" not in dct: dct["extras"] = [] dct["extras"].append( { "where": [ f"{model._meta.db_table}.search_vector @@ (to_tsquery(%s, %s)) = true OR " + f"{model._meta.db_table}.search_vector @@ (to_tsquery('simple', %s)) = true OR " f"{model._meta.db_table}.search_vector::tsvector @@ %s::tsquery = true" ], "params": [settings.ISHTAR_SEARCH_LANGUAGE, search_query, search_query, search_query], } ) return dct, exc_dct, distinct_queries def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs): bool_fields = list(bool_fields) + list(reversed_bool_fields) for k in bool_fields: if k not in dct: continue elif dct[k] == "1" or dct[k] == "unknown": dct.pop(k) continue dct[k] = dct[k].replace('"', "") if dct[k] in ["2", "yes", str(_("Yes")).lower(), "True"]: dct[k] = True else: dct[k] = False if k in reversed_bool_fields: dct[k] = not dct[k] # check also for empty value with image field field_names = k.split("__") # TODO: can be improved in later version of Django try: c_field = model._meta.get_field(field_names[0]) for field_name in field_names[1:-1]: if not hasattr(c_field, "related_model") or not c_field.related_model: return c_field = c_field.related_model._meta.get_field(field_name) if k.endswith("__isnull") and ( isinstance(c_field, (ImageField, FileField)) or field_names[-2] == "associated_url" ): key = "__".join(k.split("__")[:-1]) if dct[k]: or_reqs.append((k, {key + "__exact": ""})) else: dct[key + "__regex"] = ".{1}.*" except FieldDoesNotExist: pass def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct): bool_fields = list(fields or []) + list(reversed_fields or []) for k in bool_fields: if k not in dct: continue elif dct[k] == "1" or dct[k] == "unknown": dct.pop(k) continue dct[k] = dct[k].replace('"', "") dct[k] = True if dct[k] in ["2", "yes", str(_("Yes")).lower()] else None if reversed_fields and k in reversed_fields: dct[k] = True if not dct[k] else None if dct[k]: dct[k] = False else: dct.pop(k) excluded_dct[k] = False def __manage_relative_search(value): """ Parse search to manage "=", "=>" and "=<" searches :param value: value :return: value, search type ("eq", "lte" or "gte") """ value = value.replace('"', "").strip() search_type = "eq" if value.startswith(">"): value = value[1:] search_type = "gte" elif value.startswith("<"): value = value[1:] search_type = "lte" return value, search_type def _manage_number(k, dct): """ Manage number from a search query :param k: number key :param dct: search dict :return: None -> search dict is modified """ values = dct[k].split(";") results = [] for value in values: value, search_type = __manage_relative_search(value) try: value = str(float(value.replace(",", "."))) if value.endswith(".0"): value = value[:-2] results.append((search_type, value)) except ValueError: continue mins = [value for dt, value in results if dt == "gte"] maxs = [value for dt, value in results if dt == "lte"] eqs = [value for dt, value in results if dt == "eq"] if eqs and not mins and not maxs: # TODO: min and max not available dct[k] = ";".join(eqs) return dct.pop(k) if mins: dct[k + "__gte"] = min(mins) if maxs: dct[k + "__lte"] = max(maxs) def _manage_number_fields(fields, dct): keys = list(dct.keys()) for key in fields: res = [j for j in keys if j.startswith(key)] if not res: continue for k in res: if not dct[k]: dct.pop(k) continue _manage_number(k, dct) today_lbl = pgettext_lazy("key for text search", "today") TODAYS = ["today"] for language_code, language_lbl in settings.LANGUAGES: activate(language_code) TODAYS.append(str(today_lbl)) deactivate() def _manage_date(k, dct): """ Manage date from a search query :param k: date key :param dct: search dict :return: None -> search dict is modified """ if not isinstance(dct[k], str): return values = dct[k].split(";") results = [] for value in values: # TODO: manage '*/10/2024' searches? if value.replace('"', "").replace("'", "") == "*": dct.pop(k) dct[k + "__isnull"] = False return value, date_type = __manage_relative_search(value) has_today = False for today in TODAYS: if value.startswith(today): base_date = datetime.date.today() value = value[len(today):].replace(" ", "") if value and value[0] in ("-", "+"): sign = value[0] try: days = int(value[1:]) except ValueError: days = 0 if days: if sign == "-": base_date = base_date - datetime.timedelta(days=days) else: base_date = base_date + datetime.timedelta(days=days) value = base_date.strftime("%Y-%m-%d") has_today = True break if has_today: results.append((date_type, value)) continue items = [] if "/" in value: items = list(reversed(value.split("/"))) elif "-" in value: # already date formated items = value.split("-") if len(items) != 3: continue try: results.append((date_type, datetime.datetime(*map(lambda x: int(x), items)).strftime( "%Y-%m-%d" ))) except ValueError: try: # try US format results.append(( date_type, datetime.datetime(*map(lambda x: int(x), reversed(items))).strftime("%Y-%m-%d") )) except ValueError: pass mins = [value for dt, value in results if dt == "gte"] maxs = [value for dt, value in results if dt == "lte"] eqs = [value for dt, value in results if dt == "eq"] if eqs and not mins and not maxs: # TODO: min and max not available when using single dates for datetime_field in BASE_DATETIME_FIELDS: # manage exact date match on datetime field if k.startswith(datetime_field): dct.pop(k) k = k + "__date" break dct[k] = ";".join(eqs) return dct.pop(k) if mins: dct[k + "__gte"] = min(mins) if maxs: dct[k + "__lte"] = max(maxs) def _manage_dated_fields(dated_fields, dct): keys = list(dct.keys()) for key in dated_fields: res = [j for j in keys if j.startswith(key)] if not res: continue for k in res: if k not in dct: continue if not dct[k]: dct.pop(k) continue _manage_date(k, dct) def _clean_type_val(val): for prefix in GENERAL_TYPE_PREFIX["prefix_codes"]: val = val.replace(prefix, "") val = val.strip() if val.startswith('"') and val.endswith('"'): val = '"{}"'.format(val[1:-1].strip()) return val def _manage_facet_search(model, dct, and_reqs): if not hasattr(model, "general_types"): return general_types = model.general_types() hierarchic_fields = HIERARCHIC_FIELDS[:] if hasattr(model, "hierarchic_fields"): hierarchic_fields += model.hierarchic_fields() for base_k in general_types: if base_k in hierarchic_fields: # already managed continue if base_k.endswith("_id"): k = base_k else: k = base_k + "__pk" if k not in dct or not dct[k].startswith('"') or not dct[k].startswith('"'): continue val = _clean_type_val(dct.pop(k)) if '";"' in val: # OR request values = val.split(";") else: values = [val] reqs = None for val in values: if not val.endswith('"') or not val.startswith('"'): continue lbl_name = "__label__" try: rel = getattr(model, base_k).field.related_model if not hasattr(rel, "label") and hasattr(rel, "cached_label"): lbl_name = "__cached_label__" except AttributeError: pass suffix = ( "{}icontains".format(lbl_name) if "%" in val else "{}iexact".format(lbl_name) ) query = val[1:-1].replace("*", "") if not reqs: reqs = Q(**{base_k + suffix: query}) else: reqs |= Q(**{base_k + suffix: query}) if reqs: and_reqs.append(reqs) POST_PROCESS_REQUEST = getattr(model, "POST_PROCESS_REQUEST", None) if not POST_PROCESS_REQUEST: return for k in dct: if k in POST_PROCESS_REQUEST and dct[k]: dct[k] = getattr(model, POST_PROCESS_REQUEST[k])(dct[k].replace('"', "")) def _manage_hierarchic_precise_town(req, dct, and_reqs): val = _clean_type_val(dct.pop(req)).strip('"') if val.startswith('"') and val.endswith('"'): val = val[1:-1] vals = [v.replace('"', "") for v in val.split(";")] town_ids = [] Town = apps.get_model("ishtar_common", "Town") for val in vals: q = Town.objects.filter(cached_label__iexact=val).values_list( "id", flat=True) if not q.count(): continue town_id = q.all()[0] town_ids.append(town_id) for rel_query in ("parents__", "children__"): for idx in range(HIERARCHIC_LEVELS): k = rel_query * (idx + 1) + "pk" q = Town.objects.filter( **{k: town_id}).values_list("id", flat=True) if not q.count(): break town_ids += list(q.all()) main_req = Q(**{req + "__in": town_ids}) and_reqs.append(main_req) def _manage_hierarchic_precise_town_area(req, dct, and_reqs): if req.endswith("pk"): suffix = "pk" elif req.endswith("label__iexact"): suffix = "label__iexact" else: return val = _clean_type_val(dct.pop(req)) if val.startswith('"') and val.endswith('"'): val = val[1:-1] if val == "*": req = req.replace("label__", "").replace("pk", "").replace( "iexact", "" ).replace("precise_town__areas", "precise_town_id") # * -> is attached to a town req += "isnull" reqs = Q(**{req: False}) and_reqs.append(reqs) return elif "*" in val and "iexact" in suffix: suffix = suffix.replace("iexact", "icontains") req = req.replace("iexact", "icontains") val = val.replace("*", "") Town = apps.get_model("ishtar_common", "Town") Area = apps.get_model("ishtar_common", "Area") q_area = Area.objects.filter(**{suffix: val}) q_towns = Town.objects.filter(areas__pk__in=q_area.values_list("pk", flat=True)) req = req.replace("precise_town__areas__label__iexact", "precise_town_id__in") val_towns = q_towns.values_list("pk", flat=True) reqs = Q(**{req: val_towns}) for idx in range(HIERARCHIC_LEVELS): suffix = "parent__" + suffix q_area = Area.objects.filter(**{suffix: val}) q_towns = Town.objects.filter(areas__pk__in=q_area.values_list("pk", flat=True)) if q_towns.count(): val_towns = q_towns.values_list("pk", flat=True) q = Q(**{req: val_towns}) reqs |= q and_reqs.append(reqs) def _manage_hierarchic_area(base_req, dct, and_reqs): if base_req.endswith("pk"): suffix = "pk" elif base_req.endswith("label__iexact"): suffix = "label__iexact" else: return vals = _clean_type_val(dct.pop(base_req)) query_list = [] for val in vals.split(";"): req = base_req[:] if val.startswith('"'): val = val[1:] if val.endswith('"'): val = val[:-1] if val == "*": req = req.replace("label__", "").replace("pk", "").replace("iexact", "") req += "isnull" reqs = Q(**{req: False}) and_reqs.append(reqs) return elif "*" in val and "iexact" in suffix: suffix = suffix.replace("iexact", "icontains") req = req.replace("iexact", "icontains") val = val.replace("*", "") reqs = Q(**{req: val}) for idx in range(HIERARCHIC_LEVELS): req = req[: -(len(suffix))] + "parent__" + suffix q = Q(**{req: val}) reqs |= q query_list.append(reqs) # TODO: improve query with "IN ()"? if not query_list: return q = query_list[0] for query in query_list[1:]: q |= query and_reqs.append(q) def _manage_hierarchic_fields(model, dct, and_reqs): hierarchic_fields = HIERARCHIC_FIELDS[:] if hasattr(model, "hierarchic_fields"): hierarchic_fields += model.hierarchic_fields() Town = apps.get_model("ishtar_common", "Town") for reqs in dct.copy(): if type(reqs) not in (list, tuple): reqs = [reqs] for req in reqs: if req.endswith( "precise_town__areas__label__iexact" ) or req.endswith("precise_town__areas__pk"): _manage_hierarchic_precise_town_area(req, dct, and_reqs) continue if req.endswith("areas__pk") or req.endswith("areas__label__iexact"): _manage_hierarchic_area(req, dct, and_reqs) continue if req.endswith("wcontainer_id") or req.endswith("wcontainer_ref_id"): val = _clean_type_val(dct.pop(req)).strip('"') if val.startswith('"') and val.endswith('"'): val = val[1:-1] vals = [v.replace('"', "") for v in val.split(";")] req = req[1:] # remove "w" container_ids = [] if "*" in vals: main_req = Q(**{req + "__isnull": False}) and_reqs.append(main_req) continue Container = apps.get_model("archaeological_warehouse", "Container") for val in vals: attr = "cached_label__iexact" if val.endswith("*"): attr = "cached_label__icontains" val = val[:-1] q = Container.objects.filter(**{attr: val}).values_list( "id", flat=True) if not q.count(): continue container_ids += list(q.all()) main_req = Q(**{req + "__in": container_ids}) and_reqs.append(main_req) continue if req.endswith("precise_town_id"): _manage_hierarchic_precise_town(req, dct, and_reqs) continue elif ( req.endswith("town__pk") or req.endswith("towns__pk") or req.endswith("town__cached_label__iexact") or req.endswith("towns__cached_label__iexact") ): if req.endswith("pk"): suffix = "pk" elif req.endswith("cached_label__iexact"): suffix = "cached_label__iexact" else: continue val = _clean_type_val(dct.pop(req)).strip('"') if val.startswith('"') and val.endswith('"'): val = val[1:-1] vals = [v.replace('"', "") for v in val.split(";")] main_req = None for val in vals: suf = suffix if "*" == val: req = req.replace("cached_label__", "").replace("pk", "").replace("iexact", "") req += "isnull" reqs = Q(**{req: False}) if not main_req: main_req = reqs else: main_req |= reqs continue elif "*" in val and "iexact" in suf: suf = suffix.replace("iexact", "icontains") req = req.replace("iexact", "icontains") val = val.replace("*", "") reqs = Q(**{req: val}) nreq = base_req = req[:] for idx in range(HIERARCHIC_LEVELS): nreq = nreq[: -(len(suf))] + "parents__" + suf q = Q(**{nreq: val}) reqs |= q nreq = base_req[:] for idx in range(HIERARCHIC_LEVELS): nreq = nreq[: -(len(suf))] + "children__" + suf q = Q(**{nreq: val}) reqs |= q if not main_req: main_req = reqs else: main_req |= reqs and_reqs.append(main_req) # TODO: improve query with "IN ()"? continue for k_hr in hierarchic_fields: lbl_name = "label" if hasattr(model, k_hr): rel = getattr(model, k_hr).field.related_model if not hasattr(rel, "label") and hasattr(rel, "cached_label"): lbl_name = "cached_label" if type(req) in (list, tuple): val = dct.pop(req) val = _clean_type_val(val) q = None for idx, r in enumerate(req): r = _clean_type_val(r) if not idx: q = Q(**{r: val}) else: q |= Q(**{r: val}) and_reqs.append(q) break elif req.endswith(k_hr + "__pk") or req.endswith( k_hr + "__{}__iexact".format(lbl_name) ): val = _clean_type_val(dct.pop(req)) if '";"' in val: # OR request values = val.split(";") else: values = [val] base_req = req[:] reqs = None if req.endswith("pk"): base_suffix = "pk" elif req.endswith("{}__iexact".format(lbl_name)): base_suffix = lbl_name + "__iexact" else: continue for val in values: suffix = base_suffix[:] req = base_req[:] if val.endswith('"') and val.startswith('"'): val = val[1:-1] # manage search text by label if "*" in val: suffix = lbl_name + "__icontains" else: suffix = lbl_name + "__iexact" current_values = val.strip().split("*") req = req[: -(len(base_suffix))] + suffix else: current_values = [val] new_req = None for idx, nval in enumerate(current_values): if not idx: new_req = Q(**{req: nval}) else: new_req &= Q(**{req: nval}) if not reqs: reqs = new_req else: reqs |= new_req hierarchic_levels = LIST_FIELDS[k_hr] if k_hr in LIST_FIELDS \ else HIERARCHIC_LEVELS for idx in range(hierarchic_levels): req = req[: -(len(suffix))] + "parent__" + suffix for idx, nval in enumerate(current_values): if not idx: new_req = Q(**{req: nval}) else: new_req &= Q(**{req: nval}) reqs |= new_req # TODO: improve query with "IN ()"? if reqs: and_reqs.append(reqs) break def _manage_clean_search_field(dct, exclude=None, reverse=False, related_name_fields=None): related_names = related_name_fields if related_name_fields else [] for k in list(dct.keys()): # clean quoted search field if not isinstance(dct[k], str): continue dct[k] = dct[k].replace('"', "") dct[k] = _clean_type_val(dct[k]) if "*" not in dct[k] or k.endswith("regex"): continue value = dct.pop(k).strip() base_key = k[:] if k.endswith("__iexact"): base_key = k[:-len("__iexact")] if value == "*": if k in related_names or not reverse: dct[base_key + "__isnull"] = False if exclude is not None and k.endswith("__iexact"): exclude[base_key + "__exact"] = "" continue if value.startswith("*"): value = value[1:] if value.endswith("*"): value = value[:-1] if value: dct[base_key + "__icontains"] = value elif exclude is not None: exclude[base_key + "__exact"] = "" def _get_relation_type_dict(my_relation_types_prefix, dct): relation_types = {} for rtype_key in my_relation_types_prefix: relation_types[my_relation_types_prefix[rtype_key]] = set() for rtype_key in my_relation_types_prefix: for keys in list(dct.keys()): if type(keys) not in (list, tuple): keys = [keys] for k in keys: if k.startswith(rtype_key): relation_types[my_relation_types_prefix[rtype_key]].add( dct.pop(k) ) return relation_types def _manage_relation_types(relation_types, dct, query, or_reqs): for rtype_prefix in relation_types: vals = relation_types[rtype_prefix] if not vals: continue vals = list(vals)[0].split(";") alt_dct = {} for v in vals: alt_dct = { rtype_prefix + "right_relations__relation_type__label__iexact": v.replace('"', "") } for k in dct: val = dct[k] if rtype_prefix: # only get conditions related to the object if rtype_prefix not in k: continue # tricky: reconstruct the key to make sense - remove the # prefix from the key k = ( k[0 : k.index(rtype_prefix)] + k[k.index(rtype_prefix) + len(rtype_prefix) :] ) if k.endswith("year"): k += "__exact" alt_dct[rtype_prefix + "right_relations__right_record__" + k] = val query |= Q(**alt_dct) for k, or_req in or_reqs: altor_dct = alt_dct.copy() altor_dct.pop(k) for j in or_req: val = or_req[j] if j == "year": j = "year__exact" altor_dct[rtype_prefix + "right_relations__right_record__" + j] = val query |= Q(**altor_dct) return query def _construct_query(relation_types, dct, or_reqs, and_reqs, excluded_relation=False): # excluded -> reverse logic if excluded_relation: and_reqs, or_reqs = or_reqs, and_reqs for key in dct: if isinstance(dct[key], str): values = [v for v in dct[key].split(";") if v] else: values = [dct[key]] if not values: values = [""] # filter empty value for value in values: or_reqs.append((key, {key: value})) dct = {} # manage multi value not already managed for key in list(dct.keys()): if isinstance(dct[key], str) and ";" in dct[key]: values = [v for v in dct[key].split(";") if v] if not values: dct.pop(key) continue dct[key] = values[0] if len(values) == 1: continue for v in values[1:]: or_reqs.append((key, {key: v})) for k in list(dct.keys()): if type(k) not in (list, tuple): continue first_key = k[0] value = dct[k][:] dct.pop(k) dct[first_key] = value for other_key in k[1:]: or_reqs.append((first_key, {other_key: value})) query = Q(**dct) for or_req in or_reqs: alt_dct = dct.copy() if isinstance(or_req, (tuple, list)): k, or_req = or_req if k in alt_dct: alt_dct.pop(k) alt_dct.update(or_req) query |= Q(**alt_dct) else: query |= (Q(**alt_dct) & Q(or_req)) query = _manage_relation_types(relation_types, dct, query, or_reqs) done = [] for and_req in and_reqs: str_q = str(and_req) if str_q in done: continue done.append(str_q) query = query & and_req return query RE_ID = re.compile(r"id\:(\d+)") ID_REPLACE_KEYS = ["__cached_label", "__label", "__iexact", "__icontains"] def _manage_direct_id(dct): """ Manage "id:1234" syntax """ for k, v in list(dct.items()): if not isinstance(v, str): continue m = RE_ID.match(v) if not m: continue dct.pop(k) pk = int(m.groups()[0]) new_k = k[:] for rep_key in ID_REPLACE_KEYS: new_k = new_k.replace(rep_key, "") dct[new_k] = pk def _manage_default_search( dct, request, model, default_name, my_base_request, my_relative_session_names ): pinned_search = "" current_item_keys_dict = get_current_item_keys_dict() pin_key = "pin-search-" + default_name base_request = my_base_request if isinstance(my_base_request, dict) else {} dct = {k: v for k, v in dct.items() if v} if pin_key in request.session and request.session[pin_key]: # a search is pinned pinned_search = request.session[pin_key] dct = {"search_vector": request.session[pin_key]} elif ( default_name in request.session and request.session[default_name] ): # an item is pinned value = request.session[default_name] if "basket-" in value: FindBasket = apps.get_model("archaeological_finds", "FindBasket") try: dct = {"basket__pk": request.session[default_name].split("-")[-1]} pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"])) except FindBasket.DoesNotExist: pass else: try: dct = {"pk": request.session[default_name]} pinned_search = '"{}"'.format(model.objects.get(pk=dct["pk"])) except model.DoesNotExist: pass elif dct == {k: v for k, v in base_request.items() if v}: if not hasattr(model, "UP_MODEL_QUERY"): logger.warning( "**WARN get_item**: - UP_MODEL_QUERY not defined for " "'{}'".format(model) ) else: # a parent item may be selected in the default menu for name, key in my_relative_session_names: if ( name in request.session and request.session[name] and "basket-" not in request.session[name] and name in current_item_keys_dict ): up_model = current_item_keys_dict[name] try: dct.update({key: request.session[name]}) up_item = up_model.objects.get(pk=dct[key]) if up_item.SLUG not in model.UP_MODEL_QUERY: logger.warning( "**WARN get_item**: - {} not in " "UP_MODEL_QUERY for {}'".format(up_item.SLUG, model) ) else: req_key, up_attr = model.UP_MODEL_QUERY[up_item.SLUG] pinned_search = '{}="{}"'.format( req_key, getattr(up_item, up_attr) ) break except up_model.DoesNotExist: pass return dct, pinned_search def _format_val(val): if val is None: return "" if type(val) == bool: if val: return str(_("True")) else: return str(_("False")) return str(val) def _format_geojson(rows, link_template, display_polygon): data = { "type": "FeatureCollection", "crs": {"type": "name", "properties": {"name": "EPSG:4326"}}, "link_template": link_template, "features": [], "no-geo": [], } if not rows: return data """ Columns: base: ['id', 'cached_label', 'main_geodata__cached_x', 'main_geodata__cached_y', 'point_x', 'point_y', 'locked', 'lock_user_id'] poly: ['id', 'cached_label', 'main_geodata__cached_x', 'main_geodata__cached_y', 'main_geodata__multi_line', 'main_geodata__multi_polygon', 'point_x', 'point_y', 'locked', 'lock_user_id'] """ delta = 2 if display_polygon else 0 for row in rows: properties = {"id": row[0], "name": row[1]} feature = None base_feature = { "type": "Feature", "properties": properties, } if display_polygon: if row[4]: # lines feature = base_feature feature["geometry"] = json.loads(row[4].geojson) elif row[5]: # polygons feature = base_feature feature["geometry"] = json.loads(row[5].geojson) if not feature: x, y = row[4 + delta], row[5 + delta] if not x or not y or x < -180 or x > 180 or y < -90 or y > 90: data["no-geo"].append(properties) continue feature = base_feature feature["geometry"] = {"type": "Point", "coordinates": [x, y]} data["features"].append(feature) return data def _get_data_from_query(items, query_table_cols, extra_request_keys, geo_fields=None): # TODO: manage data json field for query_keys in query_table_cols: if not isinstance(query_keys, (tuple, list)): query_keys = [query_keys] for query_key in query_keys: if query_key in extra_request_keys: # translate query term query_key = extra_request_keys[query_key] if isinstance(query_key, (list, tuple)): # only manage one level for display query_key = query_key[0] # clean for filtr in ("__icontains", "__contains", "__iexact", "__exact"): if query_key.endswith(filtr): query_key = query_key[: len(query_key) - len(filtr)] query_key.replace(".", "__") # class style to query values = ["id"] + query_table_cols if geo_fields: profile = get_current_profile() precision = profile.point_precision if precision is not None: exp_x = ExpressionWrapper( Round(geo_fields[0], precision), output_field=FloatField(), ) exp_y = ExpressionWrapper( Round(geo_fields[1], precision), output_field=FloatField(), ) else: exp_x = F(geo_fields[0]) exp_y = F(geo_fields[1]) items = items.annotate(point_x=exp_x) items = items.annotate(point_y=exp_y) values += ["point_x", "point_y"] if hasattr(items.model, "locked"): values.append("locked") values.append("lock_user_id") values = [v for v in values if v] # filter empty values return items.values_list(*values) def _get_data_from_query_old( items, query_table_cols, request, extra_request_keys, do_not_deduplicate=False ): c_ids, datas = [], [] has_lock = items and hasattr(items[0], "locked") for item in items: # manual deduplicate when distinct is not enough if not do_not_deduplicate and item.pk in c_ids: continue c_ids.append(item.pk) data = [item.pk] for keys in query_table_cols: if type(keys) not in (list, tuple): keys = [keys] my_vals = [] for k in keys: if k in extra_request_keys: k = extra_request_keys[k] if type(k) in (list, tuple): k = k[0] for filtr in ("__icontains", "__contains", "__iexact", "__exact"): if k.endswith(filtr): k = k[: len(k) - len(filtr)] vals = [item] # foreign key may be divided by "." or "__" splitted_k = [] for ky in k.split("."): if "__" in ky: splitted_k += ky.split("__") else: splitted_k.append(ky) if splitted_k[-1] == "count": splitted_k = splitted_k[:-2] + [splitted_k[-2] + "__count"] for ky in splitted_k: new_vals = [] for val in vals: if ky.endswith("__count"): new_vals += [getattr(val, ky[: -len("__count")]).count()] elif hasattr(val, "all"): # manage related objects val = list(val.all()) for v in val: v = getattr(v, ky) new_vals += _get_values(request, v) elif val and isinstance(val, dict): if ky in val: val = val[ky] new_vals += _get_values(request, val) elif val: try: val = getattr(val, ky) new_vals += _get_values(request, val) except (AttributeError, GEOSException): # must be a query key such as "contains" pass vals = new_vals # manage last related objects if vals and hasattr(vals[0], "all"): new_vals = [] for val in vals: new_vals += list(val.all()) vals = new_vals if not my_vals: my_vals = [_format_val(va) for va in vals] else: new_vals = [] if not vals: for idx, my_v in enumerate(my_vals): new_vals.append("{}{}{}".format(my_v, " - ", "")) else: for idx, v in enumerate(vals): new_vals.append( "{}{}{}".format(vals[idx], " - ", _format_val(v)) ) my_vals = new_vals[:] data.append(" & ".join(set(my_vals)) or "") if has_lock: data.append(item.locked) data.append(item.lock_user_id) datas.append(data) return datas def _format_modality(value): if value is None: value = str(_("Unknown")) if isinstance(value, bool): value = str(_(str(value))) return value def _get_json_stats_optimized( items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1 ): q = items value_keys = [] for stat in (stats_modality_1, stats_modality_2): if not stat: continue if stat.endswith("__year"): q = q.annotate(**{stat: ExtractYear(stat[: -len("__year")])}) value_keys.append(stat) q = q.values(*value_keys) if stats_sum_variable == "pk": q = q.annotate(sum=Count("pk")) else: q = q.annotate(sum=Sum(stats_sum_variable)) data = [] if stats_modality_2 and stats_modality_2 != stats_modality_1: q = q.order_by(stats_modality_1, stats_modality_2) for values in q.all(): modality_1 = _format_modality(values[stats_modality_1]) if not data or data[-1][0] != modality_1: data.append([modality_1, []]) data[-1][1].append( ( _format_modality(values[stats_modality_2]), int((values["sum"] or 0) * multiply), ) ) else: q = q.order_by(stats_modality_1) for values in q.all(): modality_1 = values[stats_modality_1] data.append( [_format_modality(modality_1), int((values["sum"] or 0) * multiply)] ) data = json.dumps({"data": data}) return HttpResponse(data, content_type="application/json") def _get_json_stats( items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1 ): # _get_json_stats_optimized should work # but problem on container with criteria on CVL q = items value_keys = [] for stat in (stats_modality_1, stats_modality_2): if not stat: continue if stat.endswith("__year"): q = q.annotate(**{stat: ExtractYear(stat[:-len("__year")])}) value_keys.append(stat) value_keys.append(stats_sum_variable) q = q.values(*value_keys) data = [] if stats_modality_2 and stats_modality_2 != stats_modality_1: q = q.order_by(stats_modality_1, stats_modality_2) results = {} for values in q.all(): value = values[stats_sum_variable] or 0 if stats_sum_variable == "pk": value = 1 modality_1 = _format_modality(values[stats_modality_1]) modality_2 = _format_modality(values[stats_modality_2]) key = (modality_1, modality_2) if key not in results: results[key] = 0 results[key] += value * multiply for key in results: modality_1, modality_2 = key if not data or data[-1][0] != modality_1: data.append([modality_1, []]) data[-1][1].append((modality_2, results[key])) else: q = q.order_by(stats_modality_1) results = {} for values in q.all(): value = values[stats_sum_variable] or 0 if stats_sum_variable == "pk": value = 1 modality_1 = _format_modality(values[stats_modality_1]) if modality_1 not in results: results[modality_1] = 0 results[modality_1] += value * multiply for modality_1 in results: data.append([modality_1, results[modality_1]]) data = json.dumps({"data": data}) return HttpResponse(data, content_type="application/json") def _get_table_cols(request, data_type, own_table_cols, full, model): # list of table cols depending on configuration and data send if data_type == "json-map": return [] # only pk for map if own_table_cols: table_cols = own_table_cols else: if full: table_cols = [ field.name for field in model._meta.fields if field.name not in PRIVATE_FIELDS ] table_cols += [ field.name for field in model._meta.many_to_many if field.name not in PRIVATE_FIELDS ] if hasattr(model, "EXTRA_FULL_FIELDS"): table_cols += model.EXTRA_FULL_FIELDS else: tb_key = (getattr(model, "SLUG", None), "TABLE_COLS") if tb_key in settings.TABLE_COLS: table_cols = settings.TABLE_COLS[tb_key] else: table_cols = model.TABLE_COLS if callable(table_cols): table_cols = table_cols() table_cols = list(table_cols) if not hasattr(model, "TABLE_COLS_FILTERS"): return table_cols filtered_table_cols = [] if table_cols: # table_cols for exports are contained inside tables table_cols = [tc[0] if (tc and isinstance(tc, (list, tuple))) else tc for tc in table_cols] for col_name in table_cols: for key in model.TABLE_COLS_FILTERS: if not col_name.startswith(key) or \ request.user.ishtaruser.has_permission( model.TABLE_COLS_FILTERS[key] ): filtered_table_cols.append(col_name) return filtered_table_cols def split_dict(dct): if not dct.get("search_vector", None): return [("OR", dct)] new_dcts = [] # TODO: manage || and && syntax in the same query # example: to extract [[]] parenthesis re.findall(r"(.*)\[\[ (.*?) \]\](.*)", s) split_key, split_type = " || ", "OR" if " && " in dct["search_vector"]: split_key, split_type = " && ", "AND" for vector in dct["search_vector"].split(split_key): new_dct = deepcopy(dct) new_dct["search_vector"] = vector new_dcts.append((split_type, new_dct)) return new_dcts def main_manager( request, model, query_own, full, dct, distinct_queries, query_parameters, my_relation_types_prefix, my_bool_fields, my_reversed_bool_fields, related_name_fields, many_counted_fields, reversed_many_counted_fields, my_dated_fields, my_number_fields, and_reqs ): excluded_dct = {} or_reqs = [] exc_and_reqs, exc_or_reqs = [], [] dct, excluded_dct, distinct_queries = _search_manage_search_vector( model, dct, excluded_dct, distinct_queries, query_parameters, ) if "search_vector" in dct: dct.pop("search_vector") # manage relations types if "relation_types" not in my_relation_types_prefix: my_relation_types_prefix["relation_types"] = "" relation_types = _get_relation_type_dict(my_relation_types_prefix, dct) exc_relation_types = _get_relation_type_dict(my_relation_types_prefix, excluded_dct) _manage_bool_fields( model, my_bool_fields, my_reversed_bool_fields, dct, or_reqs ) _manage_bool_fields( model, my_bool_fields, my_reversed_bool_fields, excluded_dct, exc_or_reqs ) tmp_excluded = {} _manage_many_counted_fields( many_counted_fields, reversed_many_counted_fields, dct, tmp_excluded ) _manage_many_counted_fields( many_counted_fields, reversed_many_counted_fields, excluded_dct, dct ) if tmp_excluded: excluded_dct.update(tmp_excluded) # dated_fields, number_fields # ['signature_date', ...], ['signature_date__year', ...] # -> remove 'signature_date' filtered_dated_fields = [] for field_name in my_dated_fields: exc = False for number_field in my_number_fields: if number_field.startswith(field_name): exc = True break if not exc: filtered_dated_fields.append(field_name) my_dated_fields = filtered_dated_fields _manage_dated_fields(my_dated_fields, dct) _manage_dated_fields(my_dated_fields, excluded_dct) _manage_number_fields(my_number_fields, dct) _manage_number_fields(my_number_fields, excluded_dct) _manage_hierarchic_fields(model, dct, and_reqs) _manage_hierarchic_fields(model, excluded_dct, exc_and_reqs) _manage_facet_search(model, dct, and_reqs) _manage_facet_search(model, excluded_dct, exc_and_reqs) extras = [] if "extras" in dct: extras = dct.pop("extras") if "and_reqs" in dct: and_reqs += dct.pop("and_reqs") if "exc_and_reqs" in dct: exc_and_reqs += dct.pop("exc_and_reqs") updated_excluded = {} _manage_clean_search_field( dct, updated_excluded, related_name_fields=related_name_fields ) _manage_clean_search_field( excluded_dct, dct, reverse=True, related_name_fields=related_name_fields ) if updated_excluded: excluded_dct.update(updated_excluded) # manage direct ID 'id:1234' syntax - used by "{USER}" search _manage_direct_id(dct) _manage_direct_id(excluded_dct) query = _construct_query(relation_types, dct, or_reqs, and_reqs) exc_query = None if excluded_dct or exc_and_reqs or exc_or_reqs or exc_relation_types: exc_query = _construct_query( exc_relation_types, excluded_dct, exc_or_reqs, exc_and_reqs, excluded_relation=True ) if query_own: query = query & query_own # manage hierarchic in shortcut menu if full == "shortcut": query = manage_hierarchy_shorcut(model, request, query) return query, exc_query, extras def manage_hierarchy_shorcut(model, request, query): File = apps.get_model("archaeological_files", "File") Operation = apps.get_model("archaeological_operations", "Operation") ContextRecord = apps.get_model("archaeological_context_records", "ContextRecord") Find = apps.get_model("archaeological_finds", "Find") ASSOCIATED_ITEMS = { Operation: (File, "associated_file__pk"), ContextRecord: (Operation, "operation__pk"), Find: (ContextRecord, "base_finds__context_record__pk"), } if model in ASSOCIATED_ITEMS: upper_model, upper_key = ASSOCIATED_ITEMS[model] model_name = upper_model.SLUG current = model_name in request.session and request.session[model_name] if current: dct = {upper_key: current} query &= Q(**dct) return query DEFAULT_ROW_NUMBER = 10 # length is used by ajax DataTables requests EXCLUDED_FIELDS = ["length"] BASE_DATETIME_FIELDS = ["created", "last_modified"] def get_item( model, func_name, default_name, extra_request_keys=None, base_request=None, bool_fields=None, reversed_bool_fields=None, dated_fields=None, associated_models=None, relative_session_names=None, specific_perms=None, own_table_cols=None, relation_types_prefix=None, do_not_deduplicate=False, model_for_perms=None, alt_query_own=None, search_form=None, no_permission_check=False, callback=None, ): """ Generic treatment of tables :param model: model used for query :param func_name: name of the function (used for session storage) :param default_name: key used for default search in session :param extra_request_keys: default query limitation :param base_request: :param bool_fields: :param reversed_bool_fields: :param dated_fields: :param associated_models: :param relative_session_names: :param specific_perms: :param own_table_cols: :param relation_types_prefix: :param do_not_deduplicate: duplication of id can occurs on large queryset a mechanism of deduplication is used. But duplicate ids can be normal (for instance for record_relations view). :param model_for_perms: use another model to check permission :param alt_query_own: name of alternate method to get query_own :param search_form: associated search form to manage JSON query keys :callback: callback to execute after request. It is called with three arguments: request, export format and queryset of the result :return: """ def func( request, data_type="json", full=False, force_own=False, col_names=None, no_link=False, no_limit=False, return_query=False, ishtaruser=None, # could be provided when request is None **dct, ): available_perms = [] if specific_perms: available_perms = specific_perms[:] EMPTY = "" if "type" in dct: data_type = dct.pop("type") if not data_type: data_type = "json" if "json" in data_type: EMPTY = "[]" if not return_query and data_type not in ( "json", "csv", "json-image", "json-map", "json-stats"): return HttpResponse(EMPTY, content_type="text/plain") if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2: return HttpResponse(EMPTY, content_type="text/plain") model_to_check = model if model_for_perms: model_to_check = model_for_perms if no_permission_check: allowed, own = True, False else: if return_query: allowed, own = True, False else: allowed, own = check_model_access_control( request, model_to_check, available_perms ) if not allowed: return HttpResponse(EMPTY, content_type="text/plain") if force_own: own = True if ( full == "shortcut" and request and "SHORTCUT_SEARCH" in request.session and request.session["SHORTCUT_SEARCH"] == "own" ): own = True query_parameters = {} if hasattr(model, "get_query_parameters"): query_parameters = model.get_query_parameters() # get defaults from model if not extra_request_keys: my_extra_request_keys = copy(model.EXTRA_REQUEST_KEYS or {}) if query_parameters: for key in query_parameters: my_extra_request_keys[key] = query_parameters[key].search_query else: my_extra_request_keys = copy(extra_request_keys or {}) if base_request is None and hasattr(model, "BASE_REQUEST"): if callable(model.BASE_REQUEST): my_base_request = model.BASE_REQUEST(request) else: my_base_request = copy(model.BASE_REQUEST) elif base_request is not None: my_base_request = copy(base_request) else: my_base_request = {} if not bool_fields and hasattr(model, "BOOL_FIELDS"): my_bool_fields = model.BOOL_FIELDS[:] else: my_bool_fields = bool_fields[:] if bool_fields else [] if not reversed_bool_fields and hasattr(model, "REVERSED_BOOL_FIELDS"): my_reversed_bool_fields = model.REVERSED_BOOL_FIELDS[:] else: my_reversed_bool_fields = ( reversed_bool_fields[:] if reversed_bool_fields else [] ) many_counted_fields = getattr(model, "MANY_COUNTED_FIELDS", None) reversed_many_counted_fields = getattr( model, "REVERSED_MANY_COUNTED_FIELDS", None ) my_number_fields = getattr(model, "NUMBER_FIELDS", [])[:] if not dated_fields and hasattr(model, "DATED_FIELDS"): my_dated_fields = model.DATED_FIELDS[:] else: my_dated_fields = dated_fields[:] if dated_fields else [] my_dated_fields += BASE_DATETIME_FIELDS if not associated_models and hasattr(model, "ASSOCIATED_MODELS"): my_associated_models = model.ASSOCIATED_MODELS[:] else: my_associated_models = associated_models[:] if associated_models else [] if not relative_session_names and hasattr(model, "RELATIVE_SESSION_NAMES"): my_relative_session_names = model.RELATIVE_SESSION_NAMES[:] else: my_relative_session_names = ( relative_session_names[:] if relative_session_names else [] ) if not relation_types_prefix and hasattr(model, "RELATION_TYPES_PREFIX"): my_relation_types_prefix = copy(model.RELATION_TYPES_PREFIX) else: my_relation_types_prefix = ( copy(relation_types_prefix) if relation_types_prefix else {} ) fields = [model._meta.get_field(k) for k in get_all_field_names(model)] request_keys = dict( [ ( field.name, field.name + ( hasattr(field, "remote_field") and field.remote_field and "__pk" or "" ), ) for field in fields ] ) # add keys of associated models to available request key for associated_model, key in my_associated_models: if type(associated_model) == str: if associated_model not in globals(): continue associated_model = globals()[associated_model] associated_fields = [ associated_model._meta.get_field(k) for k in get_all_field_names(associated_model) ] request_keys.update( dict( [ ( key + "__" + field.name, key + "__" + field.name + (hasattr(field, "rel") and field.rel and "__pk" or ""), ) for field in associated_fields ] ) ) request_keys.update(my_extra_request_keys) # manage search on json fields and excluded fields ishtaruser = request.user.ishtaruser if request else ishtaruser if search_form: available, __, excluded_fields, json_fields = search_form.check_custom_form( ishtaruser ) # for now no manage on excluded_fields: should we prevent search on # some fields regarding the user concerned? if available: for __, jkey, jfield in json_fields: if jfield.alt_name not in request_keys: if isinstance( jfield, (forms.NullBooleanField, forms.BooleanField) ): my_bool_fields.append(jkey) request_keys[jfield.alt_name] = jkey elif isinstance(jfield, DateField): my_dated_fields.append(jkey) request_keys[jfield.alt_name] = jkey else: request_keys[jfield.alt_name] = jkey + "__iexact" if "query" in dct: request_items = dct["query"] request_items["submited"] = True elif request and request.method == "POST": request_items = request.POST elif request: request_items = request.GET else: return HttpResponse(EMPTY, content_type="text/plain") count = dct.get("count", False) # pager try: row_nb = int(request_items.get("length")) except (ValueError, TypeError): row_nb = DEFAULT_ROW_NUMBER if data_type == "json-map": # other limit for map row_nb = settings.ISHTAR_MAP_MAX_ITEMS if no_limit or ( data_type == "json-map" and request_items.get("no_limit", False) ): row_nb = None display_polygon = False if data_type == "json-map" and request_items.get("display_polygon", False): display_polygon = True dct_request_items = {} # filter requested fields for k in request_items: if k in EXCLUDED_FIELDS: continue key = k[:] if key.startswith("searchprefix_"): key = key[len("searchprefix_"):] dct_request_items[key] = request_items[k] request_items = dct_request_items base_query = None if isinstance(my_base_request, Q): base_query = my_base_request dct = {} else: dct = copy(my_base_request) and_reqs = [] distinct_queries = [] dct["extras"], dct["and_reqs"], dct["exc_and_reqs"] = [], [], [] if full == "shortcut": if model.SLUG == "warehouse": key = "name__icontains" else: key = "cached_label__icontains" dct[key] = (request and request.GET.get("term", None)) or None try: old = "old" in request_items and int(request_items["old"]) except ValueError: return HttpResponse("[]", content_type="text/plain") selected_ids = request_items.get("selected_ids", None) if selected_ids: if "-" in selected_ids: q = Q(pk__in=selected_ids.split("-")) else: q = Q(pk=selected_ids) and_reqs.append(q) # translate submited (and default) parameters to dict and queries for k in request_keys: val = request_items.get(k) if not val: continue # manage ambiguity between start_date and "start" for pagination if k == "start": try: int(val) # if can be converted in int -> pagination continue except ValueError: pass req_keys = request_keys[k] target = dct if k in query_parameters: if query_parameters[k].distinct_query: distinct_queries.append({}) target = distinct_queries[-1] if query_parameters[k].extra_query: target.update(query_parameters[k].extra_query) if callable(req_keys): # callable request key for complex queries not managed on GET continue elif type(req_keys) not in (list, tuple): target[req_keys] = val continue # multiple choice target reqs = Q(**{req_keys[0]: val}) for req_key in req_keys[1:]: q = Q(**{req_key: val}) reqs |= q and_reqs.append(reqs) pinned_search = "" base_keys = ["extras", "and_reqs", "exc_and_reqs"] if my_base_request and isinstance(my_base_request, dict): base_keys += list(my_base_request) whole_bool_fields = my_bool_fields[:] + my_reversed_bool_fields[:] if reversed_many_counted_fields: whole_bool_fields += reversed_many_counted_fields[:] has_a_search = any( k for k in dct.keys() if k not in base_keys and ( dct[k] and (k not in whole_bool_fields or dct[k] not in ("1", "unknown")) ) ) # manage default and pinned search and not bookmark if ( not has_a_search and request and not request_items.get("search_vector", "") and not request_items.get("submited", "") and full != "shortcut" ): if data_type == "csv" and func_name and func_name in request.session: dct = request.session[func_name] else: # default search dct, pinned_search = _manage_default_search( dct, request, model, default_name, my_base_request, my_relative_session_names, ) elif func_name and request and hasattr(request, "session"): request.session[func_name] = dct for k in request_keys: if k not in query_parameters: query_parameters[k] = SearchAltName(k, request_keys[k]) related_name_fields = [query_parameters[k].related_name for k in query_parameters if query_parameters[k].related_name] # manage own filters own_key = None if ishtaruser and ishtaruser.is_ishtaradmin: # admin only... # force own POV - used by account sheet for key in ("view_own", "change_own", "delete_own"): if key in dct_request_items: own = True own_key = key break query_own = None if own: # TODO: verify alt_query_own """ if alt_query_own: query_own = getattr(model, alt_query_own)(q.all()[0]) else: query_own = model.get_query_owns(q.all()[0]) print(query_own) # TODO - get old request to transform them """ if not own_key: form_permission = dct_request_items.get("form_permission", "view") if form_permission == "modification": own_key = "change_own" elif form_permission == "deletion": own_key = "delete_own" else: own_key = "view_own" if own_key in dct_request_items: user_pk = dct_request_items[own_key] else: user_pk = request.user.pk if request else ishtaruser.pk codename = f"{own_key}_{model._meta.model_name}" q = UserObjectPermission.objects.filter( user_id=user_pk, permission__codename=codename, content_type=ContentType.objects.get_for_model(model) ) query_own = Q( pk__in=[int(pk) for pk in q.values_list("object_pk", flat=True)] ) items = None for split_type, sub_dct in split_dict(dct): query, exc_query, extras = main_manager( request, model, query_own, full, sub_dct, distinct_queries, query_parameters, my_relation_types_prefix, my_bool_fields, my_reversed_bool_fields, related_name_fields, many_counted_fields, reversed_many_counted_fields, my_dated_fields, my_number_fields, and_reqs[:] ) # print("ishtar_common/views_item.py - 2745") # print(f"query: {query}", f"distinct_queries: {distinct_queries}", # f"base_query: {base_query}", f"exc_query: {exc_query}", # f"extras: {extras}") sub_items = model.objects.filter(query) for d_q in distinct_queries: sub_items = sub_items.filter(d_q) if base_query: sub_items = sub_items.filter(base_query) if exc_query: sub_items = sub_items.exclude(exc_query) for extra in extras: sub_items = sub_items.extra(**extra) if not items: items = sub_items else: if not sub_items.exists(): if split_type == "AND": items = model.objects.filter(pk__isnull=True) continue if split_type == "OR": items |= sub_items else: # in Django m2m queries use the same JOIN... # items &= sub_items do not work items &= model.objects.filter(Q( pk__in=list(sub_items.values_list("pk", flat=True)) )) if return_query: return items items = items.distinct() table_cols = _get_table_cols(request, data_type, own_table_cols, full, model) count_values = ["pk"] query_distinct_count = getattr(model, "QUERY_DISTINCT_COUNT", None) if query_distinct_count: for k, v in query_distinct_count.items(): if v in count_values: continue for col in table_cols: if isinstance(col, list): col = col[0] if col.startswith(k): count_values.append(v) break try: q = items.values(*count_values) items_nb = q.count() or 0 except ProgrammingError: items_nb = 0 if count: return items_nb # print(str(items.values("id").query)) if data_type == "json-stats": stats_modality_1 = request_items.get("stats_modality_1", None) stats_modality_2 = request_items.get("stats_modality_2", None) if ( not stats_modality_1 or stats_modality_1 not in model.STATISTIC_MODALITIES ): stats_modality_1 = model.STATISTIC_MODALITIES[0] if stats_modality_2 not in model.STATISTIC_MODALITIES: stats_modality_2 = None stats_sum_variable = request_items.get("stats_sum_variable", None) stats_sum_variable_keys = list(model.STATISTIC_SUM_VARIABLE.keys()) if ( not stats_sum_variable or stats_sum_variable not in stats_sum_variable_keys ): stats_sum_variable = stats_sum_variable_keys[0] multiply = model.STATISTIC_SUM_VARIABLE[stats_sum_variable][1] return _get_json_stats( items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=multiply, ) table_cols = [col if col != [] else '' for col in table_cols] query_table_cols = [] for idx, cols in enumerate(table_cols): if type(cols) not in (list, tuple): cols = [cols] for col in cols: query_table_cols += col.split("|") Document = apps.get_model("ishtar_common", "Document") # contextual (full, simple, etc.) col contxt = full and "full" or "simple" if ( hasattr(model, "CONTEXTUAL_TABLE_COLS") and contxt in model.CONTEXTUAL_TABLE_COLS ): for idx, col in enumerate(table_cols): if col in model.CONTEXTUAL_TABLE_COLS[contxt]: query_table_cols[idx] = model.CONTEXTUAL_TABLE_COLS[contxt][col] if data_type in ("json-image", "json-map") or full == "shortcut": if model.SLUG == "warehouse": query_table_cols.append("name") table_cols.append("name") else: query_table_cols.append("cached_label") table_cols.append("cached_label") if data_type == "json-image": prefix = "" if model != Document: prefix = "main_image__" query_table_cols.append(prefix + "thumbnail") table_cols.append(prefix + "thumbnail") query_table_cols.append(prefix + "image") table_cols.append(prefix + "image") elif data_type == "json-map": base_query_key = "main_geodata__" if model.SLUG == "find": base_query_key = "base_finds__" + base_query_key query_table_cols += [base_query_key + "cached_x", base_query_key + "cached_y"] table_cols += [base_query_key + "cached_x", base_query_key + "cached_y"] if display_polygon: query_table_cols += [base_query_key + "multi_line", base_query_key + "multi_polygon"] # manage sort tables manual_sort_key = None sorts = {} for k in request_items: if not k.startswith("order["): continue num = int(k.split("]")[0][len("order["):]) if num not in sorts: sorts[num] = ["", ""] # sign, col_num if k.endswith("[dir]"): order = request_items[k] sign = order and order == "desc" and "-" or "" sorts[num][0] = sign if k.endswith("[column]"): sorts[num][1] = request_items[k] sign = "" if not sorts: items = items.order_by("pk") else: orders = [] sort_keys = list(sorts.keys()) for idx in sorted(sorts.keys()): signe, col_num = sorts[idx] col_num = int(col_num) # id or link col if col_num < 2 and len(sort_keys) <= 2: orders.append("pk") continue if (col_num - 2) >= len(query_table_cols): break k = query_table_cols[col_num - 2] if k in request_keys: ks = request_keys[k] if type(ks) not in (tuple, list): ks = [ks] for k in ks: if k.endswith("__pk"): k = k[: -len("__pk")] + "__label" if k.endswith("towns"): k = k + "__cached_label" if ( k.endswith("__icontains") or k.endswith("__contains") or k.endswith("__iexact") or k.endswith("__exact") ): k = "__".join(k.split("__")[:-1]) # if '__' in k: # k = k.split('__')[0] orders.append(signe + k) else: # not a standard request key if idx: # not the first - we ignore this sort continue sign = signe manual_sort_key = k logger.warning( "**WARN get_item - {}**: manual sort key '{}'".format( func_name, k ) ) break if not manual_sort_key: items = items.order_by(*orders) # pager management start, end = 0, None page_nb = 1 if row_nb and data_type.startswith("json"): try: start = int(request_items.get("start")) page_nb = start // row_nb + 1 if page_nb < 1: raise ValueError("Page number is not relevant.") except (TypeError, ValueError, AssertionError): start = 0 page_nb = 1 end = int(page_nb * row_nb) if full == "shortcut": start = 0 end = 20 if callback: slice_query = None if not manual_sort_key: slice_query = (start, end) callback("get_item", request, data_type, items, slice_query) if manual_sort_key: items = items.all() else: items = items[start:end] if old: items = [item.get_previous(old) for item in items] if data_type == "json-map": if display_polygon: geo_fields = query_table_cols[-4:] else: geo_fields = query_table_cols[-2:] datas = _get_data_from_query( items, query_table_cols, my_extra_request_keys, geo_fields=geo_fields ) elif data_type != "csv" and getattr(model, "NEW_QUERY_ENGINE", False): datas = _get_data_from_query(items, query_table_cols, my_extra_request_keys) else: datas = _get_data_from_query_old( items, query_table_cols, request, my_extra_request_keys, do_not_deduplicate, ) if manual_sort_key: # +1 because the id is added as a first col idx_col = None if manual_sort_key in query_table_cols: idx_col = query_table_cols.index(manual_sort_key) + 1 else: for idx, col in enumerate(query_table_cols): if type(col) in (list, tuple) and manual_sort_key in col: idx_col = idx + 1 if idx_col is not None: null_value = None for d in datas: if isinstance(d[idx_col], (int, float)): null_value = 0 break if isinstance(d[idx_col], str): null_value = "" break if isinstance(d[idx_col], datetime.date): null_value = datetime.date(1, 1, 1) break if isinstance(d[idx_col], datetime.datetime): null_value = datetime.datetime(1, 1, 1) break if not null_value: null_value = "" datas = sorted( datas, key=lambda x: x[idx_col] if x[idx_col] is not None else null_value ) if sign == "-": datas = reversed(datas) datas = list(datas)[start:end] link_template = ( "" '' ) link_ext_template = '{}' lock = ' ' own_lock = ' ' has_locks = hasattr(model, "locked") current_user_id = request and request.user and request.user.id if data_type.startswith("json"): rows = [] if data_type == "json-map": curl = reverse("show-" + default_name, args=[999999, ""]) if not curl.endswith("/"): curl += "/" lnk = link_template.format(curl) lnk = lnk.replace("999999", "") if not has_locks: lnk = lnk.replace("", "") data = json.dumps(_format_geojson(datas, lnk, display_polygon)) return HttpResponse(data, content_type="application/json") for data in datas: res = { "id": data[0], } if not no_link: try: curl = reverse("show-" + default_name, args=[data[0], ""]) except NoReverseMatch: try: curl = reverse("show-" + default_name, args=[data[0]]) except NoReverseMatch: logger.warning( '**WARN "show-' + default_name + '" args (' + str(data[0]) + ") url not available" ) curl, lnk = "", "" if curl: if not curl.endswith("/"): curl += "/" lnk_template = link_template lnk = lnk_template.format(curl) if has_locks and data[-2]: if data[-1] == current_user_id: lnk = lnk.replace("", own_lock) else: lnk = lnk.replace("", lock) else: lnk = lnk.replace("", "") res["link"] = lnk for idx, value in enumerate(data[1:]): if not value or idx >= len(table_cols): continue table_col = table_cols[idx] if type(table_col) not in (list, tuple): table_col = [table_col] tab_cols = [] # foreign key may be divided by "." or "__" for tc in table_col: if "." in tc: tab_cols += tc.split(".") elif "__" in tc: tab_cols += tc.split("__") else: tab_cols.append(tc) k = "__".join(tab_cols) if k.endswith("__image") or k.endswith("__thumbnail"): if ( not value.startswith(settings.MEDIA_ROOT) and not value.startswith("http://") and not value.startswith("https://") ): value = settings.MEDIA_URL + value if hasattr(model, "COL_LINK") and k in model.COL_LINK: value = link_ext_template.format(value, value) if isinstance(value, datetime.date): value = value.strftime("%Y-%m-%d") if isinstance(value, datetime.datetime): value = value.strftime("%Y-%m-%d %H:%M:%S") res[k] = value if full == "shortcut": if "cached_label" in res: res["value"] = res.pop("cached_label") elif "name" in res: res["value"] = res.pop("name") rows.append(res) # v4.0 patch if getattr(model, "SELECT_GROUP_BY", False): new_rows = OrderedDict() for row in rows: idx = row["id"] if idx in new_rows: for key in row: if row[key] == new_rows[idx][key]: continue new_rows[idx][key] += " ; " + row[key] else: new_rows[idx] = row rows = [row for __, row in new_rows.items()] if full == "shortcut": data = json.dumps(rows) else: total = ( (items_nb // row_nb + (1 if items_nb % row_nb else 0)) if row_nb else items_nb ) data = json.dumps( { "recordsTotal": items_nb, "recordsFiltered": items_nb, "rows": rows, "table-cols": table_cols, "pinned-search": pinned_search, "page": page_nb, "total": total, } ) return HttpResponse(data, content_type="application/json") elif data_type == "csv": response = HttpResponse(content_type="text/csv", charset=ENCODING) n = timezone.now() filename = "%s_%s.csv" % (default_name, n.strftime("%Y%m%d-%H%M%S")) response["Content-Disposition"] = "attachment; filename=%s" % filename writer = csv.writer(response, **CSV_OPTIONS) if col_names: col_names = [name for name in col_names] else: col_names = [] for field_name in table_cols: if type(field_name) in (list, tuple): field_name = " & ".join(field_name) if hasattr(model, "COL_LABELS") and field_name in model.COL_LABELS: field = model.COL_LABELS[field_name] col_names.append(str(field)) continue else: try: field = model._meta.get_field(field_name) except: col_names.append("") logger.warning( "**WARN get_item - csv export**: no col name " "for {}\nadd explicit label to " "COL_LABELS attribute of " "{}".format(field_name, model) ) continue col_names.append(str(field.verbose_name)) writer.writerow(col_names) for data in datas: row, delta = [], 0 # regroup cols with join "|" for idx, col_name in enumerate(table_cols): if len(data[1:]) <= idx + delta: break val = data[1:][idx + delta] ccol_name = col_name[0] if isinstance(col_name, (list, tuple)) else col_name if ccol_name and "|" in ccol_name: nb_sub_cols = len(ccol_name.split("|")) for delta_idx in range(nb_sub_cols - 1): delta += 1 val += data[1:][idx + delta] row.append(val) try: writer.writerow(row) except UnicodeEncodeError: vals = [] for v in row: try: vals.append(v.encode(ENCODING).decode(ENCODING)) except UnicodeEncodeError: vals.append(unidecode(v).encode(ENCODING).decode(ENCODING)) writer.writerow(vals) return response return HttpResponse("{}", content_type="text/plain") return func def adapt_distant_search(params, src, model): if "search_vector" in params and params["search_vector"]: search_vector = params["search_vector"][0] match = RE_FACET.search(search_vector) final_search_vector = "" ApiKeyMatch = apps.get_model("ishtar_common", "ApiKeyMatch") while match: key, value, __ = match.groups() q = ApiKeyMatch.objects.filter( source=src, search_model__model__iexact=model, search_keys__contains=[key], local_label=value, ) up, down = match.span() if q.count(): api_key = q.all()[0] final_search_vector += search_vector[0:up] final_search_vector += f'{key}="{api_key.distant_label};{value}" ' else: final_search_vector += search_vector[0:down] + " " search_vector = search_vector[down:] match = RE_FACET.search(search_vector) final_search_vector += search_vector params["search_vector"] = [final_search_vector] def get_distant_item(request, model, external_source_id, data_type=None): # TODO: verify/test check permissions ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource") try: src = ApiExternalSource.objects.get(pk=external_source_id) except (ApiExternalSource.DoesNotExist, ValueError): return HttpResponse("{}", content_type="text/plain") url = src.url url += reverse(f"api-search-{model}") params = {k: v for k, v in dict(request.GET).items() if not k.startswith("columns")} params["submited"] = 1 params["data_type"] = "json" if data_type: params["data_type"] = data_type adapt_distant_search(params, src, model) default_keys = [ "draw", "order[", "start", "length", "search[", "_", "submited", "data_type", ] app = API_MAIN_MODELS[model] model_class = ContentType.objects.get(app_label=app, model=model).model_class() bool_fields = model_class.REVERSED_BOOL_FIELDS + model_class.BOOL_FIELDS + model_class.CALLABLE_BOOL_FIELDS is_empty_params = not any( k for k in params if not any(k for default_key in default_keys if not k.startswith(default_key)) and (k not in bool_fields or params.get(k) != ["unknown"]) ) pin_key = "pin-search-" + model if ( is_empty_params and pin_key in request.session and request.session[pin_key] ): # a search is pinned params["search_vector"] = request.session[pin_key] try: response = requests.get( url, params=params, timeout=20, headers={"Authorization": f"Token {src.key}"}, ) except requests.exceptions.Timeout: return HttpResponse("{}", content_type="text/plain") dct = response.json() if "rows" in dct: for row in dct["rows"]: if "id" in row: try: idx = int(row["id"]) except ValueError: continue source_id = f"source-{external_source_id}-{idx}" row["id"] = source_id if "link" in row: row["link"] = row["link"].replace(str(idx), source_id) return HttpResponse(json.dumps(dct), content_type="application/json") def external_export(request, source_id, model_name, slug): external_sources = request.session.get("EXTERNAL_SOURCES", {}) url = None for source in external_sources: try: src_id, __ = source.split("||") src_id = int(src_id) except ValueError: continue if src_id != source_id: continue for model in external_sources[source]: if model_name == model.split("-")[-1]: url = reverse("api-export-" + model_name, args=[slug]) if not url: return HttpResponse('Unauthorized', status=401) ApiExternalSource = apps.get_model("ishtar_common", "ApiExternalSource") try: src = ApiExternalSource.objects.get(pk=source_id) except (ApiExternalSource.DoesNotExist, ValueError): return HttpResponse('Unauthorized', status=401) url = src.url + url try: response = requests.get( url, params=request.GET, timeout=20, headers={"Authorization": f"Token {src.key}"}, ) except requests.exceptions.Timeout: return HttpResponse('Gateway Timeout', status=504) if response.status_code != 200: lbl = { 401: "Unauthorized", 404: "Page not found", 500: "Server error", } lbl = lbl[response.status_code] \ if response.status_code in lbl else "Unknown error" return HttpResponse(lbl, status=response.status_code) response = HttpResponse(response.text, content_type="text/csv") n = timezone.now() filename = f"{model_name}-{n.strftime('%Y%m%d-%H%M%S')}.csv" response["Content-Disposition"] = "attachment; filename=%s" % filename return response