#!/usr/bin/env python3 # -*- coding: utf-8 -*- from collections import OrderedDict from copy import copy, deepcopy import csv import datetime import json import logging import re import requests # nosec: no user input used import subprocess # nosec from tempfile import NamedTemporaryFile import unidecode from django.conf import settings from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import GEOSException from django.contrib.staticfiles.templatetags.staticfiles import static from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist from django.db.models import ( F, Q, Count, Sum, ImageField, ExpressionWrapper, FloatField, FileField, ) from django.db.models.fields import FieldDoesNotExist from django.db.models.functions import ExtractYear from django.db.utils import ProgrammingError from django import forms from django.forms.models import model_to_dict from django.http import HttpResponse, Http404 from django.shortcuts import render from django.template import loader from django.urls import reverse, NoReverseMatch from django.utils.translation import ( ugettext, ugettext_lazy as _, activate, deactivate, pgettext_lazy, ) from tidylib import tidy_document as tidy from unidecode import unidecode from weasyprint import HTML, CSS from weasyprint.fonts import FontConfiguration from bootstrap_datepicker.widgets import DateField from ishtar_common.utils import ( check_model_access_control, CSV_OPTIONS, get_all_field_names, Round, PRIVATE_FIELDS, ) from ishtar_common.models import get_current_profile, GeneralType, SearchAltName from ishtar_common.models_common import HistoryError from .menus import Menu from . import models, models_rest from archaeological_files.models import File from archaeological_operations.models import ( Operation, ArchaeologicalSite, AdministrativeAct, ) from archaeological_context_records.models import ContextRecord from archaeological_finds.models import Find, FindBasket, Treatment, TreatmentFile from archaeological_warehouse.models import Warehouse, Container logger = logging.getLogger(__name__) ENCODING = settings.ENCODING or "utf-8" CURRENT_ITEM_KEYS = ( ("file", File), ("operation", Operation), ("site", ArchaeologicalSite), ("contextrecord", ContextRecord), ("warehouse", Warehouse), ("container", Container), ("find", Find), ("findbasket", FindBasket), ("treatmentfile", TreatmentFile), ("treatment", Treatment), ("administrativeact", AdministrativeAct), ("administrativeactop", AdministrativeAct), ("administrativeactfile", AdministrativeAct), ("administrativeacttreatment", AdministrativeAct), ("administrativeacttreatmentfile", AdministrativeAct), ) CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS) def get_autocomplete_queries(request, label_attributes, extra=None): if not label_attributes: return [Q(pk__isnull=True)] base_q = request.GET.get("term") or "" queries = [] splited_q = base_q.split(" ") for value_prefix, query_suffix, query_endswith in ( ("", "__startswith", True), # starts with (" ", "__icontains", True), # contain a word which starts with ("", "__endswith", False), # ends with ("", "__icontains", False), ): # contains alt_queries = [None] if len(splited_q) == 1 and query_endswith: alt_queries = ["__endswith", None] for alt_query in alt_queries: query = Q() if extra: query = Q(**extra) for q in splited_q: if not q: continue sub_q = Q(**{label_attributes[0] + query_suffix: value_prefix + q}) if alt_query: sub_q &= Q(**{label_attributes[0] + alt_query: q}) for other_label in label_attributes[1:]: sub_q = sub_q | Q(**{other_label + query_suffix: value_prefix + q}) query = query & sub_q queries.append(query) return queries def get_autocomplete_item(model, extra=None): if not extra: extra = {} def func(request, current_right=None, limit=20): result = OrderedDict() for query in get_autocomplete_queries(request, ["cached_label"], extra=extra): objects = model.objects.filter(query).values("cached_label", "id")[:limit] for obj in objects: if obj["id"] not in list(result.keys()): result[obj["id"]] = obj["cached_label"] limit -= 1 if not limit: break if not limit: break data = json.dumps( [{"id": obj[0], "value": obj[1]} for obj in list(result.items())] ) return HttpResponse(data, content_type="text/plain") return func def check_permission(request, action_slug, obj_id=None): main_menu = Menu(request.user) main_menu.init() if action_slug not in main_menu.items: # TODO return True if obj_id: return main_menu.items[action_slug].is_available( request.user, obj_id, session=request.session ) return main_menu.items[action_slug].can_be_available( request.user, session=request.session ) def new_qa_item( model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="", callback=None ): def func(request, parent_name, limits=""): model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") if not check_permission(request, "add_" + model_name.lower()): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": slug = "archaeologicalsite" url_slug = "new-" + slug current_page_name = page_name[:] if not current_page_name: current_page_name = _("New %s" % model_name.lower()) dct = { "page_name": str(current_page_name), "url": reverse(url_slug, args=[parent_name]), "slug": slug, "parent_name": parent_name, "many": many, } if request.method == "POST": dct["form"] = frm(request.POST, limits=limits) if dct["form"].is_valid(): new_item = dct["form"].save(request.user) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() dct["new_item_label"] = lbl dct["new_item_pk"] = new_item.pk dct["parent_pk"] = parent_name if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) if callback: callback("new_qa_item", request, None, model.objects.filter(pk=new_item.pk)) return render(request, template, dct) else: dct["form"] = frm(limits=limits) return render(request, template, dct) return func def get_short_html_detail(model): def func(request, pk): model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") if not check_permission(request, "view_" + model_name.lower(), pk): return HttpResponse(not_permitted_msg) try: item = model.objects.get(pk=pk) except model.DoesNotExist: return HttpResponse(not_permitted_msg) html = item.get_short_html_detail() return HttpResponse(html) return func def modify_qa_item(model, frm, callback=None): def func(request, parent_name="", pk=None): template = "ishtar/forms/qa_new_item.html" model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") if not check_permission(request, "change_" + model_name.lower(), pk): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": slug = "archaeologicalsite" try: item = model.objects.get(pk=pk) except model.DoesNotExist: return HttpResponse(not_permitted_msg) url_slug = "modify-" + slug dct = { "page_name": str(_("Modify a %s" % model_name.lower())), "url": reverse(url_slug, args=[parent_name, pk]), "slug": slug, "modify": True, "parent_name": parent_name, } if request.method == "POST": dct["form"] = frm(request.POST) if dct["form"].is_valid(): new_item = dct["form"].save(request.user, item) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() dct["new_item_label"] = lbl dct["new_item_pk"] = new_item.pk dct["parent_pk"] = parent_name if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) if callback: callback("modify_qa_item", request, None, model.objects.filter(pk=new_item.pk)) return render(request, template, dct) else: data = model_to_dict(item) for k in list(data.keys()): if data[k] and isinstance(data[k], list) and hasattr(data[k][0], "pk"): data[k] = [i.pk for i in data[k]] dct["form"] = frm(initial=data) return render(request, template, dct) return func def display_item(model, extra_dct=None, show_url=None): def func(request, pk, **dct): if show_url: dct["show_url"] = "/{}{}/".format(show_url, pk) else: dct["show_url"] = "/show-{}/{}/".format(model.SLUG, pk) return render(request, "ishtar/display_item.html", dct) return func def show_source_item(request, source_id, model, name, base_dct, extra_dct): try: __, source_id, external_id = source_id.split("-") source_id, external_id = int(source_id), int(external_id) except ValueError: raise Http404() models_rest.ApiExternalSource.objects.get() # TODO: check permissions try: src = models_rest.ApiExternalSource.objects.get(pk=source_id) except models_rest.ApiExternalSource.DoesNotExist: return HttpResponse("{}", content_type="text/plain") url = src.url if not url.endswith("/"): url += "/" url += f"api/get/{model.SLUG}/{external_id}/" try: response = requests.get( url, timeout=20, headers={"Authorization": f"Token {src.key}"}, ) except requests.exceptions.Timeout: return HttpResponse("{}", content_type="text/plain") item = response.json() dct = deepcopy(base_dct) if not item: return HttpResponse("{}", content_type="text/plain") item["is_external"] = True item["current_source"] = src.name dct["item"], dct["item_name"] = item, name dct["is_external"] = True dct["SHOW_GEO"] = False if extra_dct: dct.update(extra_dct(request, dct)) for perm in Permission.objects.filter( codename__startswith='view_').values_list("codename", flat=True).all(): dct["permission_" + perm] = False permissions = ["permission_view_document"] for p in permissions: dct[p] = True dct["permission_change_own_document"] = False dct["permission_change_document"] = False tpl = loader.get_template(f"ishtar/sheet_{name}_window.html") content = tpl.render(dct, request) return HttpResponse(content, content_type="application/xhtml") def show_item(model, name, extra_dct=None, model_for_perms=None, callback=None): def func(request, pk, **dct): check_model = model if model_for_perms: check_model = model_for_perms allowed, own = check_model_access_control(request, check_model) if not allowed: return HttpResponse("", content_type="application/xhtml") q = model.objects if own: if not hasattr(request.user, "ishtaruser"): return HttpResponse("") query_own = model.get_query_owns(request.user.ishtaruser) if query_own: q = q.filter(query_own).distinct() doc_type = "type" in dct and dct.pop("type") url_name = ( "/".join(reverse("show-" + name, args=["0", ""]).split("/")[:-2]) + "/" ) profile = get_current_profile() sheet_name = name for profile_key, alt_name in model.SHEET_ALTERNATIVES: if getattr(profile, profile_key): sheet_name = alt_name break dct["PROFILE"] = profile dct["CURRENCY"] = profile.currency dct["ENCODING"] = settings.ENCODING dct["DOT_GENERATION"] = settings.DOT_BINARY and profile.relation_graph dct["SHOW_GEO"] = profile.mapping dct["current_window_url"] = url_name date = None if "date" in dct: date = dct.pop("date") dct["sheet_id"] = f"{name}-{pk}" dct["window_id"] = f"{name}-{pk}-{datetime.datetime.now().strftime('%M%s')}" dct["window_id_underscore"] = dct["window_id"].replace("-", "_") if str(pk).startswith("source-"): return show_source_item(request, pk, model, name, dct, extra_dct) q = q.filter(pk=pk) if not q.count(): return HttpResponse("") if callback: callback("show_item", request, doc_type, q) item = q.all()[0] # list current perms for perm in Permission.objects.filter( codename__startswith='view_').values_list("codename", flat=True).all(): dct["permission_" + perm] = False dct["permission_change_own_document"] = False dct["permission_change_document"] = False if hasattr(request.user, "ishtaruser") and request.user.ishtaruser: cache_key = "{}-{}-{}".format( settings.PROJECT_SLUG, "current-perms", request.session.session_key, ) permissions = cache.get(cache_key) if permissions is None: permissions = [] profile = request.user.ishtaruser.person.current_profile if profile: for group in profile.profile_type.groups.all(): for permission in group.permissions.all(): permissions.append(permission.codename) cache.set(cache_key, permissions, settings.CACHE_TIMEOUT) for perm in permissions: dct["permission_" + perm] = True if hasattr(item, "get_imports"): dct["get_import_list"] = item.get_imports(request.user, limit=5) dct["get_import_updated"] = item.get_imports_updated(request.user, limit=5) if hasattr(item, "history") and request.user.is_superuser: if date: try: date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") dct["IS_HISTORY"] = True if item.get_last_history_date() != date: item = item.get_previous(date=date) if item is None: raise ValueError("No previous history entry.") dct["previous"] = item._previous dct["next"] = item._next else: date = None except ValueError: return HttpResponse("", content_type="text/plain") if not date: historized = item.history.all() if historized: item.history_date = historized[0].history_date if len(historized) > 1: dct["previous"] = historized[1].history_date if ( doc_type in ("odt", "pdf") and hasattr(item, "qrcode") and (not item.qrcode or not item.qrcode.name) ): item.generate_qrcode(request=request) dct["item"], dct["item_name"] = item, name # add context if extra_dct: dct.update(extra_dct(request, item)) context_instance = deepcopy(dct) context_instance["output"] = "html" if hasattr(item, "associated_filename"): if hasattr(item, "history_object"): filename = item.history_object.associated_filename else: filename = item.associated_filename if doc_type == "odt" and settings.ODT_TEMPLATE: tpl = loader.get_template(f"ishtar/sheet_{sheet_name}.html") context_instance["output"] = "ODT" content = tpl.render(context_instance, request) tidy_options = { "output-xhtml": 1, "indent": 1, "tidy-mark": 0, "doctype": "auto", "add-xml-decl": 1, "wrap": 1, } html, errors = tidy(content, options=tidy_options) html = html.replace(" ", " ") html = re.sub("
]*)>\n", "", html)
odt = NamedTemporaryFile()
html_source = NamedTemporaryFile()
with open(html_source.name, "w") as html_file:
html_file.write(html)
pandoc_args = [
"pandoc",
"-f",
"html",
"-t",
"odt",
"-o",
odt.name,
html_source.name,
]
try:
# nosec: no user input
subprocess.check_call( # nosec
pandoc_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
except subprocess.CalledProcessError:
return HttpResponse(content, content_type="application/xhtml")
response = HttpResponse(
content_type="application/vnd.oasis.opendocument.text"
)
response["Content-Disposition"] = "attachment; filename={}.odt".format(
filename
)
with open(odt.name, "rb") as odt_file:
response.write(odt_file.read())
return response
elif doc_type == "pdf":
base_url = "/".join(request.build_absolute_uri().split("/")[0:3])
tpl = loader.get_template(f"ishtar/sheet_{sheet_name}_pdf.html")
context_instance["output"] = "PDF"
html = tpl.render(context_instance, request)
font_config = FontConfiguration()
css = CSS(
string="""
@font-face {
font-family: Gentium;
src: url(%s);
}
body{
font-family: Gentium
}
pre {
white-space: pre-wrap;
}
"""
% (base_url + static("gentium/GentiumPlus-R.ttf"))
)
css2 = CSS(filename=settings.STATIC_ROOT + "/media/style_basic.css")
pdf = HTML(string=html, base_url=base_url).write_pdf(
stylesheets=[css, css2], font_config=font_config
)
response = HttpResponse(pdf, content_type="application/pdf")
response["Content-Disposition"] = "attachment; filename=%s.pdf" % filename
return response
else:
tpl = loader.get_template(f"ishtar/sheet_{sheet_name}_window.html")
content = tpl.render(context_instance, request)
return HttpResponse(content, content_type="application/xhtml")
return func
def revert_item(model):
def func(request, pk, date, **dct):
try:
item = model.objects.get(pk=pk)
date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
item.rollback(date)
except (ObjectDoesNotExist, ValueError, HistoryError):
return HttpResponse(None, content_type="text/plain")
return HttpResponse("True", content_type="text/plain")
return func
HIERARCHIC_LEVELS = 5
LIST_FIELDS = { # key: hierarchic depth
"conservatory_states": HIERARCHIC_LEVELS,
"identifications": HIERARCHIC_LEVELS,
"material_types": HIERARCHIC_LEVELS,
"material_type": HIERARCHIC_LEVELS,
"object_types": HIERARCHIC_LEVELS,
"period": HIERARCHIC_LEVELS,
"periods": HIERARCHIC_LEVELS,
"source_type": HIERARCHIC_LEVELS,
"unit": HIERARCHIC_LEVELS,
"museum_collection_entry_mode": HIERARCHIC_LEVELS,
"shooting_angle": HIERARCHIC_LEVELS,
"technical_processes": HIERARCHIC_LEVELS,
"museum_inventory_marking_presence": 0,
"museum_marking_type": 0,
"museum_collection": 0,
"batch": 0,
"preservation_to_considers": 0,
"integrities": 0,
"remarkabilities": 0,
"checked_type": 0,
"material_type_quality": 0,
"object_type_quality": 0,
"communicabilities": 0,
"alterations": 0,
"alteration_causes": 0,
"treatment_emergency": 0,
"cultural_attributions": 0,
"remains": 0,
"dating_type": 0,
"quality": 0,
"operation_type": 0,
"report_processing": 0,
"record_quality_type": 0,
"data_type": 0,
"origin": 0,
"provider": 0,
"excavation_technics": 0,
"activity": 0,
"documentations": 0,
}
HIERARCHIC_FIELDS = list(LIST_FIELDS.keys())
def _get_values(request, val):
if hasattr(val, "all"): # manage related objects
vals = list(val.all())
else:
vals = [val]
new_vals = []
for v in vals:
if callable(v):
try:
v = v()
except TypeError:
continue
try:
if (
not isinstance(v, (models.Person, models.Organization))
and hasattr(v, "url")
and v.url
):
v = (
(request.is_secure() and "https" or "http")
+ "://"
+ request.get_host()
+ v.url
)
except ValueError:
pass
new_vals.append(v)
return new_vals
def _push_to_list(obj, current_group, depth):
"""
parse_parentheses helper function
"""
try:
while depth > 0:
current_group = current_group[-1]
depth -= 1
except IndexError:
# tolerant to parentheses mismatch
pass
if (
current_group
and type(obj) in (str, str)
and type(current_group[-1]) in (str, str)
):
current_group[-1] += obj
else:
current_group.append(obj)
true_strings = ["1", "true"]
for language_code, language_lbl in settings.LANGUAGES:
activate(language_code)
true_strings.append(str(_("Yes")).lower())
true_strings.append(str(_("True")).lower())
deactivate()
def is_true_string(val):
val = str(val).lower().replace('"', "")
if val in true_strings:
return True
def _parse_parentheses(s):
"""
Parse parentheses into list.
(OA01 & (pierre | ciseau)) -> ["0A01 &", ["pierre | ciseau"]]
"""
groups = []
depth = 0
inside_quote = False
for char in s:
if char == '"':
inside_quote = not inside_quote
if not inside_quote:
if char == "(":
_push_to_list([], groups, depth)
depth += 1
elif char == ")":
if depth > 0:
depth -= 1
else:
_push_to_list(char, groups, depth)
continue
_push_to_list(char, groups, depth)
# for non tolerant to parentheses mismatch check depth is equal to 0
return groups
FORBIDDEN_CHAR = [":"]
RESERVED_CHAR = ["|", "&"]
RE_FACET = re.compile('([-a-zA-Z]+)="([^"]+)"(?:;"([^"]+)")*')
def _parse_query_string(
string, query_parameters, current_dct, exc_dct, extra_distinct_q
):
string = string.strip().lower()
match = RE_FACET.search(string)
if match or "=" in string:
base_term, queries = "", []
if match:
for idx, gp in enumerate(match.groups()):
if not idx:
base_term = gp
elif gp:
queries.append('"{}"'.format(gp))
else:
splited = string.split("=")
if len(splited) == 2:
base_term = splited[0]
queries.append(splited[1])
if queries:
excluded = base_term.startswith("-")
if excluded:
base_term = base_term[1:]
if base_term in query_parameters:
dct = current_dct
term = query_parameters[base_term].search_query
for query in queries:
# callable request key for complex queries
if callable(term):
# !! distinct_queries not managed
is_true = is_true_string(query)
if excluded:
is_true = not is_true
cfltr, cexclude, cextra = term(is_true=is_true)
if cfltr:
if "and_reqs" not in dct:
dct["and_reqs"] = []
dct["and_reqs"].append(cfltr)
if cexclude:
if "exc_and_reqs" not in dct:
dct["exc_and_reqs"] = []
dct["exc_and_reqs"].append(cexclude)
if cextra:
dct["extras"].append(cextra)
else:
if query_parameters[base_term].distinct_query:
extra_distinct_q.append({})
dct = extra_distinct_q[-1]
if not query_parameters[base_term].distinct_query and excluded:
dct = exc_dct
if query_parameters[base_term].extra_query:
dct.update(query_parameters[base_term].extra_query)
if query_parameters[base_term].related_name and query == '"*"':
term = query_parameters[base_term].related_name
if term in dct:
dct[term] += ";" + query
else:
dct[term] = query
if query_parameters[base_term].distinct_query:
for k in dct: # clean "
dct[k] = dct[k].replace('"', "")
# distinct query wait for a query
_manage_clean_search_field(dct)
extra_distinct_q[-1] = ~Q(**dct) if excluded else Q(**dct)
return ""
for reserved_char in FORBIDDEN_CHAR:
string = string.replace(reserved_char, "")
if len(string) != 1:
for reserved_char in RESERVED_CHAR:
string = string.replace(reserved_char, "")
if not string:
return ""
if string.endswith("*"):
if len(string.strip()) == 1:
return ""
string = string[:-1] + ":*"
elif string not in ("&", "|", "!", "-"):
# like search by default
string = string + ":*"
if string.startswith("-"):
if len(string.strip()) == 1:
return ""
string = "!" + string[1:]
return string
def _parse_parentheses_groups(
groups, query_parameters, current_dct=None, exc_dct=None, extra_distinct_q=None
):
"""
Transform parentheses groups to query
:param groups: groups to transform (list)
:param query_parameters: query keys for facet search
:param current_dct: query dict
:param exc_dct: exclude query dict
:param exc_dct: exclude query dict
:return: query string, query dict, excluded query dict
"""
if not current_dct:
current_dct = {}
if not exc_dct:
exc_dct = {}
if not extra_distinct_q:
extra_distinct_q = []
if type(groups) is not list:
string = groups.strip()
if string.startswith('"') and string.endswith('"') and string.count('"') == 2:
string = string[1:-1]
# split into many groups if spaces
# do not split inside quotes
current_index = 0
found = string.find('"', current_index)
SEP = "?รง;?" # replace spaces inside quote with this characters
previous_quote = None
while found != -1:
if previous_quote is not None:
string = (
string[0:previous_quote]
+ string[previous_quote:found].replace(" ", SEP)
+ string[found:]
)
previous_quote = None
# SEP is larger than a space
found = string.find('"', current_index)
else:
previous_quote = found
current_index = found + 1
found = string.find('"', current_index)
string_groups = [gp.replace(SEP, " ") for gp in string.split(" ")]
if len(string_groups) == 1:
return (
_parse_query_string(
string_groups[0],
query_parameters,
current_dct,
exc_dct,
extra_distinct_q,
),
current_dct,
exc_dct,
extra_distinct_q,
)
return _parse_parentheses_groups(
string_groups, query_parameters, current_dct, exc_dct, extra_distinct_q
)
if not groups: # empty list
return "", current_dct, exc_dct, extra_distinct_q
query = "("
previous_sep, has_item = None, False
for item in groups:
q, current_dct, exc_dct, extra_distinct_q = _parse_parentheses_groups(
item, query_parameters, current_dct, exc_dct, extra_distinct_q
)
q = q.strip()
if not q:
continue
if q in ("|", "&"):
if previous_sep or not has_item:
continue # multiple sep is not relevant
previous_sep = q
continue
if has_item:
query += previous_sep or " & "
query += q
has_item = True
previous_sep = None
query += ")"
if query == "()":
query = ""
unaccent_query = unidecode(query)
if unaccent_query != query:
query = f"({query} | {unaccent_query})"
return query, current_dct, exc_dct, extra_distinct_q
def _search_manage_search_vector(
model, dct, exc_dct, distinct_queries, query_parameters
):
if (
"search_vector" not in dct or not model._meta.managed
): # is a view - no search_vector
return dct, exc_dct, distinct_queries
search_vector = dct["search_vector"]
parentheses_groups = _parse_parentheses(search_vector)
(
search_query,
extra_dct,
extra_exc_dct,
extra_distinct_q,
) = _parse_parentheses_groups(parentheses_groups, query_parameters)
dct.update(extra_dct)
distinct_queries += extra_distinct_q
exc_dct.update(extra_exc_dct)
if not search_query:
return dct, exc_dct, distinct_queries
# remove inside parenthesis
search_query = search_query.replace("(", "").replace(")", "").strip()
if search_query:
if "extras" not in dct:
dct["extras"] = []
dct["extras"].append(
{
"where": [
f"{model._meta.db_table}.search_vector @@ (to_tsquery(%s, %s)) = true OR " +
f"{model._meta.db_table}.search_vector @@ (to_tsquery('simple', %s)) = true OR "
f"{model._meta.db_table}.search_vector::tsvector @@ %s::tsquery = true"
],
"params": [settings.ISHTAR_SEARCH_LANGUAGE, search_query, search_query,
search_query],
}
)
return dct, exc_dct, distinct_queries
def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs):
bool_fields = list(bool_fields) + list(reversed_bool_fields)
for k in bool_fields:
if k not in dct:
continue
elif dct[k] == "1" or dct[k] == "unknown":
dct.pop(k)
continue
dct[k] = dct[k].replace('"', "")
if dct[k] in ["2", "yes", str(_("Yes")).lower(), "True"]:
dct[k] = True
else:
dct[k] = False
if k in reversed_bool_fields:
dct[k] = not dct[k]
# check also for empty value with image field
field_names = k.split("__")
# TODO: can be improved in later version of Django
try:
c_field = model._meta.get_field(field_names[0])
for field_name in field_names[1:-1]:
if not hasattr(c_field, "related_model") or not c_field.related_model:
return
c_field = c_field.related_model._meta.get_field(field_name)
if k.endswith("__isnull") and (
isinstance(c_field, (ImageField, FileField))
or field_names[-2] == "associated_url"
):
key = "__".join(k.split("__")[:-1])
if dct[k]:
or_reqs.append((k, {key + "__exact": ""}))
else:
dct[key + "__regex"] = ".{1}.*"
except FieldDoesNotExist:
pass
def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct):
bool_fields = list(fields or []) + list(reversed_fields or [])
for k in bool_fields:
if k not in dct:
continue
elif dct[k] == "1" or dct[k] == "unknown":
dct.pop(k)
continue
dct[k] = dct[k].replace('"', "")
dct[k] = True if dct[k] in ["2", "yes", str(_("Yes")).lower()] else None
if reversed_fields and k in reversed_fields:
dct[k] = True if not dct[k] else None
if dct[k]:
dct[k] = False
else:
dct.pop(k)
excluded_dct[k] = False
def __manage_relative_search(value):
"""
Parse search to manage "=", "=>" and "=<" searches
:param value: value
:return: value, search type ("eq", "lte" or "gte")
"""
value = value.replace('"', "").strip()
search_type = "eq"
if value.startswith(">"):
value = value[1:]
search_type = "gte"
elif value.startswith("<"):
value = value[1:]
search_type = "lte"
return value, search_type
def _manage_number(k, dct):
"""
Manage number from a search query
:param k: number key
:param dct: search dict
:return: None -> search dict is modified
"""
values = dct[k].split(";")
results = []
for value in values:
value, search_type = __manage_relative_search(value)
try:
value = str(float(value.replace(",", ".")))
if value.endswith(".0"):
value = value[:-2]
results.append((search_type, value))
except ValueError:
continue
mins = [value for dt, value in results if dt == "gte"]
maxs = [value for dt, value in results if dt == "lte"]
eqs = [value for dt, value in results if dt == "eq"]
if eqs and not mins and not maxs: # TODO: min and max not available
dct[k] = ";".join(eqs)
return
dct.pop(k)
if mins:
dct[k + "__gte"] = min(mins)
if maxs:
dct[k + "__lte"] = max(maxs)
def _manage_number_fields(fields, dct):
keys = list(dct.keys())
for key in fields:
res = [j for j in keys if j.startswith(key)]
if not res:
continue
for k in res:
if not dct[k]:
dct.pop(k)
continue
_manage_number(k, dct)
today_lbl = pgettext_lazy("key for text search", "today")
TODAYS = ["today"]
for language_code, language_lbl in settings.LANGUAGES:
activate(language_code)
TODAYS.append(str(today_lbl))
deactivate()
def _manage_date(k, dct):
"""
Manage date from a search query
:param k: date key
:param dct: search dict
:return: None -> search dict is modified
"""
if not isinstance(dct[k], str):
return
values = dct[k].split(";")
results = []
for value in values:
# TODO: manage '*/10/2024' searches?
if value.replace('"', "").replace("'", "") == "*":
dct.pop(k)
dct[k + "__isnull"] = False
return
value, date_type = __manage_relative_search(value)
has_today = False
for today in TODAYS:
if value.startswith(today):
base_date = datetime.date.today()
value = value[len(today):].replace(" ", "")
if value and value[0] in ("-", "+"):
sign = value[0]
try:
days = int(value[1:])
except ValueError:
days = 0
if days:
if sign == "-":
base_date = base_date - datetime.timedelta(days=days)
else:
base_date = base_date + datetime.timedelta(days=days)
value = base_date.strftime("%Y-%m-%d")
has_today = True
break
if has_today:
results.append((date_type, value))
continue
items = []
if "/" in value:
items = list(reversed(value.split("/")))
elif "-" in value: # already date formated
items = value.split("-")
if len(items) != 3:
continue
try:
results.append((date_type, datetime.datetime(*map(lambda x: int(x), items)).strftime(
"%Y-%m-%d"
)))
except ValueError:
try:
# try US format
results.append((
date_type,
datetime.datetime(*map(lambda x: int(x), reversed(items))).strftime("%Y-%m-%d")
))
except ValueError:
pass
mins = [value for dt, value in results if dt == "gte"]
maxs = [value for dt, value in results if dt == "lte"]
eqs = [value for dt, value in results if dt == "eq"]
if eqs and not mins and not maxs: # TODO: min and max not available when using single dates
dct[k] = ";".join(eqs)
return
dct.pop(k)
if mins:
dct[k + "__gte"] = min(mins)
if maxs:
dct[k + "__lte"] = max(maxs)
def _manage_dated_fields(dated_fields, dct):
keys = list(dct.keys())
for key in dated_fields:
res = [j for j in keys if j.startswith(key)]
if not res:
continue
for k in res:
if k not in dct:
continue
if not dct[k]:
dct.pop(k)
continue
_manage_date(k, dct)
def _clean_type_val(val):
for prefix in GeneralType.PREFIX_CODES:
val = val.replace(prefix, "")
val = val.strip()
if val.startswith('"') and val.endswith('"'):
val = '"{}"'.format(val[1:-1].strip())
return val
def _manage_facet_search(model, dct, and_reqs):
if not hasattr(model, "general_types"):
return
general_types = model.general_types()
hierarchic_fields = HIERARCHIC_FIELDS[:]
if hasattr(model, "hierarchic_fields"):
hierarchic_fields += model.hierarchic_fields()
for base_k in general_types:
if base_k in hierarchic_fields: # already managed
continue
if base_k.endswith("_id"):
k = base_k
else:
k = base_k + "__pk"
if k not in dct or not dct[k].startswith('"') or not dct[k].startswith('"'):
continue
val = _clean_type_val(dct.pop(k))
if '";"' in val:
# OR request
values = val.split(";")
else:
values = [val]
reqs = None
for val in values:
if not val.endswith('"') or not val.startswith('"'):
continue
lbl_name = "__label__"
try:
rel = getattr(model, base_k).field.related_model
if not hasattr(rel, "label") and hasattr(rel, "cached_label"):
lbl_name = "__cached_label__"
except AttributeError:
pass
suffix = (
"{}icontains".format(lbl_name)
if "%" in val
else "{}iexact".format(lbl_name)
)
query = val[1:-1].replace("*", "")
if not reqs:
reqs = Q(**{base_k + suffix: query})
else:
reqs |= Q(**{base_k + suffix: query})
if reqs:
and_reqs.append(reqs)
POST_PROCESS_REQUEST = getattr(model, "POST_PROCESS_REQUEST", None)
if not POST_PROCESS_REQUEST:
return
for k in dct:
if k in POST_PROCESS_REQUEST and dct[k]:
dct[k] = getattr(model, POST_PROCESS_REQUEST[k])(dct[k].replace('"', ""))
def _manage_hierarchic_fields(model, dct, and_reqs):
hierarchic_fields = HIERARCHIC_FIELDS[:]
if hasattr(model, "hierarchic_fields"):
hierarchic_fields += model.hierarchic_fields()
for reqs in dct.copy():
if type(reqs) not in (list, tuple):
reqs = [reqs]
for req in reqs:
if req.endswith("areas__pk") or req.endswith("areas__label__iexact"):
if req.endswith("pk"):
suffix = "pk"
elif req.endswith("label__iexact"):
suffix = "label__iexact"
else:
continue
val = _clean_type_val(dct.pop(req))
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
if val == "*":
req = req.replace("label__", "").replace("pk", "").replace("iexact", "")
req += "isnull"
reqs = Q(**{req: False})
and_reqs.append(reqs)
continue
elif "*" in val and "iexact" in suffix:
suffix = suffix.replace("iexact", "icontains")
req = req.replace("iexact", "icontains")
val = val.replace("*", "")
reqs = Q(**{req: val})
for idx in range(HIERARCHIC_LEVELS):
req = req[: -(len(suffix))] + "parent__" + suffix
q = Q(**{req: val})
reqs |= q
and_reqs.append(reqs)
# TODO: improve query with "IN ()"?
continue
if req.endswith("wcontainer_id") or req.endswith("wcontainer_ref_id"):
val = _clean_type_val(dct.pop(req)).strip('"')
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
vals = [v.replace('"', "") for v in val.split(";")]
req = req[1:] # remove "w"
container_ids = []
if "*" in vals:
main_req = Q(**{req + "__isnull": False})
and_reqs.append(main_req)
continue
for val in vals:
attr = "cached_label__iexact"
if val.endswith("*"):
attr = "cached_label__icontains"
val = val[:-1]
q = Container.objects.filter(**{attr: val}).values_list(
"id", flat=True)
if not q.count():
continue
container_ids += list(q.all())
main_req = Q(**{req + "__in": container_ids})
and_reqs.append(main_req)
if req.endswith("precise_town_id"):
val = _clean_type_val(dct.pop(req)).strip('"')
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
vals = [v.replace('"', "") for v in val.split(";")]
town_ids = []
for val in vals:
q = models.Town.objects.filter(cached_label__iexact=val).values_list(
"id", flat=True)
if not q.count():
continue
town_id = q.all()[0]
town_ids.append(town_id)
reqs = Q(**{req: val})
for rel_query in ("parents__", "children__"):
for idx in range(HIERARCHIC_LEVELS):
k = rel_query * (idx + 1) + "pk"
q = models.Town.objects.filter(
**{k: town_id}).values_list("id", flat=True)
if not q.count():
break
town_ids += list(q.all())
main_req = Q(**{req + "__in": town_ids})
and_reqs.append(main_req)
elif (
req.endswith("town__pk")
or req.endswith("towns__pk")
or req.endswith("town__cached_label__iexact")
or req.endswith("towns__cached_label__iexact")
):
if req.endswith("pk"):
suffix = "pk"
elif req.endswith("cached_label__iexact"):
suffix = "cached_label__iexact"
else:
continue
val = _clean_type_val(dct.pop(req)).strip('"')
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
vals = [v.replace('"', "") for v in val.split(";")]
main_req = None
for val in vals:
suf = suffix
if "*" == val:
req = req.replace("cached_label__", "").replace("pk", "").replace("iexact", "")
req += "isnull"
reqs = Q(**{req: False})
if not main_req:
main_req = reqs
else:
main_req |= reqs
continue
elif "*" in val and "iexact" in suf:
suf = suffix.replace("iexact", "icontains")
req = req.replace("iexact", "icontains")
val = val.replace("*", "")
reqs = Q(**{req: val})
nreq = base_req = req[:]
for idx in range(HIERARCHIC_LEVELS):
nreq = nreq[: -(len(suf))] + "parents__" + suf
q = Q(**{nreq: val})
reqs |= q
nreq = base_req[:]
for idx in range(HIERARCHIC_LEVELS):
nreq = nreq[: -(len(suf))] + "children__" + suf
q = Q(**{nreq: val})
reqs |= q
if not main_req:
main_req = reqs
else:
main_req |= reqs
and_reqs.append(main_req)
# TODO: improve query with "IN ()"?
continue
for k_hr in hierarchic_fields:
lbl_name = "label"
if hasattr(model, k_hr):
rel = getattr(model, k_hr).field.related_model
if not hasattr(rel, "label") and hasattr(rel, "cached_label"):
lbl_name = "cached_label"
if type(req) in (list, tuple):
val = dct.pop(req)
val = _clean_type_val(val)
q = None
for idx, r in enumerate(req):
r = _clean_type_val(r)
if not idx:
q = Q(**{r: val})
else:
q |= Q(**{r: val})
and_reqs.append(q)
break
elif req.endswith(k_hr + "__pk") or req.endswith(
k_hr + "__{}__iexact".format(lbl_name)
):
val = _clean_type_val(dct.pop(req))
if '";"' in val:
# OR request
values = val.split(";")
else:
values = [val]
base_req = req[:]
reqs = None
if req.endswith("pk"):
base_suffix = "pk"
elif req.endswith("{}__iexact".format(lbl_name)):
base_suffix = lbl_name + "__iexact"
else:
continue
for val in values:
suffix = base_suffix[:]
req = base_req[:]
if val.endswith('"') and val.startswith('"'):
val = val[1:-1]
# manage search text by label
if "*" in val:
suffix = lbl_name + "__icontains"
else:
suffix = lbl_name + "__iexact"
current_values = val.strip().split("*")
req = req[: -(len(base_suffix))] + suffix
else:
current_values = [val]
new_req = None
for idx, nval in enumerate(current_values):
if not idx:
new_req = Q(**{req: nval})
else:
new_req &= Q(**{req: nval})
if not reqs:
reqs = new_req
else:
reqs |= new_req
hierarchic_levels = LIST_FIELDS[k_hr] if k_hr in LIST_FIELDS \
else HIERARCHIC_LEVELS
for idx in range(hierarchic_levels):
req = req[: -(len(suffix))] + "parent__" + suffix
for idx, nval in enumerate(current_values):
if not idx:
new_req = Q(**{req: nval})
else:
new_req &= Q(**{req: nval})
reqs |= new_req
# TODO: improve query with "IN ()"?
if reqs:
and_reqs.append(reqs)
break
def _manage_clean_search_field(dct, exclude=None, reverse=False, related_name_fields=None):
related_names = related_name_fields if related_name_fields else []
for k in list(dct.keys()):
# clean quoted search field
if not isinstance(dct[k], str):
continue
dct[k] = dct[k].replace('"', "")
dct[k] = _clean_type_val(dct[k])
if "*" not in dct[k] or k.endswith("regex"):
continue
value = dct.pop(k).strip()
base_key = k[:]
if k.endswith("__iexact"):
base_key = k[:-len("__iexact")]
if value == "*":
if k in related_names or not reverse:
dct[base_key + "__isnull"] = False
if exclude is not None and k.endswith("__iexact"):
exclude[base_key + "__exact"] = ""
continue
if value.startswith("*"):
value = value[1:]
if value.endswith("*"):
value = value[:-1]
if value:
dct[base_key + "__icontains"] = value
elif exclude is not None:
exclude[base_key + "__exact"] = ""
def _get_relation_type_dict(my_relation_types_prefix, dct):
relation_types = {}
for rtype_key in my_relation_types_prefix:
relation_types[my_relation_types_prefix[rtype_key]] = set()
for rtype_key in my_relation_types_prefix:
for keys in list(dct.keys()):
if type(keys) not in (list, tuple):
keys = [keys]
for k in keys:
if k.startswith(rtype_key):
relation_types[my_relation_types_prefix[rtype_key]].add(
dct.pop(k)
)
return relation_types
def _manage_relation_types(relation_types, dct, query, or_reqs):
for rtype_prefix in relation_types:
vals = relation_types[rtype_prefix]
if not vals:
continue
vals = list(vals)[0].split(";")
alt_dct = {}
for v in vals:
alt_dct = {
rtype_prefix
+ "right_relations__relation_type__label__iexact": v.replace('"', "")
}
for k in dct:
val = dct[k]
if rtype_prefix:
# only get conditions related to the object
if rtype_prefix not in k:
continue
# tricky: reconstruct the key to make sense - remove the
# prefix from the key
k = (
k[0 : k.index(rtype_prefix)]
+ k[k.index(rtype_prefix) + len(rtype_prefix) :]
)
if k.endswith("year"):
k += "__exact"
alt_dct[rtype_prefix + "right_relations__right_record__" + k] = val
query |= Q(**alt_dct)
for k, or_req in or_reqs:
altor_dct = alt_dct.copy()
altor_dct.pop(k)
for j in or_req:
val = or_req[j]
if j == "year":
j = "year__exact"
altor_dct[rtype_prefix + "right_relations__right_record__" + j] = val
query |= Q(**altor_dct)
return query
def _construct_query(relation_types, dct, or_reqs, and_reqs, excluded_relation=False):
# excluded -> reverse logic
if excluded_relation:
and_reqs, or_reqs = or_reqs, and_reqs
for key in dct:
if isinstance(dct[key], str):
values = [v for v in dct[key].split(";") if v]
else:
values = [dct[key]]
if not values:
values = [""] # filter empty value
for value in values:
or_reqs.append((key, {key: value}))
dct = {}
# manage multi value not already managed
for key in list(dct.keys()):
if isinstance(dct[key], str) and ";" in dct[key]:
values = [v for v in dct[key].split(";") if v]
if not values:
dct.pop(key)
continue
dct[key] = values[0]
if len(values) == 1:
continue
for v in values[1:]:
or_reqs.append((key, {key: v}))
for k in list(dct.keys()):
if type(k) not in (list, tuple):
continue
first_key = k[0]
value = dct[k][:]
dct.pop(k)
dct[first_key] = value
for other_key in k[1:]:
or_reqs.append((first_key, {other_key: value}))
query = Q(**dct)
for or_req in or_reqs:
alt_dct = dct.copy()
if isinstance(or_req, (tuple, list)):
k, or_req = or_req
if k in alt_dct:
alt_dct.pop(k)
alt_dct.update(or_req)
query |= Q(**alt_dct)
else:
query |= (Q(**alt_dct) & Q(or_req))
query = _manage_relation_types(relation_types, dct, query, or_reqs)
done = []
for and_req in and_reqs:
str_q = str(and_req)
if str_q in done:
continue
done.append(str_q)
query = query & and_req
return query
def _manage_default_search(
dct, request, model, default_name, my_base_request, my_relative_session_names
):
pinned_search = ""
pin_key = "pin-search-" + default_name
base_request = my_base_request if isinstance(my_base_request, dict) else {}
dct = {k: v for k, v in dct.items() if v}
if pin_key in request.session and request.session[pin_key]: # a search is pinned
pinned_search = request.session[pin_key]
dct = {"search_vector": request.session[pin_key]}
elif (
default_name in request.session and request.session[default_name]
): # an item is pinned
value = request.session[default_name]
if "basket-" in value:
try:
dct = {"basket__pk": request.session[default_name].split("-")[-1]}
pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"]))
except FindBasket.DoesNotExist:
pass
else:
try:
dct = {"pk": request.session[default_name]}
pinned_search = '"{}"'.format(model.objects.get(pk=dct["pk"]))
except model.DoesNotExist:
pass
elif dct == {k: v for k, v in base_request.items() if v}:
if not hasattr(model, "UP_MODEL_QUERY"):
logger.warning(
"**WARN get_item**: - UP_MODEL_QUERY not defined for "
"'{}'".format(model)
)
else:
# a parent item may be selected in the default menu
for name, key in my_relative_session_names:
if (
name in request.session
and request.session[name]
and "basket-" not in request.session[name]
and name in CURRENT_ITEM_KEYS_DICT
):
up_model = CURRENT_ITEM_KEYS_DICT[name]
try:
dct.update({key: request.session[name]})
up_item = up_model.objects.get(pk=dct[key])
if up_item.SLUG not in model.UP_MODEL_QUERY:
logger.warning(
"**WARN get_item**: - {} not in "
"UP_MODEL_QUERY for {}'".format(up_item.SLUG, model)
)
else:
req_key, up_attr = model.UP_MODEL_QUERY[up_item.SLUG]
pinned_search = '{}="{}"'.format(
req_key, getattr(up_item, up_attr)
)
break
except up_model.DoesNotExist:
pass
return dct, pinned_search
def _format_val(val):
if val is None:
return ""
if type(val) == bool:
if val:
return str(_("True"))
else:
return str(_("False"))
return str(val)
def _format_geojson(rows, link_template, display_polygon):
data = {
"type": "FeatureCollection",
"crs": {"type": "name", "properties": {"name": "EPSG:4326"}},
"link_template": link_template,
"features": [],
"no-geo": [],
}
if not rows:
return data
"""
Columns:
base: ['id', 'cached_label', 'main_geodata__cached_x',
'main_geodata__cached_y', 'point_x', 'point_y',
'locked', 'lock_user_id']
poly: ['id', 'cached_label', 'main_geodata__cached_x',
'main_geodata__cached_y',
'main_geodata__multi_line', 'main_geodata__multi_polygon',
'point_x', 'point_y',
'locked', 'lock_user_id']
"""
delta = 2 if display_polygon else 0
for row in rows:
properties = {"id": row[0], "name": row[1]}
feature = None
base_feature = {
"type": "Feature",
"properties": properties,
}
if display_polygon:
if row[4]: # lines
feature = base_feature
feature["geometry"] = json.loads(row[4].geojson)
elif row[5]: # polygons
feature = base_feature
feature["geometry"] = json.loads(row[5].geojson)
if not feature:
x, y = row[4 + delta], row[5 + delta]
if not x or not y or x < -180 or x > 180 or y < -90 or y > 90:
data["no-geo"].append(properties)
continue
feature = base_feature
feature["geometry"] = {"type": "Point", "coordinates": [x, y]}
data["features"].append(feature)
return data
def _get_data_from_query(items, query_table_cols, extra_request_keys,
geo_fields=None):
# TODO: manage data json field
for query_keys in query_table_cols:
if not isinstance(query_keys, (tuple, list)):
query_keys = [query_keys]
for query_key in query_keys:
if query_key in extra_request_keys: # translate query term
query_key = extra_request_keys[query_key]
if isinstance(query_key, (list, tuple)):
# only manage one level for display
query_key = query_key[0]
# clean
for filtr in ("__icontains", "__contains", "__iexact", "__exact"):
if query_key.endswith(filtr):
query_key = query_key[: len(query_key) - len(filtr)]
query_key.replace(".", "__") # class style to query
values = ["id"] + query_table_cols
if geo_fields:
profile = get_current_profile()
precision = profile.point_precision
if precision is not None:
exp_x = ExpressionWrapper(
Round(geo_fields[0], precision),
output_field=FloatField(),
)
exp_y = ExpressionWrapper(
Round(geo_fields[1], precision),
output_field=FloatField(),
)
else:
exp_x = F(geo_fields[0])
exp_y = F(geo_fields[1])
items = items.annotate(point_x=exp_x)
items = items.annotate(point_y=exp_y)
values += ["point_x", "point_y"]
if hasattr(items.model, "locked"):
values.append("locked")
values.append("lock_user_id")
values = [v for v in values if v] # filter empty values
return items.values_list(*values)
def _get_data_from_query_old(
items, query_table_cols, request, extra_request_keys, do_not_deduplicate=False
):
c_ids, datas = [], []
has_lock = items and hasattr(items[0], "locked")
for item in items:
# manual deduplicate when distinct is not enough
if not do_not_deduplicate and item.pk in c_ids:
continue
c_ids.append(item.pk)
data = [item.pk]
for keys in query_table_cols:
if type(keys) not in (list, tuple):
keys = [keys]
my_vals = []
for k in keys:
if k in extra_request_keys:
k = extra_request_keys[k]
if type(k) in (list, tuple):
k = k[0]
for filtr in ("__icontains", "__contains", "__iexact", "__exact"):
if k.endswith(filtr):
k = k[: len(k) - len(filtr)]
vals = [item]
# foreign key may be divided by "." or "__"
splitted_k = []
for ky in k.split("."):
if "__" in ky:
splitted_k += ky.split("__")
else:
splitted_k.append(ky)
if splitted_k[-1] == "count":
splitted_k = splitted_k[:-2] + [splitted_k[-2] + "__count"]
for ky in splitted_k:
new_vals = []
for val in vals:
if ky.endswith("__count"):
new_vals += [getattr(val, ky[: -len("__count")]).count()]
elif hasattr(val, "all"): # manage related objects
val = list(val.all())
for v in val:
v = getattr(v, ky)
new_vals += _get_values(request, v)
elif val and isinstance(val, dict):
if ky in val:
val = val[ky]
new_vals += _get_values(request, val)
elif val:
try:
val = getattr(val, ky)
new_vals += _get_values(request, val)
except (AttributeError, GEOSException):
# must be a query key such as "contains"
pass
vals = new_vals
# manage last related objects
if vals and hasattr(vals[0], "all"):
new_vals = []
for val in vals:
new_vals += list(val.all())
vals = new_vals
if not my_vals:
my_vals = [_format_val(va) for va in vals]
else:
new_vals = []
if not vals:
for idx, my_v in enumerate(my_vals):
new_vals.append("{}{}{}".format(my_v, " - ", ""))
else:
for idx, v in enumerate(vals):
new_vals.append(
"{}{}{}".format(vals[idx], " - ", _format_val(v))
)
my_vals = new_vals[:]
data.append(" & ".join(set(my_vals)) or "")
if has_lock:
data.append(item.locked)
data.append(item.lock_user_id)
datas.append(data)
return datas
def _format_modality(value):
if value is None:
value = str(_("Unknown"))
if isinstance(value, bool):
value = str(_(str(value)))
return value
def _get_json_stats_optimized(
items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1
):
q = items
value_keys = []
for stat in (stats_modality_1, stats_modality_2):
if not stat:
continue
if stat.endswith("__year"):
q = q.annotate(**{stat: ExtractYear(stat[: -len("__year")])})
value_keys.append(stat)
q = q.values(*value_keys)
if stats_sum_variable == "pk":
q = q.annotate(sum=Count("pk"))
else:
q = q.annotate(sum=Sum(stats_sum_variable))
data = []
if stats_modality_2 and stats_modality_2 != stats_modality_1:
q = q.order_by(stats_modality_1, stats_modality_2)
for values in q.all():
modality_1 = _format_modality(values[stats_modality_1])
if not data or data[-1][0] != modality_1:
data.append([modality_1, []])
data[-1][1].append(
(
_format_modality(values[stats_modality_2]),
int((values["sum"] or 0) * multiply),
)
)
else:
q = q.order_by(stats_modality_1)
for values in q.all():
modality_1 = values[stats_modality_1]
data.append(
[_format_modality(modality_1), int((values["sum"] or 0) * multiply)]
)
data = json.dumps({"data": data})
return HttpResponse(data, content_type="application/json")
def _get_json_stats(
items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1
):
# _get_json_stats_optimized should work
# but problem on container with criteria on CVL
q = items
value_keys = []
for stat in (stats_modality_1, stats_modality_2):
if not stat:
continue
if stat.endswith("__year"):
q = q.annotate(**{stat: ExtractYear(stat[:-len("__year")])})
value_keys.append(stat)
value_keys.append(stats_sum_variable)
q = q.values(*value_keys)
data = []
if stats_modality_2 and stats_modality_2 != stats_modality_1:
q = q.order_by(stats_modality_1, stats_modality_2)
results = {}
for values in q.all():
value = values[stats_sum_variable] or 0
if stats_sum_variable == "pk":
value = 1
modality_1 = _format_modality(values[stats_modality_1])
modality_2 = _format_modality(values[stats_modality_2])
key = (modality_1, modality_2)
if key not in results:
results[key] = 0
results[key] += value * multiply
for key in results:
modality_1, modality_2 = key
if not data or data[-1][0] != modality_1:
data.append([modality_1, []])
data[-1][1].append((modality_2, results[key]))
else:
q = q.order_by(stats_modality_1)
results = {}
for values in q.all():
value = values[stats_sum_variable] or 0
if stats_sum_variable == "pk":
value = 1
modality_1 = _format_modality(values[stats_modality_1])
if modality_1 not in results:
results[modality_1] = 0
results[modality_1] += value * multiply
for modality_1 in results:
data.append([modality_1, results[modality_1]])
data = json.dumps({"data": data})
return HttpResponse(data, content_type="application/json")
def _get_table_cols(data_type, own_table_cols, full, model):
# list of table cols depending on configuration and data send
if data_type == "json-map":
return [] # only pk for map
if own_table_cols:
table_cols = own_table_cols
else:
if full:
table_cols = [
field.name
for field in model._meta.fields
if field.name not in PRIVATE_FIELDS
]
table_cols += [
field.name
for field in model._meta.many_to_many
if field.name not in PRIVATE_FIELDS
]
if hasattr(model, "EXTRA_FULL_FIELDS"):
table_cols += model.EXTRA_FULL_FIELDS
else:
tb_key = (getattr(model, "SLUG", None), "TABLE_COLS")
if tb_key in settings.TABLE_COLS:
table_cols = settings.TABLE_COLS[tb_key]
else:
table_cols = model.TABLE_COLS
if callable(table_cols):
table_cols = table_cols()
table_cols = list(table_cols)
return table_cols
DEFAULT_ROW_NUMBER = 10
# length is used by ajax DataTables requests
EXCLUDED_FIELDS = ["length"]
BASE_DATED_FIELDS = ["created", "last_modified"]
def get_item(
model,
func_name,
default_name,
extra_request_keys=None,
base_request=None,
bool_fields=None,
reversed_bool_fields=None,
dated_fields=None,
associated_models=None,
relative_session_names=None,
specific_perms=None,
own_table_cols=None,
relation_types_prefix=None,
do_not_deduplicate=False,
model_for_perms=None,
alt_query_own=None,
search_form=None,
no_permission_check=False,
callback=None,
):
"""
Generic treatment of tables
:param model: model used for query
:param func_name: name of the function (used for session storage)
:param default_name: key used for default search in session
:param extra_request_keys: default query limitation
:param base_request:
:param bool_fields:
:param reversed_bool_fields:
:param dated_fields:
:param associated_models:
:param relative_session_names:
:param specific_perms:
:param own_table_cols:
:param relation_types_prefix:
:param do_not_deduplicate: duplication of id can occurs on large queryset a
mechanism of deduplication is used. But duplicate ids can be normal (for
instance for record_relations view).
:param model_for_perms: use another model to check permission
:param alt_query_own: name of alternate method to get query_own
:param search_form: associated search form to manage JSON query keys
:callback: callback to execute after request. It is called with three arguments: request, export format and queryset of the result
:return:
"""
def func(
request,
data_type="json",
full=False,
force_own=False,
col_names=None,
no_link=False,
no_limit=False,
return_query=False,
**dct,
):
available_perms = []
if specific_perms:
available_perms = specific_perms[:]
EMPTY = ""
if "type" in dct:
data_type = dct.pop("type")
if not data_type:
data_type = "json"
if "json" in data_type:
EMPTY = "[]"
if data_type not in ("json", "csv", "json-image", "json-map", "json-stats"):
return HttpResponse(EMPTY, content_type="text/plain")
if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2:
return HttpResponse(EMPTY, content_type="text/plain")
model_to_check = model
if model_for_perms:
model_to_check = model_for_perms
if no_permission_check:
allowed, own = True, False
else:
if return_query:
allowed, own = True, False
else:
allowed, own = check_model_access_control(
request, model_to_check, available_perms
)
if not allowed:
return HttpResponse(EMPTY, content_type="text/plain")
if force_own:
own = True
if (
full == "shortcut"
and "SHORTCUT_SEARCH" in request.session
and request.session["SHORTCUT_SEARCH"] == "own"
):
own = True
query_own = None
if own:
q = models.IshtarUser.objects.filter(user_ptr=request.user)
if not q.count():
return HttpResponse(EMPTY, content_type="text/plain")
if alt_query_own:
query_own = getattr(model, alt_query_own)(q.all()[0])
else:
query_own = model.get_query_owns(q.all()[0])
query_parameters = {}
if hasattr(model, "get_query_parameters"):
query_parameters = model.get_query_parameters()
# get defaults from model
if not extra_request_keys:
my_extra_request_keys = copy(model.EXTRA_REQUEST_KEYS or {})
if query_parameters:
for key in query_parameters:
my_extra_request_keys[key] = query_parameters[key].search_query
else:
my_extra_request_keys = copy(extra_request_keys or {})
if base_request is None and hasattr(model, "BASE_REQUEST"):
if callable(model.BASE_REQUEST):
my_base_request = model.BASE_REQUEST(request)
else:
my_base_request = copy(model.BASE_REQUEST)
elif base_request is not None:
my_base_request = copy(base_request)
else:
my_base_request = {}
if not bool_fields and hasattr(model, "BOOL_FIELDS"):
my_bool_fields = model.BOOL_FIELDS[:]
else:
my_bool_fields = bool_fields[:] if bool_fields else []
if not reversed_bool_fields and hasattr(model, "REVERSED_BOOL_FIELDS"):
my_reversed_bool_fields = model.REVERSED_BOOL_FIELDS[:]
else:
my_reversed_bool_fields = (
reversed_bool_fields[:] if reversed_bool_fields else []
)
many_counted_fields = getattr(model, "MANY_COUNTED_FIELDS", None)
reversed_many_counted_fields = getattr(
model, "REVERSED_MANY_COUNTED_FIELDS", None
)
my_number_fields = getattr(model, "NUMBER_FIELDS", [])[:]
if not dated_fields and hasattr(model, "DATED_FIELDS"):
my_dated_fields = model.DATED_FIELDS[:]
else:
my_dated_fields = dated_fields[:] if dated_fields else []
my_dated_fields += BASE_DATED_FIELDS
if not associated_models and hasattr(model, "ASSOCIATED_MODELS"):
my_associated_models = model.ASSOCIATED_MODELS[:]
else:
my_associated_models = associated_models[:] if associated_models else []
if not relative_session_names and hasattr(model, "RELATIVE_SESSION_NAMES"):
my_relative_session_names = model.RELATIVE_SESSION_NAMES[:]
else:
my_relative_session_names = (
relative_session_names[:] if relative_session_names else []
)
if not relation_types_prefix and hasattr(model, "RELATION_TYPES_PREFIX"):
my_relation_types_prefix = copy(model.RELATION_TYPES_PREFIX)
else:
my_relation_types_prefix = (
copy(relation_types_prefix) if relation_types_prefix else {}
)
fields = [model._meta.get_field(k) for k in get_all_field_names(model)]
request_keys = dict(
[
(
field.name,
field.name
+ (
hasattr(field, "remote_field")
and field.remote_field
and "__pk"
or ""
),
)
for field in fields
]
)
# add keys of associated models to available request key
for associated_model, key in my_associated_models:
if type(associated_model) == str:
if associated_model not in globals():
continue
associated_model = globals()[associated_model]
associated_fields = [
associated_model._meta.get_field(k)
for k in get_all_field_names(associated_model)
]
request_keys.update(
dict(
[
(
key + "__" + field.name,
key
+ "__"
+ field.name
+ (hasattr(field, "rel") and field.rel and "__pk" or ""),
)
for field in associated_fields
]
)
)
request_keys.update(my_extra_request_keys)
# manage search on json fields and excluded fields
if (
search_form
and request
and request.user
and getattr(request.user, "ishtaruser", None)
):
available, __, excluded_fields, json_fields = search_form.check_custom_form(
request.user.ishtaruser
)
# for now no manage on excluded_fields: should we prevent search on
# some fields regarding the user concerned?
if available:
for __, jkey, jfield in json_fields:
if jfield.alt_name not in request_keys:
if isinstance(
jfield, (forms.NullBooleanField, forms.BooleanField)
):
my_bool_fields.append(jkey)
request_keys[jfield.alt_name] = jkey
elif isinstance(jfield, DateField):
my_dated_fields.append(jkey)
request_keys[jfield.alt_name] = jkey
else:
request_keys[jfield.alt_name] = jkey + "__iexact"
if "query" in dct:
request_items = dct["query"]
request_items["submited"] = True
elif request.method == "POST":
request_items = request.POST
else:
request_items = request.GET
count = dct.get("count", False)
# pager
try:
row_nb = int(request_items.get("length"))
except (ValueError, TypeError):
row_nb = DEFAULT_ROW_NUMBER
if data_type == "json-map": # other limit for map
row_nb = settings.ISHTAR_MAP_MAX_ITEMS
if no_limit or (
data_type == "json-map" and request_items.get("no_limit", False)
):
row_nb = None
display_polygon = False
if data_type == "json-map" and request_items.get("display_polygon", False):
display_polygon = True
dct_request_items = {}
# filter requested fields
for k in request_items:
if k in EXCLUDED_FIELDS:
continue
key = k[:]
if key.startswith("searchprefix_"):
key = key[len("searchprefix_"):]
dct_request_items[key] = request_items[k]
request_items = dct_request_items
base_query = None
if isinstance(my_base_request, Q):
base_query = my_base_request
dct = {}
else:
dct = copy(my_base_request)
excluded_dct = {}
and_reqs, or_reqs = [], []
exc_and_reqs, exc_or_reqs = [], []
distinct_queries = []
dct["extras"], dct["and_reqs"], dct["exc_and_reqs"] = [], [], []
if full == "shortcut":
if model.SLUG == "warehouse":
key = "name__icontains"
else:
key = "cached_label__icontains"
dct[key] = request.GET.get("term", None)
try:
old = "old" in request_items and int(request_items["old"])
except ValueError:
return HttpResponse("[]", content_type="text/plain")
selected_ids = request_items.get("selected_ids", None)
if selected_ids:
if "-" in selected_ids:
q = Q(pk__in=selected_ids.split("-"))
else:
q = Q(pk=selected_ids)
and_reqs.append(q)
for k in request_keys:
val = request_items.get(k)
if not val:
continue
req_keys = request_keys[k]
target = dct
if k in query_parameters:
if query_parameters[k].distinct_query:
distinct_queries.append({})
target = distinct_queries[-1]
if query_parameters[k].extra_query:
target.update(query_parameters[k].extra_query)
if callable(req_keys):
# callable request key for complex queries not managed on GET
continue
elif type(req_keys) not in (list, tuple):
target[req_keys] = val
continue
# multiple choice target
reqs = Q(**{req_keys[0]: val})
for req_key in req_keys[1:]:
q = Q(**{req_key: val})
reqs |= q
and_reqs.append(reqs)
pinned_search = ""
base_keys = ["extras", "and_reqs", "exc_and_reqs"]
if my_base_request and isinstance(my_base_request, dict):
base_keys += list(my_base_request)
whole_bool_fields = my_bool_fields[:] + my_reversed_bool_fields[:]
if reversed_many_counted_fields:
whole_bool_fields += reversed_many_counted_fields[:]
has_a_search = any(
k
for k in dct.keys()
if k not in base_keys
and (
dct[k]
and (k not in whole_bool_fields or dct[k] not in ("1", "unknown"))
)
)
# manage default and pinned search and not bookmark
if (
not has_a_search
and not request_items.get("search_vector", "")
and not request_items.get("submited", "")
and full != "shortcut"
):
if data_type == "csv" and func_name in request.session:
dct = request.session[func_name]
else:
# default search
dct, pinned_search = _manage_default_search(
dct,
request,
model,
default_name,
my_base_request,
my_relative_session_names,
)
elif func_name and request and hasattr(request, "session"):
request.session[func_name] = dct
for k in request_keys:
if k not in query_parameters:
query_parameters[k] = SearchAltName(k, request_keys[k])
related_name_fields = [query_parameters[k].related_name for k in query_parameters
if query_parameters[k].related_name]
dct, excluded_dct, distinct_queries = _search_manage_search_vector(
model,
dct,
excluded_dct,
distinct_queries,
query_parameters,
)
search_vector = ""
if "search_vector" in dct:
search_vector = dct.pop("search_vector")
# manage relations types
if "relation_types" not in my_relation_types_prefix:
my_relation_types_prefix["relation_types"] = ""
relation_types = _get_relation_type_dict(my_relation_types_prefix, dct)
exc_relation_types = _get_relation_type_dict(my_relation_types_prefix,
excluded_dct)
_manage_bool_fields(
model, my_bool_fields, my_reversed_bool_fields, dct, or_reqs
)
_manage_bool_fields(
model, my_bool_fields, my_reversed_bool_fields, excluded_dct, exc_or_reqs
)
tmp_excluded = {}
_manage_many_counted_fields(
many_counted_fields, reversed_many_counted_fields, dct, tmp_excluded
)
_manage_many_counted_fields(
many_counted_fields, reversed_many_counted_fields, excluded_dct, dct
)
if tmp_excluded:
excluded_dct.update(tmp_excluded)
# dated_fields, number_fields
# ['signature_date', ...], ['signature_date__year', ...]
# -> remove 'signature_date'
filtered_dated_fields = []
for field_name in my_dated_fields:
exc = False
for number_field in my_number_fields:
if number_field.startswith(field_name):
exc = True
break
if not exc:
filtered_dated_fields.append(field_name)
my_dated_fields = filtered_dated_fields
_manage_dated_fields(my_dated_fields, dct)
_manage_dated_fields(my_dated_fields, excluded_dct)
_manage_number_fields(my_number_fields, dct)
_manage_number_fields(my_number_fields, excluded_dct)
_manage_hierarchic_fields(model, dct, and_reqs)
_manage_hierarchic_fields(model, excluded_dct, exc_and_reqs)
_manage_facet_search(model, dct, and_reqs)
_manage_facet_search(model, excluded_dct, exc_and_reqs)
extras = []
if "extras" in dct:
extras = dct.pop("extras")
if "and_reqs" in dct:
and_reqs += dct.pop("and_reqs")
if "exc_and_reqs" in dct:
exc_and_reqs += dct.pop("exc_and_reqs")
updated_excluded = {}
_manage_clean_search_field(dct, updated_excluded, related_name_fields=related_name_fields)
_manage_clean_search_field(excluded_dct, dct, reverse=True, related_name_fields=related_name_fields)
if updated_excluded:
excluded_dct.update(updated_excluded)
query = _construct_query(relation_types, dct, or_reqs, and_reqs)
exc_query = None
if excluded_dct or exc_and_reqs or exc_or_reqs or exc_relation_types:
exc_query = _construct_query(
exc_relation_types, excluded_dct, exc_or_reqs, exc_and_reqs,
excluded_relation=True
)
if query_own:
query = query & query_own
# manage hierarchic in shortcut menu
if full == "shortcut":
ASSOCIATED_ITEMS = {
Operation: (File, "associated_file__pk"),
ContextRecord: (Operation, "operation__pk"),
Find: (ContextRecord, "base_finds__context_record__pk"),
}
if model in ASSOCIATED_ITEMS:
upper_model, upper_key = ASSOCIATED_ITEMS[model]
model_name = upper_model.SLUG
current = model_name in request.session and request.session[model_name]
if current:
dct = {upper_key: current}
query &= Q(**dct)
# print(query, distinct_queries, base_query, exc_query, extras)
items = model.objects.filter(query)
for d_q in distinct_queries:
items = items.filter(d_q)
if base_query:
items = items.filter(base_query)
if exc_query:
items = items.exclude(exc_query)
for extra in extras:
items = items.extra(**extra)
if return_query:
return items
items = items.distinct()
table_cols = _get_table_cols(data_type, own_table_cols, full, model)
count_values = ["pk"]
query_distinct_count = getattr(model, "QUERY_DISTINCT_COUNT", None)
if query_distinct_count:
for k, v in query_distinct_count.items():
if v in count_values:
continue
for col in table_cols:
if col.startswith(k):
count_values.append(v)
break
try:
q = items.values(*count_values)
items_nb = q.count() or 0
except ProgrammingError:
items_nb = 0
if count:
return items_nb
# print(str(items.values("id").query))
if search_vector: # for serialization
dct["search_vector"] = search_vector
if data_type == "json-stats":
stats_modality_1 = request_items.get("stats_modality_1", None)
stats_modality_2 = request_items.get("stats_modality_2", None)
if (
not stats_modality_1
or stats_modality_1 not in model.STATISTIC_MODALITIES
):
stats_modality_1 = model.STATISTIC_MODALITIES[0]
if stats_modality_2 not in model.STATISTIC_MODALITIES:
stats_modality_2 = None
stats_sum_variable = request_items.get("stats_sum_variable", None)
stats_sum_variable_keys = list(model.STATISTIC_SUM_VARIABLE.keys())
if (
not stats_sum_variable
or stats_sum_variable not in stats_sum_variable_keys
):
stats_sum_variable = stats_sum_variable_keys[0]
multiply = model.STATISTIC_SUM_VARIABLE[stats_sum_variable][1]
return _get_json_stats(
items,
stats_sum_variable,
stats_modality_1,
stats_modality_2,
multiply=multiply,
)
table_cols = [col if col != [] else '' for col in table_cols]
query_table_cols = []
for idx, cols in enumerate(table_cols):
if type(cols) not in (list, tuple):
cols = [cols]
for col in cols:
query_table_cols += col.split("|")
# contextual (full, simple, etc.) col
contxt = full and "full" or "simple"
if (
hasattr(model, "CONTEXTUAL_TABLE_COLS")
and contxt in model.CONTEXTUAL_TABLE_COLS
):
for idx, col in enumerate(table_cols):
if col in model.CONTEXTUAL_TABLE_COLS[contxt]:
query_table_cols[idx] = model.CONTEXTUAL_TABLE_COLS[contxt][col]
if data_type in ("json-image", "json-map") or full == "shortcut":
if model.SLUG == "warehouse":
query_table_cols.append("name")
table_cols.append("name")
else:
query_table_cols.append("cached_label")
table_cols.append("cached_label")
if data_type == "json-image":
prefix = ""
if model != models.Document:
prefix = "main_image__"
query_table_cols.append(prefix + "thumbnail")
table_cols.append(prefix + "thumbnail")
query_table_cols.append(prefix + "image")
table_cols.append(prefix + "image")
elif data_type == "json-map":
base_query_key = "main_geodata__"
if model.SLUG == "find":
base_query_key = "base_finds__" + base_query_key
query_table_cols += [base_query_key + "cached_x",
base_query_key + "cached_y"]
table_cols += [base_query_key + "cached_x",
base_query_key + "cached_y"]
if display_polygon:
query_table_cols += [base_query_key + "multi_line",
base_query_key + "multi_polygon"]
# manage sort tables
manual_sort_key = None
sorts = {}
for k in request_items:
if not k.startswith("order["):
continue
num = int(k.split("]")[0][len("order[") :])
if num not in sorts:
sorts[num] = ["", ""] # sign, col_num
if k.endswith("[dir]"):
order = request_items[k]
sign = order and order == "desc" and "-" or ""
sorts[num][0] = sign
if k.endswith("[column]"):
sorts[num][1] = request_items[k]
sign = ""
if not sorts:
items = items.order_by("pk")
else:
orders = []
sort_keys = list(sorts.keys())
for idx in sorted(sorts.keys()):
signe, col_num = sorts[idx]
col_num = int(col_num)
# id or link col
if col_num < 2 and len(sort_keys) <= 2:
orders.append("pk")
continue
k = query_table_cols[col_num - 2]
if k in request_keys:
ks = request_keys[k]
if type(ks) not in (tuple, list):
ks = [ks]
for k in ks:
if k.endswith("__pk"):
k = k[: -len("__pk")] + "__label"
if k.endswith("towns"):
k = k + "__cached_label"
if (
k.endswith("__icontains")
or k.endswith("__contains")
or k.endswith("__iexact")
or k.endswith("__exact")
):
k = "__".join(k.split("__")[:-1])
# if '__' in k:
# k = k.split('__')[0]
orders.append(signe + k)
else:
# not a standard request key
if idx: # not the first - we ignore this sort
continue
sign = signe
manual_sort_key = k
logger.warning(
"**WARN get_item - {}**: manual sort key '{}'".format(
func_name, k
)
)
break
if not manual_sort_key:
items = items.order_by(*orders)
# pager management
start, end = 0, None
page_nb = 1
if row_nb and data_type.startswith("json"):
try:
start = int(request_items.get("start"))
page_nb = start // row_nb + 1
if page_nb < 1:
raise ValueError("Page number is not relevant.")
except (TypeError, ValueError, AssertionError):
start = 0
page_nb = 1
end = int(page_nb * row_nb)
if full == "shortcut":
start = 0
end = 20
if callback:
slice_query = None
if not manual_sort_key:
slice_query = (start, end)
callback("get_item", request, data_type, items, slice_query)
if manual_sort_key:
items = items.all()
else:
items = items[start:end]
if old:
items = [item.get_previous(old) for item in items]
if data_type == "json-map":
if display_polygon:
geo_fields = query_table_cols[-4:]
else:
geo_fields = query_table_cols[-2:]
datas = _get_data_from_query(
items, query_table_cols, my_extra_request_keys,
geo_fields=geo_fields
)
elif data_type != "csv" and getattr(model, "NEW_QUERY_ENGINE", False):
datas = _get_data_from_query(items, query_table_cols, my_extra_request_keys)
else:
datas = _get_data_from_query_old(
items,
query_table_cols,
request,
my_extra_request_keys,
do_not_deduplicate,
)
if manual_sort_key:
# +1 because the id is added as a first col
idx_col = None
if manual_sort_key in query_table_cols:
idx_col = query_table_cols.index(manual_sort_key) + 1
else:
for idx, col in enumerate(query_table_cols):
if type(col) in (list, tuple) and manual_sort_key in col:
idx_col = idx + 1
if idx_col is not None:
null_value = None
for d in datas:
if isinstance(d[idx_col], (int, float)):
null_value = 0
break
if isinstance(d[idx_col], str):
null_value = ""
break
if isinstance(d[idx_col], datetime.date):
null_value = datetime.date(1, 1, 1)
break
if isinstance(d[idx_col], datetime.datetime):
null_value = datetime.datetime(1, 1, 1)
break
if not null_value:
null_value = ""
datas = sorted(
datas,
key=lambda x: x[idx_col] if x[idx_col] is not None else null_value
)
if sign == "-":
datas = reversed(datas)
datas = list(datas)[start:end]
link_template = (
""
' '
)
link_ext_template = '{}'
lock = ' '
own_lock = ' '
has_locks = hasattr(model, "locked")
current_user_id = request.user and request.user.id
if data_type.startswith("json"):
rows = []
if data_type == "json-map":
lnk = link_template.format(
reverse("show-" + default_name, args=[999999, ""]),
)
lnk = lnk.replace("999999", "")
if not has_locks:
lnk = lnk.replace("", "")
data = json.dumps(_format_geojson(datas, lnk, display_polygon))
return HttpResponse(data, content_type="application/json")
for data in datas:
res = {
"id": data[0],
}
if not no_link:
try:
lnk_template = link_template
lnk = lnk_template.format(
reverse("show-" + default_name, args=[data[0], ""])
)
if has_locks and data[-2]:
if data[-1] == current_user_id:
lnk = lnk.replace("", own_lock)
else:
lnk = lnk.replace("", lock)
else:
lnk = lnk.replace("", "")
except NoReverseMatch:
logger.warning(
'**WARN "show-'
+ default_name
+ '" args ('
+ str(data[0])
+ ") url not available"
)
lnk = ""
res["link"] = lnk
for idx, value in enumerate(data[1:]):
if not value or idx >= len(table_cols):
continue
table_col = table_cols[idx]
if type(table_col) not in (list, tuple):
table_col = [table_col]
tab_cols = []
# foreign key may be divided by "." or "__"
for tc in table_col:
if "." in tc:
tab_cols += tc.split(".")
elif "__" in tc:
tab_cols += tc.split("__")
else:
tab_cols.append(tc)
k = "__".join(tab_cols)
if k.endswith("__image") or k.endswith("__thumbnail"):
if (
not value.startswith(settings.MEDIA_ROOT)
and not value.startswith("http://")
and not value.startswith("https://")
):
value = settings.MEDIA_URL + value
if hasattr(model, "COL_LINK") and k in model.COL_LINK:
value = link_ext_template.format(value, value)
if isinstance(value, datetime.date):
value = value.strftime("%Y-%m-%d")
if isinstance(value, datetime.datetime):
value = value.strftime("%Y-%m-%d %H:%M:%S")
res[k] = value
if full == "shortcut":
if "cached_label" in res:
res["value"] = res.pop("cached_label")
elif "name" in res:
res["value"] = res.pop("name")
rows.append(res)
# v4.0 patch
if getattr(model, "SELECT_GROUP_BY", False):
new_rows = OrderedDict()
for row in rows:
idx = row["id"]
if idx in new_rows:
for key in row:
if row[key] == new_rows[idx][key]:
continue
new_rows[idx][key] += " ; " + row[key]
else:
new_rows[idx] = row
rows = [row for __, row in new_rows.items()]
if full == "shortcut":
data = json.dumps(rows)
else:
total = (
(items_nb // row_nb + (1 if items_nb % row_nb else 0))
if row_nb
else items_nb
)
data = json.dumps(
{
"recordsTotal": items_nb,
"recordsFiltered": items_nb,
"rows": rows,
"table-cols": table_cols,
"pinned-search": pinned_search,
"page": page_nb,
"total": total,
}
)
return HttpResponse(data, content_type="application/json")
elif data_type == "csv":
response = HttpResponse(content_type="text/csv", charset=ENCODING)
n = datetime.datetime.now()
filename = "%s_%s.csv" % (default_name, n.strftime("%Y%m%d-%H%M%S"))
response["Content-Disposition"] = "attachment; filename=%s" % filename
writer = csv.writer(response, **CSV_OPTIONS)
if col_names:
col_names = [name for name in col_names]
else:
col_names = []
for field_name in table_cols:
if type(field_name) in (list, tuple):
field_name = " & ".join(field_name)
if hasattr(model, "COL_LABELS") and field_name in model.COL_LABELS:
field = model.COL_LABELS[field_name]
col_names.append(str(field))
continue
else:
try:
field = model._meta.get_field(field_name)
except:
col_names.append("")
logger.warning(
"**WARN get_item - csv export**: no col name "
"for {}\nadd explicit label to "
"COL_LABELS attribute of "
"{}".format(field_name, model)
)
continue
col_names.append(str(field.verbose_name))
writer.writerow(col_names)
for data in datas:
row, delta = [], 0
# regroup cols with join "|"
for idx, col_name in enumerate(table_cols):
if len(data[1:]) <= idx + delta:
break
val = data[1:][idx + delta]
if col_name and "|" in col_name[0]:
for delta_idx in range(len(col_name[0].split("|")) - 1):
delta += 1
val += data[1:][idx + delta]
row.append(val)
try:
writer.writerow(row)
except UnicodeEncodeError:
vals = []
for v in row:
try:
vals.append(v.encode(ENCODING).decode(ENCODING))
except UnicodeEncodeError:
vals.append(unidecode(v).encode(ENCODING).decode(ENCODING))
writer.writerow(vals)
return response
return HttpResponse("{}", content_type="text/plain")
return func
def adapt_distant_search(params, src, model):
if "search_vector" in params and params["search_vector"]:
search_vector = params["search_vector"][0]
match = RE_FACET.search(search_vector)
final_search_vector = ""
while match:
key, value, __ = match.groups()
q = models_rest.ApiKeyMatch.objects.filter(
source=src,
search_model__model__iexact=model,
search_keys__contains=[key],
local_label=value,
)
up, down = match.span()
if q.count():
api_key = q.all()[0]
final_search_vector += search_vector[0:up]
final_search_vector += f'{key}="{api_key.distant_label};{value}" '
else:
final_search_vector += search_vector[0:down] + " "
search_vector = search_vector[down:]
match = RE_FACET.search(search_vector)
final_search_vector += search_vector
params["search_vector"] = [final_search_vector]
def get_distant_item(request, model, external_source_id, data_type=None):
# TODO: check permissions
try:
src = models_rest.ApiExternalSource.objects.get(pk=external_source_id)
except (models_rest.ApiExternalSource.DoesNotExist, ValueError):
return HttpResponse("{}", content_type="text/plain")
url = src.url
url += reverse(f"api-search-{model}")
params = {k: v for k, v in dict(request.GET).items() if not k.startswith("columns")}
params["submited"] = 1
params["data_type"] = "json"
if data_type:
params["data_type"] = data_type
adapt_distant_search(params, src, model)
default_keys = [
"draw",
"order[",
"start",
"length",
"search[",
"_",
"submited",
"data_type",
]
app = models_rest.MAIN_MODELS[model]
model_class = ContentType.objects.get(app_label=app, model=model).model_class()
bool_fields = model_class.REVERSED_BOOL_FIELDS + model_class.BOOL_FIELDS + model_class.CALLABLE_BOOL_FIELDS
is_empty_params = not any(
k
for k in params
if not any(k for default_key in default_keys if not k.startswith(default_key))
and (k not in bool_fields or params.get(k) != ["unknown"])
)
pin_key = "pin-search-" + model
if (
is_empty_params and pin_key in request.session and request.session[pin_key]
): # a search is pinned
params["search_vector"] = request.session[pin_key]
try:
response = requests.get(
url,
params=params,
timeout=20,
headers={"Authorization": f"Token {src.key}"},
)
except requests.exceptions.Timeout:
return HttpResponse("{}", content_type="text/plain")
dct = response.json()
if "rows" in dct:
for row in dct["rows"]:
if "id" in row:
try:
idx = int(row["id"])
except ValueError:
continue
source_id = f"source-{external_source_id}-{idx}"
row["id"] = source_id
if "link" in row:
row["link"] = row["link"].replace(str(idx), source_id)
return HttpResponse(json.dumps(dct), content_type="application/json")
def external_export(request, source_id, model_name, slug):
external_sources = request.session.get("EXTERNAL_SOURCES", {})
url = None
for source in external_sources:
try:
src_id, __ = source.split("||")
src_id = int(src_id)
except ValueError:
continue
if src_id != source_id:
continue
for model in external_sources[source]:
if model_name == model.split("-")[-1]:
url = reverse("api-export-" + model_name, args=[slug])
if not url:
return HttpResponse('Unauthorized', status=401)
try:
src = models_rest.ApiExternalSource.objects.get(pk=source_id)
except (models_rest.ApiExternalSource.DoesNotExist, ValueError):
return HttpResponse('Unauthorized', status=401)
url = src.url + url
try:
response = requests.get(
url,
params=request.GET,
timeout=20,
headers={"Authorization": f"Token {src.key}"},
)
except requests.exceptions.Timeout:
return HttpResponse('Gateway Timeout', status=504)
if response.status_code != 200:
lbl = {
401: "Unauthorized",
404: "Page not found",
500: "Server error",
}
lbl = lbl[response.status_code] \
if response.status_code in lbl else "Unknown error"
return HttpResponse(lbl, status=response.status_code)
response = HttpResponse(response.text, content_type="text/csv")
n = datetime.datetime.now()
filename = f"{model_name}-{n.strftime('%Y%m%d-%H%M%S')}.csv"
response["Content-Disposition"] = "attachment; filename=%s" % filename
return response