#!/usr/bin/env python3 # -*- coding: utf-8 -*- from collections import OrderedDict from copy import copy, deepcopy import csv import datetime import json import logging import re import requests # nosec: no user input used import subprocess # nosec from tempfile import NamedTemporaryFile import unidecode from django.conf import settings from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import GEOSException from django.contrib.staticfiles.templatetags.staticfiles import static from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist from django.db.models import ( F, Q, Count, Sum, ImageField, ExpressionWrapper, FloatField, FileField, ) from django.db.models.fields import FieldDoesNotExist from django.db.models.functions import ExtractYear from django.db.utils import ProgrammingError from django import forms from django.forms.models import model_to_dict from django.http import HttpResponse, Http404 from django.shortcuts import render from django.template import loader from django.urls import reverse, NoReverseMatch from django.utils.translation import ( ugettext, ugettext_lazy as _, activate, deactivate, pgettext_lazy, ) from tidylib import tidy_document as tidy from unidecode import unidecode from weasyprint import HTML, CSS from weasyprint.fonts import FontConfiguration from ishtar_common.utils import ( check_model_access_control, CSV_OPTIONS, get_all_field_names, Round, PRIVATE_FIELDS, ) from ishtar_common.models import get_current_profile, GeneralType, SearchAltName from ishtar_common.models_common import HistoryError from .menus import Menu from . 
import models, models_rest from archaeological_files.models import File from archaeological_operations.models import ( Operation, ArchaeologicalSite, AdministrativeAct, ) from archaeological_context_records.models import ContextRecord from archaeological_finds.models import Find, FindBasket, Treatment, TreatmentFile from archaeological_warehouse.models import Warehouse, Container logger = logging.getLogger(__name__) ENCODING = settings.ENCODING or "utf-8" CURRENT_ITEM_KEYS = ( ("file", File), ("operation", Operation), ("site", ArchaeologicalSite), ("contextrecord", ContextRecord), ("warehouse", Warehouse), ("container", Container), ("find", Find), ("findbasket", FindBasket), ("treatmentfile", TreatmentFile), ("treatment", Treatment), ("administrativeact", AdministrativeAct), ("administrativeactop", AdministrativeAct), ("administrativeactfile", AdministrativeAct), ("administrativeacttreatment", AdministrativeAct), ("administrativeacttreatmentfile", AdministrativeAct), ) CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS) def get_autocomplete_queries(request, label_attributes, extra=None): if not label_attributes: return [Q(pk__isnull=True)] base_q = request.GET.get("term") or "" queries = [] splited_q = base_q.split(" ") for value_prefix, query_suffix, query_endswith in ( ("", "__startswith", True), # starts with (" ", "__icontains", True), # contain a word which starts with ("", "__endswith", False), # ends with ("", "__icontains", False), ): # contains alt_queries = [None] if len(splited_q) == 1 and query_endswith: alt_queries = ["__endswith", None] for alt_query in alt_queries: query = Q() if extra: query = Q(**extra) for q in splited_q: if not q: continue sub_q = Q(**{label_attributes[0] + query_suffix: value_prefix + q}) if alt_query: sub_q &= Q(**{label_attributes[0] + alt_query: q}) for other_label in label_attributes[1:]: sub_q = sub_q | Q(**{other_label + query_suffix: value_prefix + q}) query = query & sub_q queries.append(query) return queries def 
get_autocomplete_item(model, extra=None): if not extra: extra = {} def func(request, current_right=None, limit=20): result = OrderedDict() for query in get_autocomplete_queries(request, ["cached_label"], extra=extra): objects = model.objects.filter(query).values("cached_label", "id")[:limit] for obj in objects: if obj["id"] not in list(result.keys()): result[obj["id"]] = obj["cached_label"] limit -= 1 if not limit: break if not limit: break data = json.dumps( [{"id": obj[0], "value": obj[1]} for obj in list(result.items())] ) return HttpResponse(data, content_type="text/plain") return func def check_permission(request, action_slug, obj_id=None): main_menu = Menu(request.user) main_menu.init() if action_slug not in main_menu.items: # TODO return True if obj_id: return main_menu.items[action_slug].is_available( request.user, obj_id, session=request.session ) return main_menu.items[action_slug].can_be_available( request.user, session=request.session ) def new_qa_item( model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="" ): def func(request, parent_name, limits=""): model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") if not check_permission(request, "add_" + model_name.lower()): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": slug = "archaeologicalsite" url_slug = "new-" + slug current_page_name = page_name[:] if not current_page_name: current_page_name = _("New %s" % model_name.lower()) dct = { "page_name": str(current_page_name), "url": reverse(url_slug, args=[parent_name]), "slug": slug, "parent_name": parent_name, "many": many, } if request.method == "POST": dct["form"] = frm(request.POST, limits=limits) if dct["form"].is_valid(): new_item = dct["form"].save(request.user) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() dct["new_item_label"] = lbl dct["new_item_pk"] = new_item.pk 
dct["parent_pk"] = parent_name if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) return render(request, template, dct) else: dct["form"] = frm(limits=limits) return render(request, template, dct) return func def get_short_html_detail(model): def func(request, pk): model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") if not check_permission(request, "view_" + model_name.lower(), pk): return HttpResponse(not_permitted_msg) try: item = model.objects.get(pk=pk) except model.DoesNotExist: return HttpResponse(not_permitted_msg) html = item.get_short_html_detail() return HttpResponse(html) return func def modify_qa_item(model, frm): def func(request, parent_name="", pk=None): template = "ishtar/forms/qa_new_item.html" model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") if not check_permission(request, "change_" + model_name.lower(), pk): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": slug = "archaeologicalsite" try: item = model.objects.get(pk=pk) except model.DoesNotExist: return HttpResponse(not_permitted_msg) url_slug = "modify-" + slug dct = { "page_name": str(_("Modify a %s" % model_name.lower())), "url": reverse(url_slug, args=[parent_name, pk]), "slug": slug, "modify": True, "parent_name": parent_name, } if request.method == "POST": dct["form"] = frm(request.POST) if dct["form"].is_valid(): new_item = dct["form"].save(request.user, item) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() dct["new_item_label"] = lbl dct["new_item_pk"] = new_item.pk dct["parent_pk"] = parent_name if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) return render(request, template, dct) else: data = 
model_to_dict(item) for k in list(data.keys()): if data[k] and isinstance(data[k], list) and hasattr(data[k][0], "pk"): data[k] = [i.pk for i in data[k]] dct["form"] = frm(initial=data) return render(request, template, dct) return func def display_item(model, extra_dct=None, show_url=None): def func(request, pk, **dct): if show_url: dct["show_url"] = "/{}{}/".format(show_url, pk) else: dct["show_url"] = "/show-{}/{}/".format(model.SLUG, pk) return render(request, "ishtar/display_item.html", dct) return func def show_source_item(request, source_id, model, name, base_dct, extra_dct): try: __, source_id, external_id = source_id.split("-") source_id, external_id = int(source_id), int(external_id) except ValueError: raise Http404() models_rest.ApiExternalSource.objects.get() # TODO: check permissions try: src = models_rest.ApiExternalSource.objects.get(pk=source_id) except models_rest.ApiExternalSource.DoesNotExist: return HttpResponse("{}", content_type="text/plain") url = src.url if not url.endswith("/"): url += "/" url += f"api/get/{model.SLUG}/{external_id}/" try: response = requests.get( url, timeout=20, headers={"Authorization": f"Token {src.key}"}, ) except requests.exceptions.Timeout: return HttpResponse("{}", content_type="text/plain") item = response.json() dct = deepcopy(base_dct) if not item: return HttpResponse("{}", content_type="text/plain") item["is_external"] = True item["current_source"] = src.name dct["item"], dct["item_name"] = item, name dct["is_external"] = True dct["SHOW_GEO"] = False if extra_dct: dct.update(extra_dct(request, dct)) for perm in Permission.objects.filter( codename__startswith='view_').values_list("codename", flat=True).all(): dct["permission_" + perm] = False permissions = ["permission_view_document"] for p in permissions: dct[p] = True tpl = loader.get_template(f"ishtar/sheet_{name}_window.html") content = tpl.render(dct, request) return HttpResponse(content, content_type="application/xhtml") def show_item(model, name, 
extra_dct=None, model_for_perms=None): def func(request, pk, **dct): check_model = model if model_for_perms: check_model = model_for_perms allowed, own = check_model_access_control(request, check_model) if not allowed: return HttpResponse("", content_type="application/xhtml") q = model.objects if own: if not hasattr(request.user, "ishtaruser"): return HttpResponse("") query_own = model.get_query_owns(request.user.ishtaruser) if query_own: q = q.filter(query_own).distinct() doc_type = "type" in dct and dct.pop("type") url_name = ( "/".join(reverse("show-" + name, args=["0", ""]).split("/")[:-2]) + "/" ) profile = get_current_profile() dct["PROFILE"] = profile dct["CURRENCY"] = profile.currency dct["ENCODING"] = settings.ENCODING dct["DOT_GENERATION"] = settings.DOT_BINARY and profile.relation_graph dct["SHOW_GEO"] = profile.mapping dct["current_window_url"] = url_name date = None if "date" in dct: date = dct.pop("date") dct["sheet_id"] = f"{name}-{pk}" dct["window_id"] = f"{name}-{pk}-{datetime.datetime.now().strftime('%M%s')}" dct["window_id_underscore"] = dct["window_id"].replace("-", "_") if str(pk).startswith("source-"): return show_source_item(request, pk, model, name, dct, extra_dct) try: item = q.get(pk=pk) except (ObjectDoesNotExist, ValueError): return HttpResponse("") # list current perms for perm in Permission.objects.filter( codename__startswith='view_').values_list("codename", flat=True).all(): dct["permission_" + perm] = False dct["permission_change_own_document"] = False dct["permission_change_document"] = False if hasattr(request.user, "ishtaruser") and request.user.ishtaruser: cache_key = "{}-{}-{}".format( settings.PROJECT_SLUG, "current-perms", request.session.session_key, ) permissions = cache.get(cache_key) if permissions is None: permissions = [] profile = request.user.ishtaruser.person.current_profile if profile: for group in profile.profile_type.groups.all(): for permission in group.permissions.all(): permissions.append(permission.codename) 
cache.set(cache_key, permissions, settings.CACHE_TIMEOUT) for perm in permissions: dct["permission_" + perm] = True if hasattr(item, "history") and request.user.is_superuser: if date: try: date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") dct["IS_HISTORY"] = True if item.get_last_history_date() != date: item = item.get_previous(date=date) if item is None: raise ValueError("No previous history entry.") dct["previous"] = item._previous dct["next"] = item._next else: date = None except ValueError: return HttpResponse("", content_type="text/plain") if not date: historized = item.history.all() if historized: item.history_date = historized[0].history_date if len(historized) > 1: dct["previous"] = historized[1].history_date if ( doc_type in ("odt", "pdf") and hasattr(item, "qrcode") and (not item.qrcode or not item.qrcode.name) ): item.generate_qrcode(request=request) dct["item"], dct["item_name"] = item, name # add context if extra_dct: dct.update(extra_dct(request, item)) context_instance = deepcopy(dct) context_instance["output"] = "html" if hasattr(item, "history_object"): filename = item.history_object.associated_filename else: filename = item.associated_filename if doc_type == "odt" and settings.ODT_TEMPLATE: tpl = loader.get_template("ishtar/sheet_%s.html" % name) context_instance["output"] = "ODT" content = tpl.render(context_instance, request) tidy_options = { "output-xhtml": 1, "indent": 1, "tidy-mark": 0, "doctype": "auto", "add-xml-decl": 1, "wrap": 1, } html, errors = tidy(content, options=tidy_options) html = html.replace(" ", " ") html = re.sub("
]*)>\n", "", html) odt = NamedTemporaryFile() html_source = NamedTemporaryFile() with open(html_source.name, "w") as html_file: html_file.write(html) pandoc_args = [ "pandoc", "-f", "html", "-t", "odt", "-o", odt.name, html_source.name, ] try: # nosec: no user input subprocess.check_call( # nosec pandoc_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) except subprocess.CalledProcessError: return HttpResponse(content, content_type="application/xhtml") response = HttpResponse( content_type="application/vnd.oasis.opendocument.text" ) response["Content-Disposition"] = "attachment; filename={}.odt".format( filename ) with open(odt.name, "rb") as odt_file: response.write(odt_file.read()) return response elif doc_type == "pdf": base_url = "/".join(request.build_absolute_uri().split("/")[0:3]) tpl = loader.get_template("ishtar/sheet_%s_pdf.html" % name) context_instance["output"] = "PDF" html = tpl.render(context_instance, request) font_config = FontConfiguration() css = CSS( string=""" @font-face { font-family: Gentium; src: url(%s); } body{ font-family: Gentium } pre { white-space: pre-wrap; } """ % (base_url + static("gentium/GentiumPlus-R.ttf")) ) css2 = CSS(filename=settings.STATIC_ROOT + "/media/style_basic.css") pdf = HTML(string=html, base_url=base_url).write_pdf( stylesheets=[css, css2], font_config=font_config ) response = HttpResponse(pdf, content_type="application/pdf") response["Content-Disposition"] = "attachment; filename=%s.pdf" % filename return response else: tpl = loader.get_template("ishtar/sheet_%s_window.html" % name) content = tpl.render(context_instance, request) return HttpResponse(content, content_type="application/xhtml") return func def revert_item(model): def func(request, pk, date, **dct): try: item = model.objects.get(pk=pk) date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") item.rollback(date) except (ObjectDoesNotExist, ValueError, HistoryError): return HttpResponse(None, content_type="text/plain") return 
HttpResponse("True", content_type="text/plain") return func HIERARCHIC_LEVELS = 5 HIERARCHIC_FIELDS = [ "periods", "period", "unit", "material_types", "material_type", "conservatory_state", "object_types", "source_type", ] def _get_values(request, val): if hasattr(val, "all"): # manage related objects vals = list(val.all()) else: vals = [val] new_vals = [] for v in vals: if callable(v): try: v = v() except TypeError: continue try: if ( not isinstance(v, (models.Person, models.Organization)) and hasattr(v, "url") and v.url ): v = ( (request.is_secure() and "https" or "http") + "://" + request.get_host() + v.url ) except ValueError: pass new_vals.append(v) return new_vals def _push_to_list(obj, current_group, depth): """ parse_parentheses helper function """ try: while depth > 0: current_group = current_group[-1] depth -= 1 except IndexError: # tolerant to parentheses mismatch pass if ( current_group and type(obj) in (str, str) and type(current_group[-1]) in (str, str) ): current_group[-1] += obj else: current_group.append(obj) true_strings = ["1", "true"] for language_code, language_lbl in settings.LANGUAGES: activate(language_code) true_strings.append(str(_("Yes")).lower()) true_strings.append(str(_("True")).lower()) deactivate() def is_true_string(val): val = str(val).lower().replace('"', "") if val in true_strings: return True def _parse_parentheses(s): """ Parse parentheses into list. 
(OA01 & (pierre | ciseau)) -> ["0A01 &", ["pierre | ciseau"]] """ groups = [] depth = 0 inside_quote = False for char in s: if char == '"': inside_quote = not inside_quote if not inside_quote: if char == "(": _push_to_list([], groups, depth) depth += 1 elif char == ")": if depth > 0: depth -= 1 else: _push_to_list(char, groups, depth) continue _push_to_list(char, groups, depth) # for non tolerant to parentheses mismatch check depth is equal to 0 return groups FORBIDDEN_CHAR = [":"] RESERVED_CHAR = ["|", "&"] RE_FACET = re.compile('([-a-zA-Z]+)="([^"]+)"(?:;"([^"]+)")*') def _parse_query_string( string, query_parameters, current_dct, exc_dct, extra_distinct_q ): string = string.strip().lower() match = RE_FACET.search(string) if match or "=" in string: base_term, queries = "", [] if match: for idx, gp in enumerate(match.groups()): if not idx: base_term = gp elif gp: queries.append('"{}"'.format(gp)) else: splited = string.split("=") if len(splited) == 2: base_term = splited[0] queries.append(splited[1]) if queries: excluded = base_term.startswith("-") if excluded: base_term = base_term[1:] if base_term in query_parameters: dct = current_dct term = query_parameters[base_term].search_query for query in queries: # callable request key for complex queries if callable(term): # !! 
distinct_queries not managed is_true = is_true_string(query) if excluded: is_true = not is_true cfltr, cexclude, cextra = term(is_true=is_true) if cfltr: if "and_reqs" not in dct: dct["and_reqs"] = [] dct["and_reqs"].append(cfltr) if cexclude: if "exc_and_reqs" not in dct: dct["exc_and_reqs"] = [] dct["exc_and_reqs"].append(cexclude) if cextra: dct["extras"].append(cextra) else: if query_parameters[base_term].distinct_query: extra_distinct_q.append({}) dct = extra_distinct_q[-1] if not query_parameters[base_term].distinct_query and excluded: dct = exc_dct if query_parameters[base_term].extra_query: dct.update(query_parameters[base_term].extra_query) if term in dct: dct[term] += ";" + query else: dct[term] = query if query_parameters[base_term].distinct_query: for k in dct: # clean " dct[k] = dct[k].replace('"', "") # distinct query wait for a query _manage_clean_search_field(dct) extra_distinct_q[-1] = ~Q(**dct) if excluded else Q(**dct) return "" for reserved_char in FORBIDDEN_CHAR: string = string.replace(reserved_char, "") if len(string) != 1: for reserved_char in RESERVED_CHAR: string = string.replace(reserved_char, "") if not string: return "" if string.endswith("*"): if len(string.strip()) == 1: return "" string = string[:-1] + ":*" elif string not in ("&", "|", "!", "-"): # like search by default string = string + ":*" if string.startswith("-"): if len(string.strip()) == 1: return "" string = "!" 
+ string[1:] return string def _parse_parentheses_groups( groups, query_parameters, current_dct=None, exc_dct=None, extra_distinct_q=None ): """ Transform parentheses groups to query :param groups: groups to transform (list) :param query_parameters: query keys for facet search :param current_dct: query dict :param exc_dct: exclude query dict :param exc_dct: exclude query dict :return: query string, query dict, excluded query dict """ if not current_dct: current_dct = {} if not exc_dct: exc_dct = {} if not extra_distinct_q: extra_distinct_q = [] if type(groups) is not list: string = groups.strip() if string.startswith('"') and string.endswith('"') and string.count('"') == 2: string = string[1:-1] # split into many groups if spaces # do not split inside quotes current_index = 0 found = string.find('"', current_index) SEP = "?รง;?" # replace spaces inside quote with this characters previous_quote = None while found != -1: if previous_quote is not None: string = ( string[0:previous_quote] + string[previous_quote:found].replace(" ", SEP) + string[found:] ) previous_quote = None # SEP is larger than a space found = string.find('"', current_index) else: previous_quote = found current_index = found + 1 found = string.find('"', current_index) string_groups = [gp.replace(SEP, " ") for gp in string.split(" ")] if len(string_groups) == 1: return ( _parse_query_string( string_groups[0], query_parameters, current_dct, exc_dct, extra_distinct_q, ), current_dct, exc_dct, extra_distinct_q, ) return _parse_parentheses_groups( string_groups, query_parameters, current_dct, exc_dct, extra_distinct_q ) if not groups: # empty list return "", current_dct, exc_dct, extra_distinct_q query = "(" previous_sep, has_item = None, False for item in groups: q, current_dct, exc_dct, extra_distinct_q = _parse_parentheses_groups( item, query_parameters, current_dct, exc_dct, extra_distinct_q ) q = q.strip() if not q: continue if q in ("|", "&"): if previous_sep or not has_item: continue # multiple 
sep is not relevant previous_sep = q continue if has_item: query += previous_sep or " & " query += q has_item = True previous_sep = None query += ")" if query == "()": query = "" unaccent_query = unidecode(query) if unaccent_query != query: query = f"({query} | {unaccent_query})" return query, current_dct, exc_dct, extra_distinct_q def _search_manage_search_vector( model, dct, exc_dct, distinct_queries, query_parameters ): if ( "search_vector" not in dct or not model._meta.managed ): # is a view - no search_vector return dct, exc_dct, distinct_queries search_vector = dct["search_vector"] parentheses_groups = _parse_parentheses(search_vector) ( search_query, extra_dct, extra_exc_dct, extra_distinct_q, ) = _parse_parentheses_groups(parentheses_groups, query_parameters) dct.update(extra_dct) distinct_queries += extra_distinct_q exc_dct.update(extra_exc_dct) if not search_query: return dct, exc_dct, distinct_queries # remove inside parenthesis search_query = search_query.replace("(", "").replace(")", "").strip() if search_query: if "extras" not in dct: dct["extras"] = [] dct["extras"].append( { "where": [ f"{model._meta.db_table}.search_vector @@ (to_tsquery(%s, %s)) = true OR " + f"{model._meta.db_table}.search_vector @@ (to_tsquery('simple', %s)) = true OR " f"{model._meta.db_table}.search_vector::tsvector @@ %s::tsquery = true" ], "params": [settings.ISHTAR_SEARCH_LANGUAGE, search_query, search_query, search_query], } ) return dct, exc_dct, distinct_queries def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs): bool_fields = list(bool_fields) + list(reversed_bool_fields) for k in bool_fields: if k not in dct: continue elif dct[k] == "1" or dct[k] == "unknown": dct.pop(k) continue dct[k] = dct[k].replace('"', "") if dct[k] in ["2", "yes", str(_("Yes")).lower(), "True"]: dct[k] = True else: dct[k] = False if k in reversed_bool_fields: dct[k] = not dct[k] # check also for empty value with image field field_names = k.split("__") # TODO: can be 
improved in later version of Django try: c_field = model._meta.get_field(field_names[0]) for field_name in field_names[1:-1]: if not hasattr(c_field, "related_model") or not c_field.related_model: return c_field = c_field.related_model._meta.get_field(field_name) if k.endswith("__isnull") and ( isinstance(c_field, (ImageField, FileField)) or field_names[-2] == "associated_url" ): key = "__".join(k.split("__")[:-1]) if dct[k]: or_reqs.append((k, {key + "__exact": ""})) else: dct[key + "__regex"] = ".{1}.*" except FieldDoesNotExist: pass def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct): bool_fields = list(fields or []) + list(reversed_fields or []) for k in bool_fields: if k not in dct: continue elif dct[k] == "1" or dct[k] == "unknown": dct.pop(k) continue dct[k] = dct[k].replace('"', "") dct[k] = True if dct[k] in ["2", "yes", str(_("Yes")).lower()] else None if reversed_fields and k in reversed_fields: dct[k] = True if not dct[k] else None if dct[k]: dct[k] = False else: dct.pop(k) excluded_dct[k] = False today_lbl = pgettext_lazy("key for text search", "today") TODAYS = ["today"] for language_code, language_lbl in settings.LANGUAGES: activate(language_code) TODAYS.append(str(today_lbl)) deactivate() def _manage_dated_fields(dated_fields, dct): keys = list(dct.keys()) for k in dated_fields: res = [j for j in keys if j.startswith(k)] if not res: continue k = res[0] if not dct[k]: dct.pop(k) continue value = dct[k].replace('"', "").strip() has_today = False for today in TODAYS: if value.startswith(today): base_date = datetime.date.today() value = value[len(today):].replace(" ", "") if value and value[0] in ("-", "+"): sign = value[0] try: days = int(value[1:]) except ValueError: days = 0 if days: if sign == "-": base_date = base_date - datetime.timedelta(days=days) else: base_date = base_date + datetime.timedelta(days=days) dct[k] = base_date.strftime("%Y-%m-%d") has_today = True break if has_today: continue items = [] if "/" in value: 
items = list(reversed(value.split("/"))) elif "-" in value: # already date formated items = value.split("-") if len(items) != 3: dct.pop(k) return try: dct[k] = datetime.datetime(*map(lambda x: int(x), items)).strftime( "%Y-%m-%d" ) except ValueError: dct.pop(k) def _clean_type_val(val): for prefix in GeneralType.PREFIX_CODES: val = val.replace(prefix, "") val = val.strip() if val.startswith('"') and val.endswith('"'): val = '"{}"'.format(val[1:-1].strip()) return val def _manage_facet_search(model, dct, and_reqs): if not hasattr(model, "general_types"): return general_types = model.general_types() hierarchic_fields = HIERARCHIC_FIELDS[:] if hasattr(model, "hierarchic_fields"): hierarchic_fields += model.hierarchic_fields() for base_k in general_types: if base_k in hierarchic_fields: # already managed continue if base_k.endswith("_id"): k = base_k else: k = base_k + "__pk" if k not in dct or not dct[k].startswith('"') or not dct[k].startswith('"'): continue val = _clean_type_val(dct.pop(k)) if '";"' in val: # OR request values = val.split(";") else: values = [val] reqs = None for val in values: if not val.endswith('"') or not val.startswith('"'): continue lbl_name = "__label__" try: rel = getattr(model, base_k).field.related_model if not hasattr(rel, "label") and hasattr(rel, "cached_label"): lbl_name = "__cached_label__" except AttributeError: pass suffix = ( "{}icontains".format(lbl_name) if "%" in val else "{}iexact".format(lbl_name) ) query = val[1:-1].replace("*", "") if not reqs: reqs = Q(**{base_k + suffix: query}) else: reqs |= Q(**{base_k + suffix: query}) if reqs: and_reqs.append(reqs) POST_PROCESS_REQUEST = getattr(model, "POST_PROCESS_REQUEST", None) if not POST_PROCESS_REQUEST: return for k in dct: if k in POST_PROCESS_REQUEST and dct[k]: dct[k] = getattr(model, POST_PROCESS_REQUEST[k])(dct[k].replace('"', "")) def _manage_hierarchic_fields(model, dct, and_reqs): hierarchic_fields = HIERARCHIC_FIELDS[:] if hasattr(model, "hierarchic_fields"): 
hierarchic_fields += model.hierarchic_fields() for reqs in dct.copy(): if type(reqs) not in (list, tuple): reqs = [reqs] for req in reqs: if req.endswith("areas__pk") or req.endswith("areas__label__iexact"): if req.endswith("pk"): suffix = "pk" elif req.endswith("label__iexact"): suffix = "label__iexact" else: continue val = _clean_type_val(dct.pop(req)) if val.startswith('"') and val.endswith('"'): val = val[1:-1] reqs = Q(**{req: val}) for idx in range(HIERARCHIC_LEVELS): req = req[: -(len(suffix))] + "parent__" + suffix q = Q(**{req: val}) reqs |= q and_reqs.append(reqs) # TODO: improve query with "IN ()"? continue if req.endswith("wcontainer_id") or req.endswith("wcontainer_ref_id"): val = _clean_type_val(dct.pop(req)).strip('"') if val.startswith('"') and val.endswith('"'): val = val[1:-1] vals = [v.replace('"', "") for v in val.split(";")] container_ids = [] for val in vals: q = Container.objects.filter(cached_label__iexact=val).values_list( "id", flat=True) if not q.count(): continue container_id = q.all()[0] container_ids.append(container_id) req = req[1:] # remove "w" main_req = Q(**{req + "__in": container_ids}) and_reqs.append(main_req) if req.endswith("precise_town_id"): val = _clean_type_val(dct.pop(req)).strip('"') if val.startswith('"') and val.endswith('"'): val = val[1:-1] vals = [v.replace('"', "") for v in val.split(";")] town_ids = [] for val in vals: q = models.Town.objects.filter(cached_label__iexact=val).values_list( "id", flat=True) if not q.count(): continue town_id = q.all()[0] town_ids.append(town_id) reqs = Q(**{req: val}) for rel_query in ("parents__", "children__"): for idx in range(HIERARCHIC_LEVELS): k = rel_query * (idx + 1) + "pk" q = models.Town.objects.filter( **{k: town_id}).values_list("id", flat=True) if not q.count(): break town_ids += list(q.all()) main_req = Q(**{req + "__in": town_ids}) and_reqs.append(main_req) elif ( req.endswith("town__pk") or req.endswith("towns__pk") or req.endswith("town__cached_label__iexact") or 
# NOTE(review): the lines below are the continuation of a function whose
# `def` line lies in an earlier chunk (it is called later as
# `_manage_hierarchic_fields(model, dct, and_reqs)` — TODO confirm name and
# signature against the full file). Indentation is reconstructed from
# collapsed source; nesting levels are best-effort.
        req.endswith("towns__cached_label__iexact")
    ):
        # town criteria: match the town itself plus its parents and children
        if req.endswith("pk"):
            suffix = "pk"
        elif req.endswith("cached_label__iexact"):
            suffix = "cached_label__iexact"
        else:
            continue
        val = _clean_type_val(dct.pop(req)).strip('"')
        if val.startswith('"') and val.endswith('"'):
            val = val[1:-1]
        # ";"-separated values are OR-ed together
        vals = [v.replace('"', "") for v in val.split(";")]
        main_req = None
        for val in vals:
            reqs = Q(**{req: val})
            nreq = base_req = req[:]
            # walk up the hierarchy: parents, parents__parents, ...
            for idx in range(HIERARCHIC_LEVELS):
                nreq = nreq[: -(len(suffix))] + "parents__" + suffix
                q = Q(**{nreq: val})
                reqs |= q
            nreq = base_req[:]
            # and down: children, children__children, ...
            for idx in range(HIERARCHIC_LEVELS):
                nreq = nreq[: -(len(suffix))] + "children__" + suffix
                q = Q(**{nreq: val})
                reqs |= q
            if not main_req:
                main_req = reqs
            else:
                main_req |= reqs
        and_reqs.append(main_req)
        # TODO: improve query with "IN ()"?
        continue
    for k_hr in hierarchic_fields:
        # the related model may expose "label" or only "cached_label"
        lbl_name = "label"
        if hasattr(model, k_hr):
            rel = getattr(model, k_hr).field.related_model
            if not hasattr(rel, "label") and hasattr(rel, "cached_label"):
                lbl_name = "cached_label"
        if type(req) in (list, tuple):
            # multi-key request: OR the same value over every key
            val = dct.pop(req)
            val = _clean_type_val(val)
            q = None
            for idx, r in enumerate(req):
                r = _clean_type_val(r)
                if not idx:
                    q = Q(**{r: val})
                else:
                    q |= Q(**{r: val})
            and_reqs.append(q)
            break
        elif req.endswith(k_hr + "__pk") or req.endswith(
            k_hr + "__{}__iexact".format(lbl_name)
        ):
            val = _clean_type_val(dct.pop(req))
            if '";"' in val:  # OR request
                values = val.split(";")
            else:
                values = [val]
            base_req = req[:]
            reqs = None
            if req.endswith("pk"):
                base_suffix = "pk"
            elif req.endswith("{}__iexact".format(lbl_name)):
                base_suffix = lbl_name + "__iexact"
            else:
                continue
            for val in values:
                suffix = base_suffix[:]
                req = base_req[:]
                if val.endswith('"') and val.startswith('"'):
                    val = val[1:-1]
                # manage search text by label
                if "*" in val:
                    # "*" acts as a wildcard -> switch to icontains
                    suffix = lbl_name + "__icontains"
                    val = val.replace("*", "")
                else:
                    suffix = lbl_name + "__iexact"
                req = req[: -(len(base_suffix))] + suffix
                if not reqs:
                    reqs = Q(**{req: val})
                else:
                    reqs |= Q(**{req: val})
                # also match ancestors through parent__, parent__parent__, ...
                for idx in range(HIERARCHIC_LEVELS):
                    req = req[: -(len(suffix))] + "parent__" + suffix
                    q = Q(**{req: val})
                    reqs |= q
            # TODO: improve query with "IN ()"?
            if reqs:
                and_reqs.append(reqs)
            break


def _manage_clean_search_field(dct, exclude=None, reverse=False):
    """Normalize quoted/wildcard text criteria of ``dct`` in place.

    Strips double quotes, then converts ``__iexact`` keys whose value
    contains "*" into ``__icontains`` (or ``__isnull``/exclude-empty for a
    bare "*").

    :param dct: criteria dict, mutated in place
    :param exclude: optional dict receiving exclusion criteria (empty string)
    :param reverse: when True, invert the bare-"*" handling (used for the
        excluded criteria dict)
    """
    for k in list(dct.keys()):
        # clean quoted search field
        if type(dct[k]) != str:
            continue
        dct[k] = dct[k].replace('"', "")
        dct[k] = _clean_type_val(dct[k])
        if "*" not in dct[k] or not k.endswith("__iexact"):
            continue
        value = dct.pop(k).strip()
        base_key = k[:-len("__iexact")]
        if value == "*":
            # bare wildcard: "has any value" / "has no value" when reversed
            if not reverse:
                dct[base_key + "__isnull"] = False
                if exclude is not None:
                    exclude[base_key + "__exact"] = ""
            continue
        if value.startswith("*"):
            value = value[1:]
        if value.endswith("*"):
            value = value[:-1]
        if value:
            dct[base_key + "__icontains"] = value
        elif exclude is not None:
            exclude[base_key + "__exact"] = ""


def _get_relation_type_dict(my_relation_types_prefix, dct):
    """Pop relation-type criteria out of ``dct`` and group them by prefix.

    :param my_relation_types_prefix: mapping of request-key prefix ->
        query prefix
    :param dct: criteria dict; matching keys are removed
    :return: dict mapping each query prefix to the set of popped values
    """
    relation_types = {}
    for rtype_key in my_relation_types_prefix:
        relation_types[my_relation_types_prefix[rtype_key]] = set()
    for rtype_key in my_relation_types_prefix:
        for keys in list(dct.keys()):
            if type(keys) not in (list, tuple):
                keys = [keys]
            for k in keys:
                if k.startswith(rtype_key):
                    relation_types[my_relation_types_prefix[rtype_key]].add(
                        dct.pop(k)
                    )
    return relation_types


def _manage_relation_types(relation_types, dct, query, or_reqs):
    """Extend ``query`` with alternatives matching related records.

    For each relation-type value, the remaining criteria of ``dct`` (and of
    each OR-ed sub-request) are re-targeted through
    ``right_relations__right_record__`` so that items related to a matching
    record are also returned.

    :return: the extended Q query
    """
    for rtype_prefix in relation_types:
        vals = relation_types[rtype_prefix]
        if not vals:
            continue
        # only the first popped value is considered; ";" separates OR values
        vals = list(vals)[0].split(";")
        alt_dct = {}
        for v in vals:
            alt_dct = {
                rtype_prefix
                + "right_relations__relation_type__label__iexact": v.replace('"', "")
            }
            for k in dct:
                val = dct[k]
                if rtype_prefix:
                    # only get conditions related to the object
                    if rtype_prefix not in k:
                        continue
                    # tricky: reconstruct the key to make sense - remove the
                    # prefix from the key
                    k = (
                        k[0 : k.index(rtype_prefix)]
                        + k[k.index(rtype_prefix) + len(rtype_prefix) :]
                    )
                if k.endswith("year"):
                    k += "__exact"
                alt_dct[rtype_prefix + "right_relations__right_record__" + k] = val
            query |= Q(**alt_dct)
            for k, or_req in or_reqs:
                altor_dct = alt_dct.copy()
                altor_dct.pop(k)
                for j in or_req:
                    val = or_req[j]
                    if j == "year":
                        j = "year__exact"
                    altor_dct[
                        rtype_prefix + "right_relations__right_record__" + j
                    ] = val
                query |= Q(**altor_dct)
    return query


def _construct_query(relation_types, dct, or_reqs, and_reqs, excluded_relation=False):
    """Build the final Q object from collected criteria.

    :param relation_types: output of :func:`_get_relation_type_dict`
    :param dct: plain AND criteria
    :param or_reqs: list of (key, {key: value}) alternatives OR-ed with dct
    :param and_reqs: list of Q objects AND-ed at the end (deduplicated by
        their string form)
    :param excluded_relation: when True every criterion is turned into an
        OR alternative (reverse logic used to build an exclusion query)
    :return: a Q object
    """
    # excluded -> reverse logic
    if excluded_relation:
        and_reqs, or_reqs = or_reqs, and_reqs
        for key in dct:
            if isinstance(dct[key], str):
                values = [v for v in dct[key].split(";") if v]
            else:
                values = [dct[key]]
            if not values:
                values = [""]  # filter empty value
            for value in values:
                or_reqs.append((key, {key: value}))
        dct = {}
    # manage multi value not already managed
    for key in list(dct.keys()):
        if isinstance(dct[key], str) and ";" in dct[key]:
            values = [v for v in dct[key].split(";") if v]
            if not values:
                dct.pop(key)
                continue
            dct[key] = values[0]
            if len(values) == 1:
                continue
            for v in values[1:]:
                or_reqs.append((key, {key: v}))
    # tuple/list keys: first key stays in dct, the others become OR requests
    for k in list(dct.keys()):
        if type(k) not in (list, tuple):
            continue
        first_key = k[0]
        value = dct[k][:]
        dct.pop(k)
        dct[first_key] = value
        for other_key in k[1:]:
            or_reqs.append((first_key, {other_key: value}))
    query = Q(**dct)
    for or_req in or_reqs:
        alt_dct = dct.copy()
        if isinstance(or_req, (tuple, list)):
            k, or_req = or_req
            if k in alt_dct:
                alt_dct.pop(k)
            alt_dct.update(or_req)
            query |= Q(**alt_dct)
        else:
            query |= (Q(**alt_dct) & Q(or_req))
    query = _manage_relation_types(relation_types, dct, query, or_reqs)
    done = []
    for and_req in and_reqs:
        # deduplicate AND criteria by their string representation
        str_q = str(and_req)
        if str_q in done:
            continue
        done.append(str_q)
        query = query & and_req
    return query


def _manage_default_search(
    dct, request, model, default_name, my_base_request, my_relative_session_names
):
    """Apply pinned searches / pinned items stored in the session.

    Returns the (possibly replaced) criteria dict and a human-readable
    description of the pinned search, if any.
    """
    pinned_search = ""
    pin_key = "pin-search-" + default_name
    base_request = my_base_request if isinstance(my_base_request, dict) else {}
    dct = {k: v for k, v in dct.items() if v}
    if pin_key in request.session and request.session[pin_key]:  # a search
        # is pinned: the stored search vector replaces any current criteria
        pinned_search = request.session[pin_key]
        dct = {"search_vector": request.session[pin_key]}
    elif (
        default_name in request.session and request.session[default_name]
    ):  # an item is pinned
        value = request.session[default_name]
        if "basket-" in value:
            # pinned find basket: restrict to its content
            try:
                dct = {"basket__pk": request.session[default_name].split("-")[-1]}
                pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"]))
            except FindBasket.DoesNotExist:
                pass
        else:
            try:
                dct = {"pk": request.session[default_name]}
                pinned_search = '"{}"'.format(model.objects.get(pk=dct["pk"]))
            except model.DoesNotExist:
                pass
    elif dct == {k: v for k, v in base_request.items() if v}:
        # no explicit criteria: try to restrict to the currently selected
        # parent item (e.g. current operation for context records)
        if not hasattr(model, "UP_MODEL_QUERY"):
            logger.warning(
                "**WARN get_item**: - UP_MODEL_QUERY not defined for "
                "'{}'".format(model)
            )
        else:
            # a parent item may be selected in the default menu
            for name, key in my_relative_session_names:
                if (
                    name in request.session
                    and request.session[name]
                    and "basket-" not in request.session[name]
                    and name in CURRENT_ITEM_KEYS_DICT
                ):
                    up_model = CURRENT_ITEM_KEYS_DICT[name]
                    try:
                        dct.update({key: request.session[name]})
                        up_item = up_model.objects.get(pk=dct[key])
                        if up_item.SLUG not in model.UP_MODEL_QUERY:
                            logger.warning(
                                "**WARN get_item**: - {} not in "
                                "UP_MODEL_QUERY for {}'".format(up_item.SLUG, model)
                            )
                        else:
                            req_key, up_attr = model.UP_MODEL_QUERY[up_item.SLUG]
                            pinned_search = '{}="{}"'.format(
                                req_key, getattr(up_item, up_attr)
                            )
                        break
                    except up_model.DoesNotExist:
                        pass
    return dct, pinned_search


def _format_val(val):
    """Return a display string for ``val`` (localized True/False, "" for
    None)."""
    if val is None:
        return ""
    if type(val) == bool:
        if val:
            return str(_("True"))
        else:
            return str(_("False"))
    return str(val)


def _format_geojson(rows, link_template, display_polygon):
    """Convert raw queryset rows into a GeoJSON FeatureCollection dict.

    :param rows: value tuples as produced by :func:`_get_data_from_query`
    :param link_template: HTML template stored alongside the collection
    :param display_polygon: when True, rows carry multi_line/multi_polygon
        columns (used in priority over the x/y point)
    :return: GeoJSON-like dict; rows without usable coordinates are listed
        under "no-geo"
    """
    data = {
        "type": "FeatureCollection",
        "crs": {"type": "name", "properties": {"name": "EPSG:4326"}},
        "link_template": link_template,
        "features": [],
        "no-geo": [],
    }
    if not rows:
        return data
    """
    Columns:
    base: ['id', 'cached_label', 'main_geodata__cached_x',
    'main_geodata__cached_y', 'point_x', 'point_y', 'locked', 'lock_user_id']
    poly: ['id', 'cached_label', 'main_geodata__cached_x',
    'main_geodata__cached_y', 'main_geodata__multi_line',
    'main_geodata__multi_polygon', 'point_x', 'point_y', 'locked',
    'lock_user_id']
    """
    # with polygons two extra columns shift the point_x/point_y indexes
    delta = 2 if display_polygon else 0
    for row in rows:
        properties = {"id": row[0], "name": row[1]}
        feature = None
        base_feature = {
            "type": "Feature",
            "properties": properties,
        }
        if display_polygon:
            if row[4]:  # lines
                feature = base_feature
                feature["geometry"] = json.loads(row[4].geojson)
            elif row[5]:  # polygons
                feature = base_feature
                feature["geometry"] = json.loads(row[5].geojson)
        if not feature:
            x, y = row[4 + delta], row[5 + delta]
            # reject missing or out-of-range WGS84 coordinates
            if not x or not y or x < -180 or x > 180 or y < -90 or y > 90:
                data["no-geo"].append(properties)
                continue
            feature = base_feature
            feature["geometry"] = {"type": "Point", "coordinates": [x, y]}
        data["features"].append(feature)
    return data


def _get_data_from_query(items, query_table_cols, extra_request_keys, geo_fields=None):
    """Fetch table data with a single ``values_list`` query.

    :param items: base queryset
    :param query_table_cols: column names (query-style paths)
    :param extra_request_keys: translation of display keys to query terms
    :param geo_fields: optional (x, y) field names; they are annotated as
        point_x/point_y, rounded to the profile precision when set
    :return: a values_list queryset (id first, then columns, then
        locked/lock_user_id when the model supports locking)
    """
    # TODO: manage data json field
    for query_keys in query_table_cols:
        if not isinstance(query_keys, (tuple, list)):
            query_keys = [query_keys]
        for query_key in query_keys:
            if query_key in extra_request_keys:
                # translate query term
                query_key = extra_request_keys[query_key]
                if isinstance(query_key, (list, tuple)):
                    # only manage one level for display
                    query_key = query_key[0]
            # clean
            for filtr in ("__icontains", "__contains", "__iexact", "__exact"):
                if query_key.endswith(filtr):
                    query_key = query_key[: len(query_key) - len(filtr)]
            # NOTE(review): str.replace returns a new string — this result is
            # discarded, so the "." -> "__" conversion is a no-op here.
            # Kept as-is (documentation-only pass); TODO confirm intent.
            query_key.replace(".", "__")  # class style to query
    values = ["id"] + query_table_cols
    if geo_fields:
        profile = get_current_profile()
        precision = profile.point_precision
        if precision is not None:
            exp_x = ExpressionWrapper(
                Round(geo_fields[0], precision),
                output_field=FloatField(),
            )
            exp_y = ExpressionWrapper(
                Round(geo_fields[1], precision),
                output_field=FloatField(),
            )
        else:
            exp_x = F(geo_fields[0])
            exp_y = F(geo_fields[1])
        items = items.annotate(point_x=exp_x)
        items = items.annotate(point_y=exp_y)
        values += ["point_x", "point_y"]
    if hasattr(items.model, "locked"):
        values.append("locked")
        values.append("lock_user_id")
    values = [v for v in values if v]  # filter empty values
    return items.values_list(*values)


def _get_data_from_query_old(
    items, query_table_cols, request, extra_request_keys, do_not_deduplicate=False
):
    """Legacy per-object data extraction (attribute walking in Python).

    Slower than :func:`_get_data_from_query` but handles related managers,
    dict (JSON) values and per-request value formatting.

    :param do_not_deduplicate: keep duplicated ids (normal e.g. for the
        record_relations view)
    :return: list of rows, each ``[pk, col1, col2, ...]`` (+ locked and
        lock_user_id when the model supports locking)
    """
    c_ids, datas = [], []
    has_lock = items and hasattr(items[0], "locked")
    for item in items:
        # manual deduplicate when distinct is not enough
        if not do_not_deduplicate and item.pk in c_ids:
            continue
        c_ids.append(item.pk)
        data = [item.pk]
        for keys in query_table_cols:
            if type(keys) not in (list, tuple):
                keys = [keys]
            my_vals = []
            for k in keys:
                if k in extra_request_keys:
                    k = extra_request_keys[k]
                if type(k) in (list, tuple):
                    k = k[0]
                # strip query filters from the attribute path
                for filtr in ("__icontains", "__contains", "__iexact", "__exact"):
                    if k.endswith(filtr):
                        k = k[: len(k) - len(filtr)]
                vals = [item]
                # foreign key may be divided by "." or "__"
                splitted_k = []
                for ky in k.split("."):
                    if "__" in ky:
                        splitted_k += ky.split("__")
                    else:
                        splitted_k.append(ky)
                if splitted_k[-1] == "count":
                    # "<rel>__count" -> count of the related manager
                    splitted_k = splitted_k[:-2] + [splitted_k[-2] + "__count"]
                for ky in splitted_k:
                    new_vals = []
                    for val in vals:
                        if ky.endswith("__count"):
                            new_vals += [getattr(val, ky[: -len("__count")]).count()]
                        elif hasattr(val, "all"):  # manage related objects
                            val = list(val.all())
                            for v in val:
                                v = getattr(v, ky)
                                new_vals += _get_values(request, v)
                        elif val and isinstance(val, dict):
                            # JSON data field
                            if ky in val:
                                val = val[ky]
                                new_vals += _get_values(request, val)
                        elif val:
                            try:
                                val = getattr(val, ky)
                                new_vals += _get_values(request, val)
                            except (AttributeError, GEOSException):
                                # must be a query key such as "contains"
                                pass
                    vals = new_vals
                # manage last related objects
                if vals and hasattr(vals[0], "all"):
                    new_vals = []
                    for val in vals:
                        new_vals += list(val.all())
                    vals = new_vals
                if not my_vals:
                    my_vals = [_format_val(va) for va in vals]
                else:
                    # join values of grouped keys with " - "
                    new_vals = []
                    if not vals:
                        for idx, my_v in enumerate(my_vals):
                            new_vals.append("{}{}{}".format(my_v, " - ", ""))
                    else:
                        # NOTE(review): pairs vals[idx] with v from the same
                        # list (my_vals is not reused here) — looks
                        # suspicious but kept as-is; TODO confirm.
                        for idx, v in enumerate(vals):
                            new_vals.append(
                                "{}{}{}".format(vals[idx], " - ", _format_val(v))
                            )
                    my_vals = new_vals[:]
            data.append(" & ".join(set(my_vals)) or "")
        if has_lock:
            data.append(item.locked)
            data.append(item.lock_user_id)
        datas.append(data)
    return datas


def _format_modality(value):
    """Return a display label for a statistic modality (None -> "Unknown",
    booleans localized)."""
    if value is None:
        value = str(_("Unknown"))
    if isinstance(value, bool):
        value = str(_(str(value)))
    return value


def _get_json_stats_optimized(
    items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1
):
    """Aggregate statistics in the database (Count/Sum annotations).

    :param items: base queryset
    :param stats_sum_variable: field to sum, or "pk" to count items
    :param stats_modality_1: first grouping key ("__year" suffix supported)
    :param stats_modality_2: optional second grouping key
    :param multiply: factor applied to each sum
    :return: HttpResponse with a JSON ``{"data": [...]}`` payload
    """
    q = items
    value_keys = []
    for stat in (stats_modality_1, stats_modality_2):
        if not stat:
            continue
        if stat.endswith("__year"):
            # group by the year of the date field
            q = q.annotate(**{stat: ExtractYear(stat[: -len("__year")])})
        value_keys.append(stat)
    q = q.values(*value_keys)
    if stats_sum_variable == "pk":
        q = q.annotate(sum=Count("pk"))
    else:
        q = q.annotate(sum=Sum(stats_sum_variable))
    data = []
    if stats_modality_2 and stats_modality_2 != stats_modality_1:
        q = q.order_by(stats_modality_1, stats_modality_2)
        for values in q.all():
            modality_1 = _format_modality(values[stats_modality_1])
            if not data or data[-1][0] != modality_1:
                data.append([modality_1, []])
            data[-1][1].append(
                (
                    _format_modality(values[stats_modality_2]),
                    int((values["sum"] or 0) * multiply),
                )
            )
    else:
        q = q.order_by(stats_modality_1)
        for values in q.all():
            modality_1 = values[stats_modality_1]
            data.append(
                [_format_modality(modality_1), int((values["sum"] or 0) * multiply)]
            )
    data = json.dumps({"data": data})
    return HttpResponse(data, content_type="application/json")


def _get_json_stats(
    items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1
):
    """Aggregate statistics in Python (fallback of the optimized version).

    Same parameters and JSON output as :func:`_get_json_stats_optimized`,
    but sums are accumulated row by row in Python.
    """
    # _get_json_stats_optimized should work
    # but problem on container with criteria on CVL
    q = items
    value_keys = []
    for stat in (stats_modality_1, stats_modality_2):
        if not stat:
            continue
        if stat.endswith("__year"):
            q = q.annotate(**{stat: ExtractYear(stat[:-len("__year")])})
        value_keys.append(stat)
    # (continuation of _get_json_stats) fetch raw values and sum in Python
    value_keys.append(stats_sum_variable)
    q = q.values(*value_keys)
    data = []
    if stats_modality_2 and stats_modality_2 != stats_modality_1:
        q = q.order_by(stats_modality_1, stats_modality_2)
        results = {}
        for values in q.all():
            value = values[stats_sum_variable] or 0
            if stats_sum_variable == "pk":
                value = 1  # counting items, not summing a field
            modality_1 = _format_modality(values[stats_modality_1])
            modality_2 = _format_modality(values[stats_modality_2])
            key = (modality_1, modality_2)
            if key not in results:
                results[key] = 0
            results[key] += value * multiply
        for key in results:
            modality_1, modality_2 = key
            if not data or data[-1][0] != modality_1:
                data.append([modality_1, []])
            data[-1][1].append((modality_2, results[key]))
    else:
        q = q.order_by(stats_modality_1)
        results = {}
        for values in q.all():
            value = values[stats_sum_variable] or 0
            if stats_sum_variable == "pk":
                value = 1
            modality_1 = _format_modality(values[stats_modality_1])
            if modality_1 not in results:
                results[modality_1] = 0
            results[modality_1] += value * multiply
        for modality_1 in results:
            data.append([modality_1, results[modality_1]])
    data = json.dumps({"data": data})
    return HttpResponse(data, content_type="application/json")


DEFAULT_ROW_NUMBER = 10

# length is used by ajax DataTables requests
EXCLUDED_FIELDS = ["length"]

BASE_DATED_FIELDS = ["created", "last_modified"]


def get_item(
    model,
    func_name,
    default_name,
    extra_request_keys=None,
    base_request=None,
    bool_fields=None,
    reversed_bool_fields=None,
    dated_fields=None,
    associated_models=None,
    relative_session_names=None,
    specific_perms=None,
    own_table_cols=None,
    relation_types_prefix=None,
    do_not_deduplicate=False,
    model_for_perms=None,
    alt_query_own=None,
    search_form=None,
    no_permission_check=False,
):
    """
    Generic treatment of tables

    :param model: model used for query
    :param func_name: name of the function (used for session storage)
    :param default_name: key used for default search in session
    :param extra_request_keys: default query limitation
    :param base_request:
    :param bool_fields:
    :param reversed_bool_fields:
    :param dated_fields:
    :param associated_models:
    :param relative_session_names:
    :param specific_perms:
    :param own_table_cols:
    :param relation_types_prefix:
    :param do_not_deduplicate: duplication of id can occurs on large queryset
    a mechanism of deduplication is used. But duplicate ids can be normal
    (for instance for record_relations view).
    :param model_for_perms: use another model to check permission
    :param alt_query_own: name of alternate method to get query_own
    :param search_form: associated search form to manage JSON query keys
    :return:
    """

    def func(
        request,
        data_type="json",
        full=False,
        force_own=False,
        col_names=None,
        no_link=False,
        no_limit=False,
        return_query=False,
        **dct,
    ):
        # the generated view: parses request criteria, builds the query and
        # serializes the result as JSON (table/map/image/stats) or CSV
        available_perms = []
        if specific_perms:
            available_perms = specific_perms[:]
        EMPTY = ""
        if "type" in dct:
            data_type = dct.pop("type")
        if not data_type:
            data_type = "json"
        if "json" in data_type:
            EMPTY = "[]"
        if data_type not in ("json", "csv", "json-image", "json-map", "json-stats"):
            return HttpResponse(EMPTY, content_type="text/plain")
        if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2:
            return HttpResponse(EMPTY, content_type="text/plain")
        model_to_check = model
        if model_for_perms:
            model_to_check = model_for_perms
        if no_permission_check:
            allowed, own = True, False
        else:
            if return_query:
                allowed, own = True, False
            else:
                allowed, own = check_model_access_control(
                    request, model_to_check, available_perms
                )
        if not allowed:
            return HttpResponse(EMPTY, content_type="text/plain")
        if force_own:
            own = True
        if (
            full == "shortcut"
            and "SHORTCUT_SEARCH" in request.session
            and request.session["SHORTCUT_SEARCH"] == "own"
        ):
            own = True
        query_own = None
        if own:
            # restrict to items owned by the current Ishtar user
            q = models.IshtarUser.objects.filter(user_ptr=request.user)
            if not q.count():
                return HttpResponse(EMPTY, content_type="text/plain")
            if alt_query_own:
                query_own = getattr(model, alt_query_own)(q.all()[0])
            else:
                query_own = model.get_query_owns(q.all()[0])
        query_parameters = {}
        if hasattr(model, "get_query_parameters"):
            query_parameters = model.get_query_parameters()
        # get defaults from model
        if not extra_request_keys and query_parameters:
            my_extra_request_keys = copy(model.EXTRA_REQUEST_KEYS)
            for key in query_parameters:
                my_extra_request_keys[key] = query_parameters[key].search_query
        else:
            my_extra_request_keys = copy(extra_request_keys or {})
        if base_request is None and hasattr(model, "BASE_REQUEST"):
            if callable(model.BASE_REQUEST):
                my_base_request = model.BASE_REQUEST(request)
            else:
                my_base_request = copy(model.BASE_REQUEST)
        elif base_request is not None:
            my_base_request = copy(base_request)
        else:
            my_base_request = {}
        if not bool_fields and hasattr(model, "BOOL_FIELDS"):
            my_bool_fields = model.BOOL_FIELDS[:]
        else:
            my_bool_fields = bool_fields[:] if bool_fields else []
        if not reversed_bool_fields and hasattr(model, "REVERSED_BOOL_FIELDS"):
            my_reversed_bool_fields = model.REVERSED_BOOL_FIELDS[:]
        else:
            my_reversed_bool_fields = (
                reversed_bool_fields[:] if reversed_bool_fields else []
            )
        many_counted_fields = getattr(model, "MANY_COUNTED_FIELDS", None)
        reversed_many_counted_fields = getattr(
            model, "REVERSED_MANY_COUNTED_FIELDS", None
        )
        if not dated_fields and hasattr(model, "DATED_FIELDS"):
            my_dated_fields = model.DATED_FIELDS[:]
        else:
            my_dated_fields = dated_fields[:] if dated_fields else []
        my_dated_fields += BASE_DATED_FIELDS
        if not associated_models and hasattr(model, "ASSOCIATED_MODELS"):
            my_associated_models = model.ASSOCIATED_MODELS[:]
        else:
            my_associated_models = associated_models[:] if associated_models else []
        if not relative_session_names and hasattr(model, "RELATIVE_SESSION_NAMES"):
            my_relative_session_names = model.RELATIVE_SESSION_NAMES[:]
        else:
            my_relative_session_names = (
                relative_session_names[:] if relative_session_names else []
            )
        if not relation_types_prefix and hasattr(model, "RELATION_TYPES_PREFIX"):
            my_relation_types_prefix = copy(model.RELATION_TYPES_PREFIX)
        else:
            my_relation_types_prefix = (
                copy(relation_types_prefix) if relation_types_prefix else {}
            )
        # map each model field to its request key (FK -> "<name>__pk")
        fields = [model._meta.get_field(k) for k in get_all_field_names(model)]
        request_keys = dict(
            [
                (
                    field.name,
                    field.name
                    + (
                        hasattr(field, "remote_field")
                        and field.remote_field
                        and "__pk"
                        or ""
                    ),
                )
                for field in fields
            ]
        )
        # add keys of associated models to available request key
        for associated_model, key in my_associated_models:
            if type(associated_model) == str:
                if associated_model not in globals():
                    continue
                associated_model = globals()[associated_model]
            associated_fields = [
                associated_model._meta.get_field(k)
                for k in get_all_field_names(associated_model)
            ]
            request_keys.update(
                dict(
                    [
                        (
                            key + "__" + field.name,
                            key
                            + "__"
                            + field.name
                            + (hasattr(field, "rel") and field.rel and "__pk" or ""),
                        )
                        for field in associated_fields
                    ]
                )
            )
        request_keys.update(my_extra_request_keys)
        # manage search on json fields and excluded fields
        if (
            search_form
            and request
            and request.user
            and getattr(request.user, "ishtaruser", None)
        ):
            available, __, excluded_fields, json_fields = search_form.check_custom_form(
                request.user.ishtaruser
            )
            # for now no manage on excluded_fields: should we prevent search on
            # some fields regarding the user concerned?
            if available:
                for __, jkey, jfield in json_fields:
                    if jfield.alt_name not in request_keys:
                        if isinstance(
                            jfield, (forms.NullBooleanField, forms.BooleanField)
                        ):
                            my_bool_fields.append(jkey)
                            request_keys[jfield.alt_name] = jkey
                        else:
                            request_keys[jfield.alt_name] = jkey + "__iexact"
        if "query" in dct:
            request_items = dct["query"]
            request_items["submited"] = True
        elif request.method == "POST":
            request_items = request.POST
        else:
            request_items = request.GET
        count = dct.get("count", False)
        # pager
        try:
            row_nb = int(request_items.get("length"))
        except (ValueError, TypeError):
            row_nb = DEFAULT_ROW_NUMBER
        if data_type == "json-map":
            # other limit for map
            row_nb = settings.ISHTAR_MAP_MAX_ITEMS
        if no_limit or (
            data_type == "json-map" and request_items.get("no_limit", False)
        ):
            row_nb = None
        display_polygon = False
        if data_type == "json-map" and request_items.get("display_polygon", False):
            display_polygon = True
        dct_request_items = {}
        # filter requested fields
        for k in request_items:
            if k in EXCLUDED_FIELDS:
                continue
            key = k[:]
            if key.startswith("searchprefix_"):
                key = key[len("searchprefix_") :]
            dct_request_items[key] = request_items[k]
        request_items = dct_request_items
        base_query = None
        if isinstance(my_base_request, Q):
            base_query = my_base_request
            dct = {}
        else:
            dct = copy(my_base_request)
        excluded_dct = {}
        and_reqs, or_reqs = [], []
        exc_and_reqs, exc_or_reqs = [], []
        distinct_queries = []
        dct["extras"], dct["and_reqs"], dct["exc_and_reqs"] = [], [], []
        if full == "shortcut":
            # shortcut menu: simple label autocomplete
            if model.SLUG == "warehouse":
                key = "name__icontains"
            else:
                key = "cached_label__icontains"
            dct[key] = request.GET.get("term", None)
        try:
            old = "old" in request_items and int(request_items["old"])
        except ValueError:
            return HttpResponse("[]", content_type="text/plain")
        selected_ids = request_items.get("selected_ids", None)
        if selected_ids:
            if "-" in selected_ids:
                q = Q(pk__in=selected_ids.split("-"))
            else:
                q = Q(pk=selected_ids)
            and_reqs.append(q)
        for k in request_keys:
            val = request_items.get(k)
            if not val:
                continue
            req_keys = request_keys[k]
            target = dct
            if k in query_parameters:
                if query_parameters[k].distinct_query:
                    # criterion applied as a separate .filter() call
                    distinct_queries.append({})
                    target = distinct_queries[-1]
                if query_parameters[k].extra_query:
                    target.update(query_parameters[k].extra_query)
            if callable(req_keys):
                # callable request key for complex queries not managed on GET
                continue
            elif type(req_keys) not in (list, tuple):
                target[req_keys] = val
                continue
            # multiple choice target
            reqs = Q(**{req_keys[0]: val})
            for req_key in req_keys[1:]:
                q = Q(**{req_key: val})
                reqs |= q
            and_reqs.append(reqs)
        pinned_search = ""
        base_keys = ["extras", "and_reqs", "exc_and_reqs"]
        if my_base_request and isinstance(my_base_request, dict):
            base_keys += list(my_base_request)
        whole_bool_fields = my_bool_fields[:] + my_reversed_bool_fields[:]
        if reversed_many_counted_fields:
            whole_bool_fields += reversed_many_counted_fields[:]
        has_a_search = any(
            k
            for k in dct.keys()
            if k not in base_keys
            and (
                dct[k]
                and (k not in whole_bool_fields or dct[k] not in ("1", "unknown"))
            )
        )
        # manage default and pinned search and not bookmark
        if (
            not has_a_search
            and not request_items.get("search_vector", "")
            and not request_items.get("submited", "")
            and full != "shortcut"
        ):
            if data_type == "csv" and func_name in request.session:
                # CSV export reuses the last JSON search of this view
                dct = request.session[func_name]
            else:
                # default search
                dct, pinned_search = _manage_default_search(
                    dct,
                    request,
                    model,
                    default_name,
                    my_base_request,
                    my_relative_session_names,
                )
        elif func_name and request and hasattr(request, "session"):
            request.session[func_name] = dct
        for k in request_keys:
            if k not in query_parameters:
                query_parameters[k] = SearchAltName(k, request_keys[k])
        dct, excluded_dct, distinct_queries = _search_manage_search_vector(
            model,
            dct,
            excluded_dct,
            distinct_queries,
            query_parameters,
        )
        search_vector = ""
        if "search_vector" in dct:
            search_vector = dct.pop("search_vector")
        # manage relations types
        if "relation_types" not in my_relation_types_prefix:
            my_relation_types_prefix["relation_types"] = ""
        relation_types = _get_relation_type_dict(my_relation_types_prefix, dct)
        exc_relation_types = _get_relation_type_dict(
            my_relation_types_prefix, excluded_dct
        )
        _manage_bool_fields(
            model, my_bool_fields, my_reversed_bool_fields, dct, or_reqs
        )
        _manage_bool_fields(
            model, my_bool_fields, my_reversed_bool_fields, excluded_dct, exc_or_reqs
        )
        tmp_excluded = {}
        _manage_many_counted_fields(
            many_counted_fields, reversed_many_counted_fields, dct, tmp_excluded
        )
        _manage_many_counted_fields(
            many_counted_fields, reversed_many_counted_fields, excluded_dct, dct
        )
        if tmp_excluded:
            excluded_dct.update(tmp_excluded)
        _manage_dated_fields(my_dated_fields, dct)
        _manage_dated_fields(my_dated_fields, excluded_dct)
        _manage_hierarchic_fields(model, dct, and_reqs)
        _manage_hierarchic_fields(model, excluded_dct, exc_and_reqs)
        _manage_facet_search(model, dct, and_reqs)
        _manage_facet_search(model, excluded_dct, exc_and_reqs)
        extras = []
        if "extras" in dct:
            extras = dct.pop("extras")
        if "and_reqs" in dct:
            and_reqs += dct.pop("and_reqs")
        if "exc_and_reqs" in dct:
            exc_and_reqs += dct.pop("exc_and_reqs")
        updated_excluded = {}
        _manage_clean_search_field(dct, updated_excluded)
        _manage_clean_search_field(excluded_dct, dct, reverse=True)
        if updated_excluded:
            excluded_dct.update(updated_excluded)
        query = _construct_query(relation_types, dct, or_reqs, and_reqs)
        exc_query = None
        if excluded_dct or exc_and_reqs or exc_or_reqs or exc_relation_types:
            exc_query = _construct_query(
                exc_relation_types,
                excluded_dct,
                exc_or_reqs,
                exc_and_reqs,
                excluded_relation=True
            )
        if query_own:
            query = query & query_own
        # manage hierarchic in shortcut menu
        if full == "shortcut":
            ASSOCIATED_ITEMS = {
                Operation: (File, "associated_file__pk"),
                ContextRecord: (Operation, "operation__pk"),
                Find: (ContextRecord, "base_finds__context_record__pk"),
            }
            if model in ASSOCIATED_ITEMS:
                upper_model, upper_key = ASSOCIATED_ITEMS[model]
                model_name = upper_model.SLUG
                current = (
                    model_name in request.session and request.session[model_name]
                )
                if current:
                    dct = {upper_key: current}
                    query &= Q(**dct)
        # print(query, distinct_queries, base_query, exc_query, extras)
        items = model.objects.filter(query)
        for d_q in distinct_queries:
            items = items.filter(d_q)
        if base_query:
            items = items.filter(base_query)
        if exc_query:
            items = items.exclude(exc_query)
        for extra in extras:
            items = items.extra(**extra)
        if return_query:
            return items
        items = items.distinct()
        try:
            items_nb = items.values("pk").aggregate(Count("pk"))["pk__count"]
        except ProgrammingError:
            items_nb = 0
        if count:
            return items_nb
        # print(str(items.values("id").query).encode('utf-8'))
        if search_vector:  # for serialization
            dct["search_vector"] = search_vector
        # table cols
        if own_table_cols:
            table_cols = own_table_cols
        else:
            if full:
                table_cols = [
                    field.name
                    for field in model._meta.fields
                    if field.name not in PRIVATE_FIELDS
                ]
                table_cols += [
                    field.name
                    for field in model._meta.many_to_many
                    if field.name not in PRIVATE_FIELDS
                ]
                if hasattr(model, "EXTRA_FULL_FIELDS"):
                    table_cols += model.EXTRA_FULL_FIELDS
            else:
                tb_key = (getattr(model, "SLUG", None), "TABLE_COLS")
                if tb_key in settings.TABLE_COLS:
                    table_cols = settings.TABLE_COLS[tb_key]
                else:
                    table_cols = model.TABLE_COLS
                if callable(table_cols):
                    table_cols = table_cols()
        table_cols = list(table_cols)
        if data_type == "json-map":
            table_cols = []  # only pk for map
        elif data_type == "json-stats":
            # validate requested modalities / sum variable against the model
            stats_modality_1 = request_items.get("stats_modality_1", None)
            stats_modality_2 = request_items.get("stats_modality_2", None)
            if (
                not stats_modality_1
                or stats_modality_1 not in model.STATISTIC_MODALITIES
            ):
                stats_modality_1 = model.STATISTIC_MODALITIES[0]
            if stats_modality_2 not in model.STATISTIC_MODALITIES:
                stats_modality_2 = None
            stats_sum_variable = request_items.get("stats_sum_variable", None)
            stats_sum_variable_keys = list(model.STATISTIC_SUM_VARIABLE.keys())
            if (
                not stats_sum_variable
                or stats_sum_variable not in stats_sum_variable_keys
            ):
                stats_sum_variable = stats_sum_variable_keys[0]
            multiply = model.STATISTIC_SUM_VARIABLE[stats_sum_variable][1]
            return _get_json_stats(
                items,
                stats_sum_variable,
                stats_modality_1,
                stats_modality_2,
                multiply=multiply,
            )
        query_table_cols = []
        for idx, cols in enumerate(table_cols):
            if type(cols) not in (list, tuple):
                cols = [cols]
            for col in cols:
                query_table_cols += col.split("|")
        # contextual (full, simple, etc.) col
        contxt = full and "full" or "simple"
        if (
            hasattr(model, "CONTEXTUAL_TABLE_COLS")
            and contxt in model.CONTEXTUAL_TABLE_COLS
        ):
            for idx, col in enumerate(table_cols):
                if col in model.CONTEXTUAL_TABLE_COLS[contxt]:
                    query_table_cols[idx] = model.CONTEXTUAL_TABLE_COLS[contxt][col]
        if data_type in ("json-image", "json-map") or full == "shortcut":
            if model.SLUG == "warehouse":
                query_table_cols.append("name")
                table_cols.append("name")
            else:
                query_table_cols.append("cached_label")
                table_cols.append("cached_label")
        if data_type == "json-image":
            query_table_cols.append("main_image__thumbnail")
            table_cols.append("main_image__thumbnail")
            query_table_cols.append("main_image__image")
            table_cols.append("main_image__image")
        elif data_type == "json-map":
            base_query_key = "main_geodata__"
            if model.SLUG == "find":
                base_query_key = "base_finds__" + base_query_key
            query_table_cols += [
                base_query_key + "cached_x", base_query_key + "cached_y"
            ]
            table_cols += [
                base_query_key + "cached_x", base_query_key + "cached_y"
            ]
            if display_polygon:
                query_table_cols += [
                    base_query_key + "multi_line", base_query_key + "multi_polygon"
                ]
        # manage sort tables
        manual_sort_key = None
        sorts = {}
        for k in request_items:
            # DataTables style: order[<n>][dir] / order[<n>][column]
            if not k.startswith("order["):
                continue
            num = int(k.split("]")[0][len("order[") :])
            if num not in sorts:
                sorts[num] = ["", ""]  # sign, col_num
            if k.endswith("[dir]"):
                order = request_items[k]
                sign = order and order == "desc" and "-" or ""
                sorts[num][0] = sign
            if k.endswith("[column]"):
                sorts[num][1] = request_items[k]
        sign = ""
        if not sorts:
            items = items.order_by("pk")
        else:
            orders = []
            sort_keys = list(sorts.keys())
            for idx in sorted(sorts.keys()):
                signe, col_num = sorts[idx]
                col_num = int(col_num)
                # id or link col
                if col_num < 2 and len(sort_keys) <= 2:
                    orders.append("pk")
                    continue
                k = query_table_cols[col_num - 2]
                if k in request_keys:
                    ks = request_keys[k]
                    if type(ks) not in (tuple, list):
                        ks = [ks]
                    for k in ks:
                        if k.endswith("__pk"):
                            k = k[: -len("__pk")] + "__label"
                        if k.endswith("towns"):
                            k = k + "__cached_label"
                        if (
                            k.endswith("__icontains")
                            or k.endswith("__contains")
                            or k.endswith("__iexact")
                            or k.endswith("__exact")
                        ):
                            k = "__".join(k.split("__")[:-1])
                        # if '__' in k:
                        #     k = k.split('__')[0]
                        orders.append(signe + k)
                else:
                    # not a standard request key
                    if idx:
                        # not the first - we ignore this sort
                        continue
                    sign = signe
                    manual_sort_key = k
                    logger.warning(
                        "**WARN get_item - {}**: manual sort key '{}'".format(
                            func_name, k
                        )
                    )
                    break
            if not manual_sort_key:
                items = items.order_by(*orders)
        # pager management
        start, end = 0, None
        page_nb = 1
        if row_nb and data_type.startswith("json"):
            try:
                start = int(request_items.get("start"))
                page_nb = start // row_nb + 1
                if page_nb < 1:
                    raise ValueError("Page number is not relevant.")
            except (TypeError, ValueError, AssertionError):
                start = 0
                page_nb = 1
            end = int(page_nb * row_nb)
        if full == "shortcut":
            start = 0
            end = 20
        if manual_sort_key:
            # cannot slice yet: sorting happens in Python below
            items = items.all()
        else:
            items = items[start:end]
        if old:
            items = [item.get_previous(old) for item in items]
        if data_type == "json-map":
            if display_polygon:
                geo_fields = query_table_cols[-4:]
            else:
                geo_fields = query_table_cols[-2:]
            datas = _get_data_from_query(
                items, query_table_cols, my_extra_request_keys, geo_fields=geo_fields
            )
        elif data_type != "csv" and getattr(model, "NEW_QUERY_ENGINE", False):
            datas = _get_data_from_query(
                items, query_table_cols, my_extra_request_keys
            )
        else:
            datas = _get_data_from_query_old(
                items,
                query_table_cols,
                request,
                my_extra_request_keys,
                do_not_deduplicate,
            )
        if manual_sort_key:
            # +1 because the id is added as a first col
            idx_col = None
            if manual_sort_key in query_table_cols:
                idx_col = query_table_cols.index(manual_sort_key) + 1
            else:
                for idx, col in enumerate(query_table_cols):
                    if type(col) in (list, tuple) and manual_sort_key in col:
                        idx_col = idx + 1
            if idx_col is not None:
                # pick a type-consistent placeholder for None values
                null_value = None
                for d in datas:
                    if isinstance(d[idx_col], (int, float)):
                        null_value = 0
                        break
                    if isinstance(d[idx_col], str):
                        null_value = ""
                        break
                    if isinstance(d[idx_col], datetime.date):
                        null_value = datetime.date(1, 1, 1)
                        break
                    if isinstance(d[idx_col], datetime.datetime):
                        null_value = datetime.datetime(1, 1, 1)
                        break
                if not null_value:
                    null_value = ""
                datas = sorted(
                    datas,
                    key=lambda x: x[idx_col] if x[idx_col] is not None else null_value
                )
                if sign == "-":
                    datas = reversed(datas)
            datas = list(datas)[start:end]
        # NOTE(review): these template strings look emptied/stripped by the
        # text extraction (they read as empty/whitespace literals); they are
        # reproduced as shown — TODO check the pristine file for the real
        # HTML link/lock markup.
        link_template = (
            ""
            ''
        )
        link_ext_template = '{}'
        lock = ' '
        own_lock = ' '
        has_locks = hasattr(model, "locked")
        current_user_id = request.user and request.user.id
        if data_type.startswith("json"):
            rows = []
            if data_type == "json-map":
                lnk = link_template.format(
                    reverse("show-" + default_name, args=[999999, ""]),
                )
                lnk = lnk.replace("999999", " ")
                if not has_locks:
                    lnk = lnk.replace(" ", "")
                data = json.dumps(_format_geojson(datas, lnk, display_polygon))
                return HttpResponse(data, content_type="application/json")
            for data in datas:
                res = {
                    "id": data[0],
                }
                if not no_link:
                    try:
                        lnk_template = link_template
                        lnk = lnk_template.format(
                            reverse("show-" + default_name, args=[data[0], ""])
                        )
                        if has_locks and data[-2]:
                            # show lock marker; "own" style for the holder
                            if data[-1] == current_user_id:
                                lnk = lnk.replace(" ", own_lock)
                            else:
                                lnk = lnk.replace(" ", lock)
                        else:
                            lnk = lnk.replace(" ", "")
                    except NoReverseMatch:
                        logger.warning(
                            '**WARN "show-'
                            + default_name
                            + '" args ('
                            + str(data[0])
                            + ") url not available"
                        )
                        lnk = ""
                    res["link"] = lnk
                for idx, value in enumerate(data[1:]):
                    if not value or idx >= len(table_cols):
                        continue
                    table_col = table_cols[idx]
                    if type(table_col) not in (list, tuple):
                        table_col = [table_col]
                    tab_cols = []
                    # foreign key may be divided by "." or "__"
                    for tc in table_col:
                        if "." in tc:
                            tab_cols += tc.split(".")
                        elif "__" in tc:
                            tab_cols += tc.split("__")
                        else:
                            tab_cols.append(tc)
                    k = "__".join(tab_cols)
                    if k.endswith("__image") or k.endswith("__thumbnail"):
                        # make media paths absolute URLs
                        if (
                            not value.startswith(settings.MEDIA_ROOT)
                            and not value.startswith("http://")
                            and not value.startswith("https://")
                        ):
                            value = settings.MEDIA_URL + value
                    if hasattr(model, "COL_LINK") and k in model.COL_LINK:
                        value = link_ext_template.format(value, value)
                    if isinstance(value, datetime.date):
                        value = value.strftime("%Y-%m-%d")
                    if isinstance(value, datetime.datetime):
                        value = value.strftime("%Y-%m-%d %H:%M:%S")
                    res[k] = value
                if full == "shortcut":
                    # autocomplete widgets expect a "value" entry
                    if "cached_label" in res:
                        res["value"] = res.pop("cached_label")
                    elif "name" in res:
                        res["value"] = res.pop("name")
                rows.append(res)
            if full == "shortcut":
                data = json.dumps(rows)
            else:
                total = (
                    (items_nb // row_nb + (1 if items_nb % row_nb else 0))
                    if row_nb
                    else items_nb
                )
                data = json.dumps(
                    {
                        "recordsTotal": items_nb,
                        "recordsFiltered": items_nb,
                        "rows": rows,
                        "table-cols": table_cols,
                        "pinned-search": pinned_search,
                        "page": page_nb,
                        "total": total,
                    }
                )
            return HttpResponse(data, content_type="application/json")
        elif data_type == "csv":
            response = HttpResponse(content_type="text/csv", charset=ENCODING)
            n = datetime.datetime.now()
            filename = "%s_%s.csv" % (default_name, n.strftime("%Y%m%d-%H%M%S"))
            response["Content-Disposition"] = "attachment; filename=%s" % filename
            writer = csv.writer(response, **CSV_OPTIONS)
            if col_names:
                col_names = [name for name in col_names]
            else:
                col_names = []
                for field_name in table_cols:
                    if type(field_name) in (list, tuple):
                        field_name = " & ".join(field_name)
                    if hasattr(model, "COL_LABELS") and field_name in model.COL_LABELS:
                        field = model.COL_LABELS[field_name]
                        col_names.append(str(field))
                        continue
                    else:
                        try:
                            field = model._meta.get_field(field_name)
                        except:
                            col_names.append("")
                            logger.warning(
                                "**WARN get_item - csv export**: no col name "
                                "for {}\nadd explicit label to "
                                "COL_LABELS attribute of "
                                "{}".format(field_name, model)
                            )
                            continue
                        col_names.append(str(field.verbose_name))
            writer.writerow(col_names)
            for data in datas:
                row, delta = [], 0
                # regroup cols with join "|"
                for idx, col_name in enumerate(table_cols):
                    if len(data[1:]) <= idx + delta:
                        break
                    val = data[1:][idx + delta]
                    if col_name and "|" in col_name[0]:
                        for delta_idx in range(len(col_name[0].split("|")) - 1):
                            delta += 1
                            val += data[1:][idx + delta]
                    row.append(val)
                try:
                    writer.writerow(row)
                except UnicodeEncodeError:
                    # fall back to transliteration for chars outside ENCODING
                    vals = []
                    for v in row:
                        try:
                            vals.append(v.encode(ENCODING).decode(ENCODING))
                        except UnicodeEncodeError:
                            vals.append(
                                unidecode(v).encode(ENCODING).decode(ENCODING)
                            )
                    writer.writerow(vals)
            return response
        return HttpResponse("{}", content_type="text/plain")

    return func


def adapt_distant_search(params, src, model):
    """Rewrite facet values of a search vector for a distant source.

    Each facet matched by ``RE_FACET`` whose local label has an
    ``ApiKeyMatch`` for ``src`` is rewritten to include the distant label,
    so the remote API can resolve it. ``params`` is mutated in place.
    """
    if "search_vector" in params and params["search_vector"]:
        search_vector = params["search_vector"][0]
        match = RE_FACET.search(search_vector)
        final_search_vector = ""
        while match:
            key, value, __ = match.groups()
            q = models_rest.ApiKeyMatch.objects.filter(
                source=src,
                search_model__model__iexact=model,
                search_keys__contains=[key],
                local_label=value,
            )
            up, down = match.span()
            if q.count():
                api_key = q.all()[0]
                final_search_vector += search_vector[0:up]
                final_search_vector += f'{key}="{api_key.distant_label};{value}" '
            else:
                final_search_vector += search_vector[0:down] + " "
            search_vector = search_vector[down:]
            match = RE_FACET.search(search_vector)
        final_search_vector += search_vector
        params["search_vector"] = [final_search_vector]


def get_distant_item(request, model, external_source_id, data_type=None):
    """Proxy a search to a distant Ishtar instance and relabel result ids.

    Forwards the current GET parameters (minus DataTables "columns" keys) to
    the remote API with the source token; returned ids are prefixed with
    ``source-<source_id>-`` so they do not clash with local ids.
    """
    # TODO: check permissions
    try:
        src = models_rest.ApiExternalSource.objects.get(pk=external_source_id)
    except (models_rest.ApiExternalSource.DoesNotExist, ValueError):
        return HttpResponse("{}", content_type="text/plain")
    url = src.url
    url += reverse(f"api-search-{model}")
    params = {
        k: v for k, v in dict(request.GET).items() if not k.startswith("columns")
    }
    params["submited"] = 1
    params["data_type"] = "json"
    if data_type:
        params["data_type"] = data_type
    adapt_distant_search(params, src, model)
    default_keys = [
        "draw",
        "order[",
        "start",
        "length",
        "search[",
        "_",
        "submited",
        "data_type",
    ]
    app = models_rest.MAIN_MODELS[model]
    model_class = ContentType.objects.get(app_label=app, model=model).model_class()
    bool_fields = (
        model_class.REVERSED_BOOL_FIELDS
        + model_class.BOOL_FIELDS
        + model_class.CALLABLE_BOOL_FIELDS
    )
    is_empty_params = not any(
        k
        for k in params
        if not any(k for default_key in default_keys if not k.startswith(default_key))
        and (k not in bool_fields or params.get(k) != ["unknown"])
    )
    pin_key = "pin-search-" + model
    if (
        is_empty_params and pin_key in request.session and request.session[pin_key]
    ):  # a search is pinned
        params["search_vector"] = request.session[pin_key]
    try:
        response = requests.get(
            url,
            params=params,
            timeout=20,
            headers={"Authorization": f"Token {src.key}"},
        )
    except requests.exceptions.Timeout:
        return HttpResponse("{}", content_type="text/plain")
    dct = response.json()
    if "rows" in dct:
        for row in dct["rows"]:
            if "id" in row:
                try:
                    idx = int(row["id"])
                except ValueError:
                    continue
                # prefix distant ids so they cannot clash with local pks
                source_id = f"source-{external_source_id}-{idx}"
                row["id"] = source_id
                if "link" in row:
                    row["link"] = row["link"].replace(str(idx), source_id)
    return HttpResponse(json.dumps(dct), content_type="application/json")


# NOTE(review): `external_export` is cut at the end of this chunk (its body
# continues past the visible source); reproduced verbatim up to the cut.
def external_export(request, source_id, model_name, slug):
    external_sources = request.session.get("EXTERNAL_SOURCES", {})
    url = None
    for source in external_sources:
        try:
            src_id, __ = source.split("||")
            src_id = int(src_id)
        except ValueError:
            continue
        if src_id != source_id:
            continue
        for model in external_sources[source]:
            if model_name == model.split("-")[-1]:
                url = reverse("api-export-" + model_name, args=[slug])
    if not url:
        return HttpResponse('Unauthorized', status=401)
    try:
        src = models_rest.ApiExternalSource.objects.get(pk=source_id)
    except
def external_export(request, source_id, model_name, slug):
    """Proxy a CSV export request to a distant Ishtar instance.

    The (source, model) pair must have been authorised beforehand in the
    session key ``EXTERNAL_SOURCES``; otherwise a 401 is returned. The
    distant CSV is streamed back with a timestamped attachment filename.

    :param request: current request; GET parameters are forwarded.
    :param source_id: pk of the models_rest.ApiExternalSource (may arrive
        as a string from the URL resolver).
    :param model_name: exported model name, used to build the distant URL.
    :param slug: export slug forwarded to the distant API.
    :return: HttpResponse - CSV attachment on success, 401/502/504 or the
        distant error status otherwise.
    """
    external_sources = request.session.get("EXTERNAL_SOURCES", {})
    url = None
    for source in external_sources:
        try:
            src_id, __ = source.split("||")
            src_id = int(src_id)
        except ValueError:
            continue
        # normalize both sides: source_id is usually a str from the URL
        # resolver while src_id has been cast to int - a direct int != str
        # comparison never matched
        if str(src_id) != str(source_id):
            continue
        for model in external_sources[source]:
            if model_name == model.split("-")[-1]:
                url = reverse("api-export-" + model_name, args=[slug])
    if not url:
        # source/model pair not authorised for this session
        return HttpResponse('Unauthorized', status=401)
    try:
        src = models_rest.ApiExternalSource.objects.get(pk=source_id)
    except (models_rest.ApiExternalSource.DoesNotExist, ValueError):
        return HttpResponse('Unauthorized', status=401)
    url = src.url + url
    try:
        response = requests.get(
            url,
            params=request.GET,
            timeout=20,
            headers={"Authorization": f"Token {src.key}"},
        )
    except requests.exceptions.Timeout:
        return HttpResponse('Gateway Timeout', status=504)
    except requests.exceptions.RequestException:
        # previously uncaught: a connection failure crashed with a 500
        return HttpResponse('Bad Gateway', status=502)
    if response.status_code != 200:
        labels = {
            401: "Unauthorized",
            404: "Page not found",
            500: "Server error",
        }
        return HttpResponse(
            labels.get(response.status_code, "Unknown error"),
            status=response.status_code,
        )
    csv_response = HttpResponse(response.text, content_type="text/csv")
    now = datetime.datetime.now()
    filename = f"{model_name}-{now.strftime('%Y%m%d-%H%M%S')}.csv"
    csv_response["Content-Disposition"] = "attachment; filename=%s" % filename
    return csv_response