author     Étienne Loks <etienne.loks@iggdrasil.net>  2021-03-19 11:05:22 +0100
committer  Étienne Loks <etienne.loks@iggdrasil.net>  2021-03-19 11:05:22 +0100
commit     3039fae5124c00a67283c9b707e4a411149d93b1 (patch)
tree       5d7fde3628825aebeeef3d85d2dfcf09a52116de /ishtar_common/views_item.py
parent     b38e35ad05ae5b7d1c3d45436921f573bc9e5ba6 (diff)
Format - black: ishtar_common
Diffstat (limited to 'ishtar_common/views_item.py')
-rw-r--r--  ishtar_common/views_item.py  1392
1 file changed, 776 insertions(+), 616 deletions(-)
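
Note on the change: this commit only re-runs the black code formatter over the module, normalizing string quoting, import wrapping and trailing commas without altering behaviour. A minimal sketch of the kind of rewrite black performs, shown here through its Python API (black.format_str / black.Mode); the project most likely ran the plain black command line instead, and the exact invocation is not recorded in this commit:

    # Hedged illustration only -- not part of the commit itself.
    import black

    src = "ENCODING = settings.ENCODING or 'utf-8'\n"
    # black normalizes quotes and layout while leaving the parsed AST unchanged.
    print(black.format_str(src, mode=black.Mode()), end="")
    # -> ENCODING = settings.ENCODING or "utf-8"
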
diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py
index 0b803b00e..31a16b672 100644
--- a/ishtar_common/views_item.py
+++ b/ishtar_common/views_item.py
@@ -18,55 +18,74 @@ from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.core.urlresolvers import reverse, NoReverseMatch
-from django.db.models import Q, Count, Sum, ImageField, Func, \
- ExpressionWrapper, FloatField, FileField
+from django.db.models import (
+ Q,
+ Count,
+ Sum,
+ ImageField,
+ Func,
+ ExpressionWrapper,
+ FloatField,
+ FileField,
+)
from django.db.models.fields import FieldDoesNotExist
from django.db.utils import ProgrammingError
from django.forms.models import model_to_dict
from django.http import HttpResponse
from django.shortcuts import render
from django.template import loader
-from django.utils.translation import ugettext, ugettext_lazy as _, \
- activate, deactivate, pgettext_lazy
+from django.utils.translation import (
+ ugettext,
+ ugettext_lazy as _,
+ activate,
+ deactivate,
+ pgettext_lazy,
+)
from tidylib import tidy_document as tidy
from unidecode import unidecode
from weasyprint import HTML, CSS
from weasyprint.fonts import FontConfiguration
-from ishtar_common.utils import check_model_access_control, CSV_OPTIONS, \
- get_all_field_names, Round, PRIVATE_FIELDS
-from ishtar_common.models import get_current_profile, \
- GeneralType, SearchAltName
+from ishtar_common.utils import (
+ check_model_access_control,
+ CSV_OPTIONS,
+ get_all_field_names,
+ Round,
+ PRIVATE_FIELDS,
+)
+from ishtar_common.models import get_current_profile, GeneralType, SearchAltName
from ishtar_common.models_common import HistoryError
from .menus import Menu
from . import models
from archaeological_files.models import File
-from archaeological_operations.models import Operation, ArchaeologicalSite, \
- AdministrativeAct
+from archaeological_operations.models import (
+ Operation,
+ ArchaeologicalSite,
+ AdministrativeAct,
+)
from archaeological_context_records.models import ContextRecord
-from archaeological_finds.models import Find, FindBasket, Treatment, \
- TreatmentFile
+from archaeological_finds.models import Find, FindBasket, Treatment, TreatmentFile
from archaeological_warehouse.models import Warehouse
logger = logging.getLogger(__name__)
-ENCODING = settings.ENCODING or 'utf-8'
+ENCODING = settings.ENCODING or "utf-8"
CURRENT_ITEM_KEYS = (
- ('file', File),
- ('operation', Operation),
- ('site', ArchaeologicalSite),
- ('contextrecord', ContextRecord),
- ('warehouse', Warehouse),
- ('find', Find),
- ('treatmentfile', TreatmentFile),
- ('treatment', Treatment),
- ('administrativeact', AdministrativeAct),
- ('administrativeactop', AdministrativeAct),
- ('administrativeactfile', AdministrativeAct),
- ('administrativeacttreatment', AdministrativeAct),
- ('administrativeacttreatmentfile', AdministrativeAct),
+ ("file", File),
+ ("operation", Operation),
+ ("site", ArchaeologicalSite),
+ ("contextrecord", ContextRecord),
+ ("warehouse", Warehouse),
+ ("find", Find),
+ ("treatmentfile", TreatmentFile),
+ ("treatment", Treatment),
+ ("administrativeact", AdministrativeAct),
+ ("administrativeactop", AdministrativeAct),
+ ("administrativeactfile", AdministrativeAct),
+ ("administrativeacttreatment", AdministrativeAct),
+ ("administrativeacttreatmentfile", AdministrativeAct),
)
CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS)
@@ -74,14 +93,15 @@ CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS)
def get_autocomplete_queries(request, label_attributes, extra=None):
if not label_attributes:
return [Q(pk__isnull=True)]
- base_q = request.GET.get('term') or ""
+ base_q = request.GET.get("term") or ""
queries = []
- splited_q = base_q.split(' ')
+ splited_q = base_q.split(" ")
for value_prefix, query_suffix, query_endswith in (
- ('', '__startswith', True), # starts with
- (' ', '__icontains', True), # contain a word which starts with
- ('', '__endswith', False), # ends with
- ('', '__icontains', False)): # contains
+ ("", "__startswith", True), # starts with
+ (" ", "__icontains", True), # contain a word which starts with
+ ("", "__endswith", False), # ends with
+ ("", "__icontains", False),
+ ): # contains
alt_queries = [None]
if len(splited_q) == 1 and query_endswith:
alt_queries = ["__endswith", None]
@@ -92,15 +112,11 @@ def get_autocomplete_queries(request, label_attributes, extra=None):
for q in splited_q:
if not q:
continue
- sub_q = Q(
- **{label_attributes[0] + query_suffix: value_prefix + q})
+ sub_q = Q(**{label_attributes[0] + query_suffix: value_prefix + q})
if alt_query:
- sub_q &= Q(
- **{label_attributes[0] + alt_query: q}
- )
+ sub_q &= Q(**{label_attributes[0] + alt_query: q})
for other_label in label_attributes[1:]:
- sub_q = sub_q | Q(
- **{other_label + query_suffix: value_prefix + q})
+ sub_q = sub_q | Q(**{other_label + query_suffix: value_prefix + q})
query = query & sub_q
queries.append(query)
return queries
@@ -112,10 +128,8 @@ def get_autocomplete_item(model, extra=None):
def func(request, current_right=None, limit=20):
result = OrderedDict()
- for query in get_autocomplete_queries(request, ['cached_label'],
- extra=extra):
- objects = model.objects.filter(query).values(
- 'cached_label', 'id')[:limit]
+ for query in get_autocomplete_queries(request, ["cached_label"], extra=extra):
+ objects = model.objects.filter(query).values("cached_label", "id")[:limit]
for obj in objects:
if obj["id"] not in list(result.keys()):
result[obj["id"]] = obj["cached_label"]
@@ -124,9 +138,11 @@ def get_autocomplete_item(model, extra=None):
break
if not limit:
break
- data = json.dumps([{'id': obj[0], 'value': obj[1]}
- for obj in list(result.items())])
- return HttpResponse(data, content_type='text/plain')
+ data = json.dumps(
+ [{"id": obj[0], "value": obj[1]} for obj in list(result.items())]
+ )
+ return HttpResponse(data, content_type="text/plain")
+
return func
@@ -138,17 +154,20 @@ def check_permission(request, action_slug, obj_id=None):
return True
if obj_id:
return MAIN_MENU.items[action_slug].is_available(
- request.user, obj_id, session=request.session)
+ request.user, obj_id, session=request.session
+ )
return MAIN_MENU.items[action_slug].can_be_available(
- request.user, session=request.session)
+ request.user, session=request.session
+ )
-def new_qa_item(model, frm, many=False,
- template="ishtar/forms/qa_new_item.html", page_name=""):
- def func(request, parent_name, limits=''):
+def new_qa_item(
+ model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name=""
+):
+ def func(request, parent_name, limits=""):
model_name = model._meta.object_name
not_permitted_msg = ugettext("Operation not permitted.")
- if not check_permission(request, 'add_' + model_name.lower()):
+ if not check_permission(request, "add_" + model_name.lower()):
return HttpResponse(not_permitted_msg)
slug = model.SLUG
if model.SLUG == "site":
@@ -156,29 +175,32 @@ def new_qa_item(model, frm, many=False,
url_slug = "new-" + slug
current_page_name = page_name[:]
if not current_page_name:
- current_page_name = _('New %s' % model_name.lower())
- dct = {'page_name': str(current_page_name),
- 'url': reverse(url_slug, args=[parent_name]),
- 'slug': slug,
- 'parent_name': parent_name,
- 'many': many}
- if request.method == 'POST':
- dct['form'] = frm(request.POST, limits=limits)
- if dct['form'].is_valid():
- new_item = dct['form'].save(request.user)
+ current_page_name = _("New %s" % model_name.lower())
+ dct = {
+ "page_name": str(current_page_name),
+ "url": reverse(url_slug, args=[parent_name]),
+ "slug": slug,
+ "parent_name": parent_name,
+ "many": many,
+ }
+ if request.method == "POST":
+ dct["form"] = frm(request.POST, limits=limits)
+ if dct["form"].is_valid():
+ new_item = dct["form"].save(request.user)
lbl = str(new_item)
if not lbl and hasattr(new_item, "_generate_cached_label"):
lbl = new_item._generate_cached_label()
- dct['new_item_label'] = lbl
- dct['new_item_pk'] = new_item.pk
- dct['parent_pk'] = parent_name
- if dct['parent_pk'] and '_select_' in dct['parent_pk']:
- parents = dct['parent_pk'].split('_')
- dct['parent_pk'] = "_".join([parents[0]] + parents[2:])
+ dct["new_item_label"] = lbl
+ dct["new_item_pk"] = new_item.pk
+ dct["parent_pk"] = parent_name
+ if dct["parent_pk"] and "_select_" in dct["parent_pk"]:
+ parents = dct["parent_pk"].split("_")
+ dct["parent_pk"] = "_".join([parents[0]] + parents[2:])
return render(request, template, dct)
else:
- dct['form'] = frm(limits=limits)
+ dct["form"] = frm(limits=limits)
return render(request, template, dct)
+
return func
@@ -186,8 +208,7 @@ def get_short_html_detail(model):
def func(request, pk):
model_name = model._meta.object_name
not_permitted_msg = ugettext("Operation not permitted.")
- if not check_permission(request, 'view_' + model_name.lower(),
- pk):
+ if not check_permission(request, "view_" + model_name.lower(), pk):
return HttpResponse(not_permitted_msg)
try:
item = model.objects.get(pk=pk)
@@ -195,6 +216,7 @@ def get_short_html_detail(model):
return HttpResponse(not_permitted_msg)
html = item.get_short_html_detail()
return HttpResponse(html)
+
return func
@@ -203,8 +225,7 @@ def modify_qa_item(model, frm):
template = "ishtar/forms/qa_new_item.html"
model_name = model._meta.object_name
not_permitted_msg = ugettext("Operation not permitted.")
- if not check_permission(request, 'change_' + model_name.lower(),
- pk):
+ if not check_permission(request, "change_" + model_name.lower(), pk):
return HttpResponse(not_permitted_msg)
slug = model.SLUG
if model.SLUG == "site":
@@ -214,43 +235,46 @@ def modify_qa_item(model, frm):
except model.DoesNotExist:
return HttpResponse(not_permitted_msg)
url_slug = "modify-" + slug
- dct = {'page_name': str(_('Modify a %s' % model_name.lower())),
- 'url': reverse(url_slug, args=[parent_name, pk]),
- 'slug': slug,
- "modify": True,
- 'parent_name': parent_name}
- if request.method == 'POST':
- dct['form'] = frm(request.POST)
- if dct['form'].is_valid():
- new_item = dct['form'].save(request.user, item)
+ dct = {
+ "page_name": str(_("Modify a %s" % model_name.lower())),
+ "url": reverse(url_slug, args=[parent_name, pk]),
+ "slug": slug,
+ "modify": True,
+ "parent_name": parent_name,
+ }
+ if request.method == "POST":
+ dct["form"] = frm(request.POST)
+ if dct["form"].is_valid():
+ new_item = dct["form"].save(request.user, item)
lbl = str(new_item)
if not lbl and hasattr(new_item, "_generate_cached_label"):
lbl = new_item._generate_cached_label()
- dct['new_item_label'] = lbl
- dct['new_item_pk'] = new_item.pk
- dct['parent_pk'] = parent_name
- if dct['parent_pk'] and '_select_' in dct['parent_pk']:
- parents = dct['parent_pk'].split('_')
- dct['parent_pk'] = "_".join([parents[0]] + parents[2:])
+ dct["new_item_label"] = lbl
+ dct["new_item_pk"] = new_item.pk
+ dct["parent_pk"] = parent_name
+ if dct["parent_pk"] and "_select_" in dct["parent_pk"]:
+ parents = dct["parent_pk"].split("_")
+ dct["parent_pk"] = "_".join([parents[0]] + parents[2:])
return render(request, template, dct)
else:
data = model_to_dict(item)
for k in list(data.keys()):
- if data[k] and isinstance(data[k], list) and hasattr(
- data[k][0], "pk"):
+ if data[k] and isinstance(data[k], list) and hasattr(data[k][0], "pk"):
data[k] = [i.pk for i in data[k]]
- dct['form'] = frm(initial=data)
+ dct["form"] = frm(initial=data)
return render(request, template, dct)
+
return func
def display_item(model, extra_dct=None, show_url=None):
def func(request, pk, **dct):
if show_url:
- dct['show_url'] = "/{}{}/".format(show_url, pk)
+ dct["show_url"] = "/{}{}/".format(show_url, pk)
else:
- dct['show_url'] = "/show-{}/{}/".format(model.SLUG, pk)
- return render(request, 'ishtar/display_item.html', dct)
+ dct["show_url"] = "/show-{}/{}/".format(model.SLUG, pk)
+ return render(request, "ishtar/display_item.html", dct)
+
return func
@@ -261,38 +285,43 @@ def show_item(model, name, extra_dct=None, model_for_perms=None):
check_model = model_for_perms
allowed, own = check_model_access_control(request, check_model)
if not allowed:
- return HttpResponse('', content_type="application/xhtml")
+ return HttpResponse("", content_type="application/xhtml")
q = model.objects
if own:
- if not hasattr(request.user, 'ishtaruser'):
- return HttpResponse('')
+ if not hasattr(request.user, "ishtaruser"):
+ return HttpResponse("")
query_own = model.get_query_owns(request.user.ishtaruser)
if query_own:
q = q.filter(query_own).distinct()
try:
item = q.get(pk=pk)
except (ObjectDoesNotExist, ValueError):
- return HttpResponse('')
- doc_type = 'type' in dct and dct.pop('type')
- url_name = "/".join(reverse('show-' + name, args=['0', '']
- ).split('/')[:-2]) + "/"
+ return HttpResponse("")
+ doc_type = "type" in dct and dct.pop("type")
+ url_name = (
+ "/".join(reverse("show-" + name, args=["0", ""]).split("/")[:-2]) + "/"
+ )
profile = get_current_profile()
- dct['PROFILE'] = profile
- dct['CURRENCY'] = profile.currency
- dct['ENCODING'] = settings.ENCODING
- dct['DOT_GENERATION'] = settings.DOT_BINARY and profile.relation_graph
- dct['current_window_url'] = url_name
+ dct["PROFILE"] = profile
+ dct["CURRENCY"] = profile.currency
+ dct["ENCODING"] = settings.ENCODING
+ dct["DOT_GENERATION"] = settings.DOT_BINARY and profile.relation_graph
+ dct["current_window_url"] = url_name
date = None
- if 'date' in dct:
- date = dct.pop('date')
- dct['sheet_id'] = "%s-%d" % (name, item.pk)
- dct['window_id'] = "%s-%d-%s" % (
- name, item.pk, datetime.datetime.now().strftime('%M%s'))
+ if "date" in dct:
+ date = dct.pop("date")
+ dct["sheet_id"] = "%s-%d" % (name, item.pk)
+ dct["window_id"] = "%s-%d-%s" % (
+ name,
+ item.pk,
+ datetime.datetime.now().strftime("%M%s"),
+ )
# list current perms
- if hasattr(request.user, 'ishtaruser') and request.user.ishtaruser:
+ if hasattr(request.user, "ishtaruser") and request.user.ishtaruser:
cache_key = "{}-{}-{}".format(
- settings.PROJECT_SLUG, "current-perms",
+ settings.PROJECT_SLUG,
+ "current-perms",
request.session.session_key,
)
permissions = cache.get(cache_key)
@@ -308,80 +337,96 @@ def show_item(model, name, extra_dct=None, model_for_perms=None):
for perm in permissions:
dct["permission_" + perm] = True
- if hasattr(item, 'history') and request.user.is_superuser:
+ if hasattr(item, "history") and request.user.is_superuser:
if date:
try:
- date = datetime.datetime.strptime(date,
- '%Y-%m-%dT%H:%M:%S.%f')
+ date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
if item.get_last_history_date() != date:
item = item.get_previous(date=date)
assert item is not None
- dct['previous'] = item._previous
- dct['next'] = item._next
+ dct["previous"] = item._previous
+ dct["next"] = item._next
else:
date = None
except (ValueError, AssertionError):
- return HttpResponse('', content_type='text/plain')
+ return HttpResponse("", content_type="text/plain")
if not date:
historized = item.history.all()
if historized:
item.history_date = historized[0].history_date
if len(historized) > 1:
- dct['previous'] = historized[1].history_date
- if doc_type in ("odt", "pdf") and hasattr(item, 'qrcode') \
- and (not item.qrcode or not item.qrcode.name):
+ dct["previous"] = historized[1].history_date
+ if (
+ doc_type in ("odt", "pdf")
+ and hasattr(item, "qrcode")
+ and (not item.qrcode or not item.qrcode.name)
+ ):
item.generate_qrcode(request=request)
- dct['item'], dct['item_name'] = item, name
+ dct["item"], dct["item_name"] = item, name
# add context
if extra_dct:
dct.update(extra_dct(request, item))
context_instance = deepcopy(dct)
- context_instance['output'] = 'html'
- if hasattr(item, 'history_object'):
+ context_instance["output"] = "html"
+ if hasattr(item, "history_object"):
filename = item.history_object.associated_filename
else:
filename = item.associated_filename
if doc_type == "odt" and settings.ODT_TEMPLATE:
- tpl = loader.get_template('ishtar/sheet_%s.html' % name)
- context_instance['output'] = 'ODT'
+ tpl = loader.get_template("ishtar/sheet_%s.html" % name)
+ context_instance["output"] = "ODT"
content = tpl.render(context_instance, request)
- tidy_options = {'output-xhtml': 1, 'indent': 1,
- 'tidy-mark': 0, 'doctype': 'auto',
- 'add-xml-decl': 1, 'wrap': 1}
+ tidy_options = {
+ "output-xhtml": 1,
+ "indent": 1,
+ "tidy-mark": 0,
+ "doctype": "auto",
+ "add-xml-decl": 1,
+ "wrap": 1,
+ }
html, errors = tidy(content, options=tidy_options)
html = html.replace("&nbsp;", "&#160;")
- html = re.sub('<pre([^>]*)>\n', '<pre\\1>', html)
+ html = re.sub("<pre([^>]*)>\n", "<pre\\1>", html)
odt = NamedTemporaryFile()
html_source = NamedTemporaryFile()
- with open(html_source.name, 'w') as html_file:
+ with open(html_source.name, "w") as html_file:
html_file.write(html)
- pandoc_args = ["pandoc", "-f", "html", "-t", "odt",
- "-o", odt.name, html_source.name]
+ pandoc_args = [
+ "pandoc",
+ "-f",
+ "html",
+ "-t",
+ "odt",
+ "-o",
+ odt.name,
+ html_source.name,
+ ]
try:
- subprocess.check_call(pandoc_args, stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL)
+ subprocess.check_call(
+ pandoc_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+ )
except subprocess.CalledProcessError:
- return HttpResponse(content,
- content_type="application/xhtml")
+ return HttpResponse(content, content_type="application/xhtml")
response = HttpResponse(
- content_type='application/vnd.oasis.opendocument.text')
- response['Content-Disposition'] = \
- 'attachment; filename={}.odt'.format(filename)
- with open(odt.name, 'rb') as odt_file:
+ content_type="application/vnd.oasis.opendocument.text"
+ )
+ response["Content-Disposition"] = "attachment; filename={}.odt".format(
+ filename
+ )
+ with open(odt.name, "rb") as odt_file:
response.write(odt_file.read())
return response
- elif doc_type == 'pdf':
- base_url = "/".join(
- request.build_absolute_uri().split("/")[0:3]
- )
+ elif doc_type == "pdf":
+ base_url = "/".join(request.build_absolute_uri().split("/")[0:3])
- tpl = loader.get_template('ishtar/sheet_%s_pdf.html' % name)
- context_instance['output'] = 'PDF'
+ tpl = loader.get_template("ishtar/sheet_%s_pdf.html" % name)
+ context_instance["output"] = "PDF"
html = tpl.render(context_instance, request)
font_config = FontConfiguration()
- css = CSS(string='''
+ css = CSS(
+ string="""
@font-face {
font-family: Gentium;
src: url(%s);
@@ -389,20 +434,21 @@ def show_item(model, name, extra_dct=None, model_for_perms=None):
body{
font-family: Gentium
}
- ''' % (base_url + static("gentium/GentiumPlus-R.ttf")))
- css2 = CSS(filename=settings.STATIC_ROOT + '/media/style_basic.css')
- pdf = HTML(
- string=html, base_url=base_url
- ).write_pdf(
- stylesheets=[css, css2], font_config=font_config)
- response = HttpResponse(pdf, content_type='application/pdf')
- response['Content-Disposition'] = 'attachment; filename=%s.pdf' % \
- filename
+ """
+ % (base_url + static("gentium/GentiumPlus-R.ttf"))
+ )
+ css2 = CSS(filename=settings.STATIC_ROOT + "/media/style_basic.css")
+ pdf = HTML(string=html, base_url=base_url).write_pdf(
+ stylesheets=[css, css2], font_config=font_config
+ )
+ response = HttpResponse(pdf, content_type="application/pdf")
+ response["Content-Disposition"] = "attachment; filename=%s.pdf" % filename
return response
else:
- tpl = loader.get_template('ishtar/sheet_%s_window.html' % name)
+ tpl = loader.get_template("ishtar/sheet_%s_window.html" % name)
content = tpl.render(context_instance, request)
return HttpResponse(content, content_type="application/xhtml")
+
return func
@@ -410,21 +456,29 @@ def revert_item(model):
def func(request, pk, date, **dct):
try:
item = model.objects.get(pk=pk)
- date = datetime.datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%f')
+ date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
item.rollback(date)
except (ObjectDoesNotExist, ValueError, HistoryError):
- return HttpResponse(None, content_type='text/plain')
- return HttpResponse("True", content_type='text/plain')
+ return HttpResponse(None, content_type="text/plain")
+ return HttpResponse("True", content_type="text/plain")
+
return func
HIERARCHIC_LEVELS = 5
-HIERARCHIC_FIELDS = ['periods', 'period', 'unit', 'material_types',
- 'material_type', 'conservatory_state', 'object_types']
+HIERARCHIC_FIELDS = [
+ "periods",
+ "period",
+ "unit",
+ "material_types",
+ "material_type",
+ "conservatory_state",
+ "object_types",
+]
def _get_values(request, val):
- if hasattr(val, 'all'): # manage related objects
+ if hasattr(val, "all"): # manage related objects
vals = list(val.all())
else:
vals = [val]
@@ -433,9 +487,13 @@ def _get_values(request, val):
if callable(v):
v = v()
try:
- if hasattr(v, 'url') and v.url:
- v = (request.is_secure() and
- 'https' or 'http') + '://' + request.get_host() + v.url
+ if hasattr(v, "url") and v.url:
+ v = (
+ (request.is_secure() and "https" or "http")
+ + "://"
+ + request.get_host()
+ + v.url
+ )
except ValueError:
pass
new_vals.append(v)
@@ -453,8 +511,11 @@ def _push_to_list(obj, current_group, depth):
except IndexError:
# tolerant to parentheses mismatch
pass
- if current_group and type(obj) in (str, str) and \
- type(current_group[-1]) in (str, str):
+ if (
+ current_group
+ and type(obj) in (str, str)
+ and type(current_group[-1]) in (str, str)
+ ):
current_group[-1] += obj
else:
current_group.append(obj)
@@ -488,10 +549,10 @@ def _parse_parentheses(s):
if char == '"':
inside_quote = not inside_quote
if not inside_quote:
- if char == '(':
+ if char == "(":
_push_to_list([], groups, depth)
depth += 1
- elif char == ')':
+ elif char == ")":
if depth > 0:
depth -= 1
else:
@@ -507,8 +568,9 @@ RESERVED_CHAR = ["|", "&"]
RE_FACET = re.compile('([-a-zA-Z]+)="([^"]+)"(?:;"([^"]+)")*')
-def _parse_query_string(string, query_parameters, current_dct, exc_dct,
- extra_distinct_q):
+def _parse_query_string(
+ string, query_parameters, current_dct, exc_dct, extra_distinct_q
+):
string = string.strip().lower()
match = RE_FACET.search(string)
@@ -541,21 +603,20 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct,
is_true = not is_true
cfltr, cexclude, cextra = term(is_true=is_true)
if cfltr:
- if 'and_reqs' not in dct:
- dct['and_reqs'] = []
- dct['and_reqs'].append(cfltr)
+ if "and_reqs" not in dct:
+ dct["and_reqs"] = []
+ dct["and_reqs"].append(cfltr)
if cexclude:
- if 'exc_and_reqs' not in dct:
- dct['exc_and_reqs'] = []
- dct['exc_and_reqs'].append(cexclude)
+ if "exc_and_reqs" not in dct:
+ dct["exc_and_reqs"] = []
+ dct["exc_and_reqs"].append(cexclude)
if cextra:
- dct['extras'].append(cextra)
+ dct["extras"].append(cextra)
else:
if query_parameters[base_term].distinct_query:
extra_distinct_q.append({})
dct = extra_distinct_q[-1]
- if not query_parameters[base_term].distinct_query and \
- excluded:
+ if not query_parameters[base_term].distinct_query and excluded:
dct = exc_dct
if query_parameters[base_term].extra_query:
dct.update(query_parameters[base_term].extra_query)
@@ -565,11 +626,10 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct,
dct[term] = query
if query_parameters[base_term].distinct_query:
for k in dct: # clean "
- dct[k] = dct[k].replace('"', '')
+ dct[k] = dct[k].replace('"', "")
# distinct query wait for a query
_manage_clean_search_field(dct)
- extra_distinct_q[-1] = \
- ~Q(**dct) if excluded else Q(**dct)
+ extra_distinct_q[-1] = ~Q(**dct) if excluded else Q(**dct)
return ""
for reserved_char in FORBIDDEN_CHAR:
string = string.replace(reserved_char, "")
@@ -578,22 +638,23 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct,
string = string.replace(reserved_char, "")
if not string:
return ""
- if string.endswith('*'):
+ if string.endswith("*"):
if len(string.strip()) == 1:
return ""
- string = string[:-1] + ':*'
+ string = string[:-1] + ":*"
elif string not in ("&", "|", "!", "-"):
# like search by default
- string = string + ':*'
- if string.startswith('-'):
+ string = string + ":*"
+ if string.startswith("-"):
if len(string.strip()) == 1:
return ""
string = "!" + string[1:]
return string
-def _parse_parentheses_groups(groups, query_parameters, current_dct=None,
- exc_dct=None, extra_distinct_q=None):
+def _parse_parentheses_groups(
+ groups, query_parameters, current_dct=None, exc_dct=None, extra_distinct_q=None
+):
"""
Transform parentheses groups to query
@@ -612,8 +673,7 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None,
extra_distinct_q = []
if type(groups) is not list:
string = groups.strip()
- if string.startswith('"') and string.endswith('"') and \
- string.count('"') == 2:
+ if string.startswith('"') and string.endswith('"') and string.count('"') == 2:
string = string[1:-1]
# split into many groups if spaces
@@ -624,9 +684,11 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None,
previous_quote = None
while found != -1:
if previous_quote is not None:
- string = string[0:previous_quote] + \
- string[previous_quote:found].replace(' ', SEP) + \
- string[found:]
+ string = (
+ string[0:previous_quote]
+ + string[previous_quote:found].replace(" ", SEP)
+ + string[found:]
+ )
previous_quote = None
# SEP is larger than a space
found = string.find('"', current_index)
@@ -637,20 +699,29 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None,
string_groups = [gp.replace(SEP, " ") for gp in string.split(" ")]
if len(string_groups) == 1:
- return _parse_query_string(
- string_groups[0], query_parameters, current_dct, exc_dct,
- extra_distinct_q
- ), current_dct, exc_dct, extra_distinct_q
+ return (
+ _parse_query_string(
+ string_groups[0],
+ query_parameters,
+ current_dct,
+ exc_dct,
+ extra_distinct_q,
+ ),
+ current_dct,
+ exc_dct,
+ extra_distinct_q,
+ )
return _parse_parentheses_groups(
- string_groups, query_parameters, current_dct, exc_dct,
- extra_distinct_q)
+ string_groups, query_parameters, current_dct, exc_dct, extra_distinct_q
+ )
if not groups: # empty list
return "", current_dct, exc_dct, extra_distinct_q
query = "("
previous_sep, has_item = None, False
for item in groups:
q, current_dct, exc_dct, extra_distinct_q = _parse_parentheses_groups(
- item, query_parameters, current_dct, exc_dct, extra_distinct_q)
+ item, query_parameters, current_dct, exc_dct, extra_distinct_q
+ )
q = q.strip()
if not q:
continue
@@ -671,16 +742,22 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None,
return query, current_dct, exc_dct, extra_distinct_q
-def _search_manage_search_vector(model, dct, exc_dct, distinct_queries,
- query_parameters):
- if 'search_vector' not in dct \
- or not model._meta.managed: # is a view - no search_vector
+def _search_manage_search_vector(
+ model, dct, exc_dct, distinct_queries, query_parameters
+):
+ if (
+ "search_vector" not in dct or not model._meta.managed
+ ): # is a view - no search_vector
return dct, exc_dct, distinct_queries
- search_vector = dct['search_vector']
+ search_vector = dct["search_vector"]
parentheses_groups = _parse_parentheses(search_vector)
- search_query, extra_dct, extra_exc_dct, extra_distinct_q = \
- _parse_parentheses_groups(parentheses_groups, query_parameters)
+ (
+ search_query,
+ extra_dct,
+ extra_exc_dct,
+ extra_distinct_q,
+ ) = _parse_parentheses_groups(parentheses_groups, query_parameters)
dct.update(extra_dct)
distinct_queries += extra_distinct_q
@@ -688,15 +765,18 @@ def _search_manage_search_vector(model, dct, exc_dct, distinct_queries,
if not search_query:
return dct, exc_dct, distinct_queries
# remove inside parenthesis
- search_query = search_query.replace('(', '').replace(')', '').strip()
+ search_query = search_query.replace("(", "").replace(")", "").strip()
if search_query:
- if 'extras' not in dct:
- dct['extras'] = []
- dct['extras'].append(
- {'where': [model._meta.db_table +
- ".search_vector @@ (to_tsquery(%s, %s)) = true"],
- 'params': [settings.ISHTAR_SEARCH_LANGUAGE,
- search_query]}
+ if "extras" not in dct:
+ dct["extras"] = []
+ dct["extras"].append(
+ {
+ "where": [
+ model._meta.db_table
+ + ".search_vector @@ (to_tsquery(%s, %s)) = true"
+ ],
+ "params": [settings.ISHTAR_SEARCH_LANGUAGE, search_query],
+ }
)
return dct, exc_dct, distinct_queries
@@ -709,7 +789,7 @@ def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs):
elif dct[k] == "1":
dct.pop(k)
continue
- dct[k] = dct[k].replace('"', '')
+ dct[k] = dct[k].replace('"', "")
if dct[k] in ["2", "yes", str(_("Yes")).lower(), "True"]:
dct[k] = True
else:
@@ -717,7 +797,7 @@ def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs):
if k in reversed_bool_fields:
dct[k] = not dct[k]
# check also for empty value with image field
- field_names = k.split('__')
+ field_names = k.split("__")
# TODO: can be improved in later version of Django
try:
c_field = model._meta.get_field(field_names[0])
@@ -725,15 +805,15 @@ def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs):
if not hasattr(c_field, "related_model"):
return
c_field = c_field.related_model._meta.get_field(field_name)
- if k.endswith('__isnull') and \
- (isinstance(c_field, (ImageField, FileField))
- or field_names[-2] == "associated_url"):
- key = "__".join(k.split('__')[:-1])
+ if k.endswith("__isnull") and (
+ isinstance(c_field, (ImageField, FileField))
+ or field_names[-2] == "associated_url"
+ ):
+ key = "__".join(k.split("__")[:-1])
if dct[k]:
- or_reqs.append(
- (k, {key + '__exact': ''}))
+ or_reqs.append((k, {key + "__exact": ""}))
else:
- dct[key + '__regex'] = '.{1}.*'
+ dct[key + "__regex"] = ".{1}.*"
except FieldDoesNotExist:
pass
@@ -746,7 +826,7 @@ def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct):
elif dct[k] == "1":
dct.pop(k)
continue
- dct[k] = dct[k].replace('"', '')
+ dct[k] = dct[k].replace('"', "")
dct[k] = True if dct[k] in ["2", "yes", str(_("Yes")).lower()] else None
if reversed_fields and k in reversed_fields:
dct[k] = True if not dct[k] else None
@@ -758,7 +838,7 @@ def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct):
today_lbl = pgettext_lazy("key for text search", "today")
-TODAYS = ['today']
+TODAYS = ["today"]
for language_code, language_lbl in settings.LANGUAGES:
activate(language_code)
@@ -776,12 +856,12 @@ def _manage_dated_fields(dated_fields, dct):
if not dct[k]:
dct.pop(k)
continue
- value = dct[k].replace('"', '').strip()
+ value = dct[k].replace('"', "").strip()
has_today = False
for today in TODAYS:
if value.startswith(today):
base_date = datetime.date.today()
- value = value[len(today):].replace(' ', '')
+ value = value[len(today) :].replace(" ", "")
if value and value[0] in ("-", "+"):
sign = value[0]
try:
@@ -790,27 +870,26 @@ def _manage_dated_fields(dated_fields, dct):
days = 0
if days:
if sign == "-":
- base_date = base_date - datetime.timedelta(
- days=days)
+ base_date = base_date - datetime.timedelta(days=days)
else:
- base_date = base_date + datetime.timedelta(
- days=days)
- dct[k] = base_date.strftime('%Y-%m-%d')
+ base_date = base_date + datetime.timedelta(days=days)
+ dct[k] = base_date.strftime("%Y-%m-%d")
has_today = True
break
if has_today:
continue
items = []
if "/" in value:
- items = list(reversed(value.split('/')))
+ items = list(reversed(value.split("/")))
elif "-" in value: # already date formated
- items = value.split('-')
+ items = value.split("-")
if len(items) != 3:
dct.pop(k)
return
try:
- dct[k] = datetime.datetime(
- *map(lambda x: int(x), items)).strftime('%Y-%m-%d')
+ dct[k] = datetime.datetime(*map(lambda x: int(x), items)).strftime(
+ "%Y-%m-%d"
+ )
except ValueError:
dct.pop(k)
@@ -838,8 +917,7 @@ def _manage_facet_search(model, dct, and_reqs):
k = base_k
else:
k = base_k + "__pk"
- if k not in dct or not dct[k].startswith('"') \
- or not dct[k].startswith('"'):
+ if k not in dct or not dct[k].startswith('"') or not dct[k].startswith('"'):
continue
val = _clean_type_val(dct.pop(k))
if '";"' in val:
@@ -858,9 +936,12 @@ def _manage_facet_search(model, dct, and_reqs):
lbl_name = "__cached_label__"
except:
pass
- suffix = "{}icontains".format(lbl_name) if "%" in val else \
- "{}iexact".format(lbl_name)
- query = val[1:-1].replace('*', "")
+ suffix = (
+ "{}icontains".format(lbl_name)
+ if "%" in val
+ else "{}iexact".format(lbl_name)
+ )
+ query = val[1:-1].replace("*", "")
if not reqs:
reqs = Q(**{base_k + suffix: query})
else:
@@ -868,13 +949,12 @@ def _manage_facet_search(model, dct, and_reqs):
if reqs:
and_reqs.append(reqs)
- POST_PROCESS_REQUEST = getattr(model, 'POST_PROCESS_REQUEST', None)
+ POST_PROCESS_REQUEST = getattr(model, "POST_PROCESS_REQUEST", None)
if not POST_PROCESS_REQUEST:
return
for k in dct:
if k in POST_PROCESS_REQUEST and dct[k]:
- dct[k] = getattr(model, POST_PROCESS_REQUEST[k])(
- dct[k].replace('"', ''))
+ dct[k] = getattr(model, POST_PROCESS_REQUEST[k])(dct[k].replace('"', ""))
def _manage_hierarchic_fields(model, dct, and_reqs):
@@ -885,12 +965,11 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
if type(reqs) not in (list, tuple):
reqs = [reqs]
for req in reqs:
- if req.endswith('areas__pk') \
- or req.endswith('areas__label__iexact'):
- if req.endswith('pk'):
- suffix = 'pk'
- elif req.endswith('label__iexact'):
- suffix = 'label__iexact'
+ if req.endswith("areas__pk") or req.endswith("areas__label__iexact"):
+ if req.endswith("pk"):
+ suffix = "pk"
+ elif req.endswith("label__iexact"):
+ suffix = "label__iexact"
else:
continue
@@ -900,38 +979,41 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
reqs = Q(**{req: val})
for idx in range(HIERARCHIC_LEVELS):
- req = req[:-(len(suffix))] + 'parent__' + suffix
+ req = req[: -(len(suffix))] + "parent__" + suffix
q = Q(**{req: val})
reqs |= q
and_reqs.append(reqs)
# TODO: improve query with "IN ()"?
continue
- if req.endswith('town__pk') or req.endswith('towns__pk') \
- or req.endswith('town__cached_label__iexact') \
- or req.endswith('towns__cached_label__iexact'):
-
- if req.endswith('pk'):
- suffix = 'pk'
- elif req.endswith('cached_label__iexact'):
- suffix = 'cached_label__iexact'
+ if (
+ req.endswith("town__pk")
+ or req.endswith("towns__pk")
+ or req.endswith("town__cached_label__iexact")
+ or req.endswith("towns__cached_label__iexact")
+ ):
+
+ if req.endswith("pk"):
+ suffix = "pk"
+ elif req.endswith("cached_label__iexact"):
+ suffix = "cached_label__iexact"
else:
continue
val = _clean_type_val(dct.pop(req)).strip('"')
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
- vals = [v.replace('"', '') for v in val.split(';')]
+ vals = [v.replace('"', "") for v in val.split(";")]
main_req = None
for val in vals:
reqs = Q(**{req: val})
nreq = base_req = req[:]
for idx in range(HIERARCHIC_LEVELS):
- nreq = nreq[:-(len(suffix))] + 'parents__' + suffix
+ nreq = nreq[: -(len(suffix))] + "parents__" + suffix
q = Q(**{nreq: val})
reqs |= q
nreq = base_req[:]
for idx in range(HIERARCHIC_LEVELS):
- nreq = nreq[:-(len(suffix))] + 'children__' + suffix
+ nreq = nreq[: -(len(suffix))] + "children__" + suffix
q = Q(**{nreq: val})
reqs |= q
if not main_req:
@@ -946,8 +1028,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
lbl_name = "label"
try:
rel = getattr(model, k_hr).field.related_model
- if not hasattr(rel, "label") and hasattr(rel,
- "cached_label"):
+ if not hasattr(rel, "label") and hasattr(rel, "cached_label"):
lbl_name = "cached_label"
except:
pass
@@ -963,8 +1044,9 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
q |= Q(**{r: val})
and_reqs.append(q)
break
- elif req.endswith(k_hr + '__pk') \
- or req.endswith(k_hr + '__{}__iexact'.format(lbl_name)):
+ elif req.endswith(k_hr + "__pk") or req.endswith(
+ k_hr + "__{}__iexact".format(lbl_name)
+ ):
val = _clean_type_val(dct.pop(req))
if '";"' in val:
@@ -974,9 +1056,9 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
values = [val]
base_req = req[:]
reqs = None
- if req.endswith('pk'):
+ if req.endswith("pk"):
base_suffix = "pk"
- elif req.endswith('{}__iexact'.format(lbl_name)):
+ elif req.endswith("{}__iexact".format(lbl_name)):
base_suffix = lbl_name + "__iexact"
else:
continue
@@ -988,17 +1070,17 @@ def _manage_hierarchic_fields(model, dct, and_reqs):
# manage search text by label
if "*" in val:
suffix = lbl_name + "__icontains"
- val = val.replace('*', "")
+ val = val.replace("*", "")
else:
suffix = lbl_name + "__iexact"
- req = req[:-(len(base_suffix))] + suffix
+ req = req[: -(len(base_suffix))] + suffix
if not reqs:
reqs = Q(**{req: val})
else:
reqs |= Q(**{req: val})
for idx in range(HIERARCHIC_LEVELS):
- req = req[:-(len(suffix))] + 'parent__' + suffix
+ req = req[: -(len(suffix))] + "parent__" + suffix
q = Q(**{req: val})
reqs |= q
# TODO: improve query with "IN ()"?
@@ -1012,20 +1094,20 @@ def _manage_clean_search_field(dct, exclude=None):
# clean quoted search field
if type(dct[k]) != str:
continue
- dct[k] = dct[k].replace('"', '')
+ dct[k] = dct[k].replace('"', "")
dct[k] = _clean_type_val(dct[k])
- if '*' not in dct[k] or not k.endswith('__iexact'):
+ if "*" not in dct[k] or not k.endswith("__iexact"):
continue
value = dct.pop(k).strip()
if value.startswith("*"):
value = value[1:]
if value.endswith("*"):
value = value[:-1]
- base_key = k[:-len('__iexact')]
+ base_key = k[: -len("__iexact")]
if value:
- dct[base_key + '__icontains'] = value
+ dct[base_key + "__icontains"] = value
elif exclude is not None:
- exclude[base_key + '__exact'] = ""
+ exclude[base_key + "__exact"] = ""
def _manage_relation_types(relation_types, dct, query, or_reqs):
@@ -1033,11 +1115,12 @@ def _manage_relation_types(relation_types, dct, query, or_reqs):
vals = relation_types[rtype_prefix]
if not vals:
continue
- vals = list(vals)[0].split(';')
+ vals = list(vals)[0].split(";")
for v in vals:
alt_dct = {
- rtype_prefix + 'right_relations__relation_type__label__iexact':
- v.replace('"', '')}
+ rtype_prefix
+ + "right_relations__relation_type__label__iexact": v.replace('"', "")
+ }
for k in dct:
val = dct[k]
if rtype_prefix:
@@ -1046,12 +1129,13 @@ def _manage_relation_types(relation_types, dct, query, or_reqs):
continue
# tricky: reconstruct the key to make sense - remove the
# prefix from the key
- k = k[0:k.index(rtype_prefix)] + \
- k[k.index(rtype_prefix) + len(rtype_prefix):]
- if k.endswith('year'):
- k += '__exact'
- alt_dct[rtype_prefix + 'right_relations__right_record__' + k] = \
- val
+ k = (
+ k[0 : k.index(rtype_prefix)]
+ + k[k.index(rtype_prefix) + len(rtype_prefix) :]
+ )
+ if k.endswith("year"):
+ k += "__exact"
+ alt_dct[rtype_prefix + "right_relations__right_record__" + k] = val
if not dct:
# fake condition to trick Django (1.4): without it only the
# alt_dct is managed
@@ -1062,11 +1146,9 @@ def _manage_relation_types(relation_types, dct, query, or_reqs):
altor_dct.pop(k)
for j in or_req:
val = or_req[j]
- if j == 'year':
- j = 'year__exact'
- altor_dct[
- rtype_prefix + 'right_relations__right_record__' + j] = \
- val
+ if j == "year":
+ j = "year__exact"
+ altor_dct[rtype_prefix + "right_relations__right_record__" + j] = val
query |= Q(**altor_dct)
return query
@@ -1076,7 +1158,7 @@ def _construct_query(relation_types, dct, or_reqs, and_reqs):
# manage multi value not already managed
for key in list(dct.keys()):
if type(dct[key]) == str and ";" in dct[key]:
- values = [v for v in dct[key].split(';') if v]
+ values = [v for v in dct[key].split(";") if v]
if not values:
dct.pop(key)
continue
@@ -1084,9 +1166,7 @@ def _construct_query(relation_types, dct, or_reqs, and_reqs):
if len(values) == 1:
continue
for v in values[1:]:
- or_reqs.append(
- (key, {key: v})
- )
+ or_reqs.append((key, {key: v}))
for k in list(dct.keys()):
if type(k) not in (list, tuple):
@@ -1116,44 +1196,45 @@ def _construct_query(relation_types, dct, or_reqs, and_reqs):
return query
-def _manage_default_search(dct, request, model, default_name, my_base_request,
- my_relative_session_names):
+def _manage_default_search(
+ dct, request, model, default_name, my_base_request, my_relative_session_names
+):
pinned_search = ""
pin_key = "pin-search-" + default_name
- if pin_key in request.session and \
- request.session[pin_key]: # a search is pinned
+ if pin_key in request.session and request.session[pin_key]: # a search is pinned
pinned_search = request.session[pin_key]
- dct = {'search_vector': request.session[pin_key]}
- elif default_name in request.session and \
- request.session[default_name]: # an item is pinned
+ dct = {"search_vector": request.session[pin_key]}
+ elif (
+ default_name in request.session and request.session[default_name]
+ ): # an item is pinned
value = request.session[default_name]
- if 'basket-' in value:
+ if "basket-" in value:
try:
- dct = {
- "basket__pk": request.session[default_name].split('-')[-1]}
- pinned_search = str(FindBasket.objects.get(
- pk=dct["basket__pk"]))
+ dct = {"basket__pk": request.session[default_name].split("-")[-1]}
+ pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"]))
except FindBasket.DoesNotExist:
pass
else:
try:
dct = {"pk": request.session[default_name]}
- pinned_search = '"{}"'.format(
- model.objects.get(pk=dct["pk"])
- )
+ pinned_search = '"{}"'.format(model.objects.get(pk=dct["pk"]))
except model.DoesNotExist:
pass
elif dct == (my_base_request or {}):
- if not hasattr(model, 'UP_MODEL_QUERY'):
+ if not hasattr(model, "UP_MODEL_QUERY"):
logger.warning(
"**WARN get_item**: - UP_MODEL_QUERY not defined for "
- "'{}'".format(model))
+ "'{}'".format(model)
+ )
else:
# a parent item may be selected in the default menu
for name, key in my_relative_session_names:
- if name in request.session and request.session[name] \
- and 'basket-' not in request.session[name] \
- and name in CURRENT_ITEM_KEYS_DICT:
+ if (
+ name in request.session
+ and request.session[name]
+ and "basket-" not in request.session[name]
+ and name in CURRENT_ITEM_KEYS_DICT
+ ):
up_model = CURRENT_ITEM_KEYS_DICT[name]
try:
dct.update({key: request.session[name]})
@@ -1161,15 +1242,12 @@ def _manage_default_search(dct, request, model, default_name, my_base_request,
if up_item.SLUG not in model.UP_MODEL_QUERY:
logger.warning(
"**WARN get_item**: - {} not in "
- "UP_MODEL_QUERY for {}'".format(
- up_item.SLUG,
- model))
+ "UP_MODEL_QUERY for {}'".format(up_item.SLUG, model)
+ )
else:
- req_key, up_attr = model.UP_MODEL_QUERY[
- up_item.SLUG]
+ req_key, up_attr = model.UP_MODEL_QUERY[up_item.SLUG]
pinned_search = '{}="{}"'.format(
- req_key,
- getattr(up_item, up_attr)
+ req_key, getattr(up_item, up_attr)
)
break
except up_model.DoesNotExist:
@@ -1190,39 +1268,30 @@ def _format_val(val):
def _format_geojson(rows, link_template):
data = {
- 'type': 'FeatureCollection',
- 'crs': {
- 'type': 'name',
- 'properties': {
- 'name': 'EPSG:4326'
- }
- },
- 'link_template': link_template,
- 'features': [],
- 'no-geo': []
+ "type": "FeatureCollection",
+ "crs": {"type": "name", "properties": {"name": "EPSG:4326"}},
+ "link_template": link_template,
+ "features": [],
+ "no-geo": [],
}
if not rows:
return data
for row in rows:
- feat = {'id': row[0], 'name': row[1]}
+ feat = {"id": row[0], "name": row[1]}
x, y = row[2], row[3]
if not x or not y or x < -180 or x > 180 or y < -90 or y > 90:
- data['no-geo'].append(feat)
+ data["no-geo"].append(feat)
continue
feature = {
- 'type': 'Feature',
- 'properties': feat,
- 'geometry': {
- 'type': 'Point',
- 'coordinates': [x, y]
- }
+ "type": "Feature",
+ "properties": feat,
+ "geometry": {"type": "Point", "coordinates": [x, y]},
}
- data['features'].append(feature)
+ data["features"].append(feature)
return data
-def _get_data_from_query(items, query_table_cols, extra_request_keys,
- point_field=None):
+def _get_data_from_query(items, query_table_cols, extra_request_keys, point_field=None):
for query_keys in query_table_cols:
if not isinstance(query_keys, (tuple, list)):
query_keys = [query_keys]
@@ -1233,39 +1302,43 @@ def _get_data_from_query(items, query_table_cols, extra_request_keys,
# only manage one level for display
query_key = query_key[0]
# clean
- for filtr in ('__icontains', '__contains', '__iexact',
- '__exact'):
+ for filtr in ("__icontains", "__contains", "__iexact", "__exact"):
if query_key.endswith(filtr):
- query_key = query_key[:len(query_key) - len(filtr)]
+ query_key = query_key[: len(query_key) - len(filtr)]
query_key.replace(".", "__") # class style to query
- values = ['id'] + query_table_cols
+ values = ["id"] + query_table_cols
if point_field:
profile = get_current_profile()
precision = profile.point_precision
if precision is not None:
exp_x = ExpressionWrapper(
- Round(Func(point_field, function='ST_X'), precision),
- output_field=FloatField())
+ Round(Func(point_field, function="ST_X"), precision),
+ output_field=FloatField(),
+ )
exp_y = ExpressionWrapper(
- Round(Func(point_field, function='ST_Y'), precision),
- output_field=FloatField())
+ Round(Func(point_field, function="ST_Y"), precision),
+ output_field=FloatField(),
+ )
else:
exp_x = ExpressionWrapper(
- Func(point_field, function='ST_X'), output_field=FloatField())
+ Func(point_field, function="ST_X"), output_field=FloatField()
+ )
exp_y = ExpressionWrapper(
- Func(point_field, function='ST_Y'), output_field=FloatField())
+ Func(point_field, function="ST_Y"), output_field=FloatField()
+ )
items = items.annotate(point_x=exp_x)
items = items.annotate(point_y=exp_y)
- values += ['point_x', 'point_y']
+ values += ["point_x", "point_y"]
if hasattr(items.model, "locked"):
values.append("locked")
values.append("lock_user_id")
return items.values_list(*values)
-def _get_data_from_query_old(items, query_table_cols, request,
- extra_request_keys, do_not_deduplicate=False):
+def _get_data_from_query_old(
+ items, query_table_cols, request, extra_request_keys, do_not_deduplicate=False
+):
c_ids, datas = [], []
has_lock = items and hasattr(items[0], "locked")
@@ -1284,22 +1357,21 @@ def _get_data_from_query_old(items, query_table_cols, request,
k = extra_request_keys[k]
if type(k) in (list, tuple):
k = k[0]
- for filtr in ('__icontains', '__contains', '__iexact',
- '__exact'):
+ for filtr in ("__icontains", "__contains", "__iexact", "__exact"):
if k.endswith(filtr):
- k = k[:len(k) - len(filtr)]
+ k = k[: len(k) - len(filtr)]
vals = [item]
# foreign key may be divided by "." or "__"
splitted_k = []
- for ky in k.split('.'):
- if '__' in ky:
- splitted_k += ky.split('__')
+ for ky in k.split("."):
+ if "__" in ky:
+ splitted_k += ky.split("__")
else:
splitted_k.append(ky)
for ky in splitted_k:
new_vals = []
for val in vals:
- if hasattr(val, 'all'): # manage related objects
+ if hasattr(val, "all"): # manage related objects
val = list(val.all())
for v in val:
v = getattr(v, ky)
@@ -1313,7 +1385,7 @@ def _get_data_from_query_old(items, query_table_cols, request,
pass
vals = new_vals
# manage last related objects
- if vals and hasattr(vals[0], 'all'):
+ if vals and hasattr(vals[0], "all"):
new_vals = []
for val in vals:
new_vals += list(val.all())
@@ -1324,12 +1396,12 @@ def _get_data_from_query_old(items, query_table_cols, request,
new_vals = []
if not vals:
for idx, my_v in enumerate(my_vals):
- new_vals.append("{}{}{}".format(
- my_v, ' - ', ''))
+ new_vals.append("{}{}{}".format(my_v, " - ", ""))
else:
for idx, v in enumerate(vals):
- new_vals.append("{}{}{}".format(
- vals[idx], ' - ', _format_val(v)))
+ new_vals.append(
+ "{}{}{}".format(vals[idx], " - ", _format_val(v))
+ )
my_vals = new_vals[:]
data.append(" & ".join(my_vals) or "")
if has_lock:
@@ -1347,14 +1419,15 @@ def _format_modality(value):
return value
-def _get_json_stats(items, stats_sum_variable, stats_modality_1,
- stats_modality_2, multiply=1):
+def _get_json_stats(
+ items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1
+):
if stats_modality_2:
q = items.values(stats_modality_1, stats_modality_2)
else:
q = items.values(stats_modality_1)
- if stats_sum_variable == 'pk':
- q = q.annotate(sum=Count('pk'))
+ if stats_sum_variable == "pk":
+ q = q.annotate(sum=Count("pk"))
else:
q = q.annotate(sum=Sum(stats_sum_variable))
data = []
@@ -1365,32 +1438,47 @@ def _get_json_stats(items, stats_sum_variable, stats_modality_1,
if not data or data[-1][0] != modality_1:
data.append([modality_1, []])
data[-1][1].append(
- (_format_modality(values[stats_modality_2]),
- int((values["sum"] or 0) * multiply))
+ (
+ _format_modality(values[stats_modality_2]),
+ int((values["sum"] or 0) * multiply),
+ )
)
else:
q = q.order_by(stats_modality_1)
for values in q.all():
modality_1 = values[stats_modality_1]
- data.append([_format_modality(modality_1),
- int((values["sum"] or 0) * multiply)])
+ data.append(
+ [_format_modality(modality_1), int((values["sum"] or 0) * multiply)]
+ )
data = json.dumps({"data": data})
- return HttpResponse(data, content_type='application/json')
+ return HttpResponse(data, content_type="application/json")
DEFAULT_ROW_NUMBER = 10
# length is used by ajax DataTables requests
-EXCLUDED_FIELDS = ['length']
-BASE_DATED_FIELDS = ['last_modified']
-
-
-def get_item(model, func_name, default_name, extra_request_keys=None,
- base_request=None, bool_fields=None, reversed_bool_fields=None,
- dated_fields=None, associated_models=None,
- relative_session_names=None, specific_perms=None,
- own_table_cols=None, relation_types_prefix=None,
- do_not_deduplicate=False, model_for_perms=None,
- alt_query_own=None, search_form=None):
+EXCLUDED_FIELDS = ["length"]
+BASE_DATED_FIELDS = ["last_modified"]
+
+
+def get_item(
+ model,
+ func_name,
+ default_name,
+ extra_request_keys=None,
+ base_request=None,
+ bool_fields=None,
+ reversed_bool_fields=None,
+ dated_fields=None,
+ associated_models=None,
+ relative_session_names=None,
+ specific_perms=None,
+ own_table_cols=None,
+ relation_types_prefix=None,
+ do_not_deduplicate=False,
+ model_for_perms=None,
+ alt_query_own=None,
+ search_form=None,
+):
"""
Generic treatment of tables
@@ -1415,26 +1503,34 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
:param search_form: associated search form to manage JSON query keys
:return:
"""
- def func(request, data_type='json', full=False, force_own=False,
- col_names=None, no_link=False, no_limit=False, return_query=False,
- **dct):
+
+ def func(
+ request,
+ data_type="json",
+ full=False,
+ force_own=False,
+ col_names=None,
+ no_link=False,
+ no_limit=False,
+ return_query=False,
+ **dct
+ ):
available_perms = []
if specific_perms:
available_perms = specific_perms[:]
- EMPTY = ''
- if 'type' in dct:
- data_type = dct.pop('type')
+ EMPTY = ""
+ if "type" in dct:
+ data_type = dct.pop("type")
if not data_type:
- data_type = 'json'
+ data_type = "json"
if "json" in data_type:
- EMPTY = '[]'
+ EMPTY = "[]"
- if data_type not in ('json', 'csv', 'json-image', 'json-map',
- 'json-stats'):
- return HttpResponse(EMPTY, content_type='text/plain')
+ if data_type not in ("json", "csv", "json-image", "json-map", "json-stats"):
+ return HttpResponse(EMPTY, content_type="text/plain")
- if data_type == 'json-stats' and len(model.STATISTIC_MODALITIES) < 2:
- return HttpResponse(EMPTY, content_type='text/plain')
+ if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2:
+ return HttpResponse(EMPTY, content_type="text/plain")
model_to_check = model
if model_for_perms:
@@ -1443,22 +1539,26 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
if return_query:
allowed, own = True, False
else:
- allowed, own = check_model_access_control(request, model_to_check,
- available_perms)
+ allowed, own = check_model_access_control(
+ request, model_to_check, available_perms
+ )
if not allowed:
- return HttpResponse(EMPTY, content_type='text/plain')
+ return HttpResponse(EMPTY, content_type="text/plain")
if force_own:
own = True
- if full == 'shortcut' and 'SHORTCUT_SEARCH' in request.session and \
- request.session['SHORTCUT_SEARCH'] == 'own':
+ if (
+ full == "shortcut"
+ and "SHORTCUT_SEARCH" in request.session
+ and request.session["SHORTCUT_SEARCH"] == "own"
+ ):
own = True
query_own = None
if own:
q = models.IshtarUser.objects.filter(user_ptr=request.user)
if not q.count():
- return HttpResponse(EMPTY, content_type='text/plain')
+ return HttpResponse(EMPTY, content_type="text/plain")
if alt_query_own:
query_own = getattr(model, alt_query_own)(q.all()[0])
else:
@@ -1466,7 +1566,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
query_parameters = {}
- if hasattr(model, 'get_query_parameters'):
+ if hasattr(model, "get_query_parameters"):
query_parameters = model.get_query_parameters()
# get defaults from model
@@ -1476,7 +1576,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
my_extra_request_keys[key] = query_parameters[key].search_query
else:
my_extra_request_keys = copy(extra_request_keys or {})
- if base_request is None and hasattr(model, 'BASE_REQUEST'):
+ if base_request is None and hasattr(model, "BASE_REQUEST"):
if callable(model.BASE_REQUEST):
my_base_request = model.BASE_REQUEST(request)
else:
@@ -1485,50 +1585,55 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
my_base_request = copy(base_request)
else:
my_base_request = {}
- if not bool_fields and hasattr(model, 'BOOL_FIELDS'):
+ if not bool_fields and hasattr(model, "BOOL_FIELDS"):
my_bool_fields = model.BOOL_FIELDS[:]
else:
my_bool_fields = bool_fields[:] if bool_fields else []
- if not reversed_bool_fields and hasattr(model, 'REVERSED_BOOL_FIELDS'):
+ if not reversed_bool_fields and hasattr(model, "REVERSED_BOOL_FIELDS"):
my_reversed_bool_fields = model.REVERSED_BOOL_FIELDS[:]
else:
- my_reversed_bool_fields = reversed_bool_fields[:] \
- if reversed_bool_fields else []
+ my_reversed_bool_fields = (
+ reversed_bool_fields[:] if reversed_bool_fields else []
+ )
many_counted_fields = getattr(model, "MANY_COUNTED_FIELDS", None)
reversed_many_counted_fields = getattr(
- model, "REVERSED_MANY_COUNTED_FIELDS", None)
+ model, "REVERSED_MANY_COUNTED_FIELDS", None
+ )
- if not dated_fields and hasattr(model, 'DATED_FIELDS'):
+ if not dated_fields and hasattr(model, "DATED_FIELDS"):
my_dated_fields = model.DATED_FIELDS[:]
else:
my_dated_fields = dated_fields[:] if dated_fields else []
my_dated_fields += BASE_DATED_FIELDS
- if not associated_models and hasattr(model, 'ASSOCIATED_MODELS'):
+ if not associated_models and hasattr(model, "ASSOCIATED_MODELS"):
my_associated_models = model.ASSOCIATED_MODELS[:]
else:
- my_associated_models = associated_models[:] \
- if associated_models else []
- if not relative_session_names and hasattr(model,
- 'RELATIVE_SESSION_NAMES'):
+ my_associated_models = associated_models[:] if associated_models else []
+ if not relative_session_names and hasattr(model, "RELATIVE_SESSION_NAMES"):
my_relative_session_names = model.RELATIVE_SESSION_NAMES[:]
else:
- my_relative_session_names = relative_session_names[:] \
- if relative_session_names else []
- if not relation_types_prefix and hasattr(model,
- 'RELATION_TYPES_PREFIX'):
+ my_relative_session_names = (
+ relative_session_names[:] if relative_session_names else []
+ )
+ if not relation_types_prefix and hasattr(model, "RELATION_TYPES_PREFIX"):
my_relation_types_prefix = copy(model.RELATION_TYPES_PREFIX)
else:
- my_relation_types_prefix = copy(relation_types_prefix) \
- if relation_types_prefix else {}
+ my_relation_types_prefix = (
+ copy(relation_types_prefix) if relation_types_prefix else {}
+ )
fields = [model._meta.get_field(k) for k in get_all_field_names(model)]
- request_keys = dict([
- (field.name,
- field.name + (hasattr(field, 'rel') and field.rel and '__pk'
- or ''))
- for field in fields])
+ request_keys = dict(
+ [
+ (
+ field.name,
+ field.name + (hasattr(field, "rel") and field.rel and "__pk" or ""),
+ )
+ for field in fields
+ ]
+ )
# add keys of associated models to available request key
for associated_model, key in my_associated_models:
@@ -1538,19 +1643,34 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
associated_model = globals()[associated_model]
associated_fields = [
associated_model._meta.get_field(k)
- for k in get_all_field_names(associated_model)]
+ for k in get_all_field_names(associated_model)
+ ]
request_keys.update(
- dict([(key + "__" + field.name,
- key + "__" + field.name +
- (hasattr(field, 'rel') and field.rel and '__pk' or ''))
- for field in associated_fields]))
+ dict(
+ [
+ (
+ key + "__" + field.name,
+ key
+ + "__"
+ + field.name
+ + (hasattr(field, "rel") and field.rel and "__pk" or ""),
+ )
+ for field in associated_fields
+ ]
+ )
+ )
request_keys.update(my_extra_request_keys)
# manage search on json fields and excluded fields
- if search_form and request and request.user and getattr(
- request.user, 'ishtaruser', None):
- available, excluded_fields, json_fields = \
- search_form.check_custom_form(request.user.ishtaruser)
+ if (
+ search_form
+ and request
+ and request.user
+ and getattr(request.user, "ishtaruser", None)
+ ):
+ available, excluded_fields, json_fields = search_form.check_custom_form(
+ request.user.ishtaruser
+ )
# for now no manage on excluded_fields: should we prevent search on
# some fields regarding the user concerned?
if available:
@@ -1561,23 +1681,24 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
if "query" in dct:
request_items = dct["query"]
request_items["submited"] = True
- elif request.method == 'POST':
+ elif request.method == "POST":
request_items = request.POST
else:
request_items = request.GET
- count = dct.get('count', False)
+ count = dct.get("count", False)
# pager
try:
- row_nb = int(request_items.get('length'))
+ row_nb = int(request_items.get("length"))
except (ValueError, TypeError):
row_nb = DEFAULT_ROW_NUMBER
- if data_type == 'json-map': # other limit for map
+ if data_type == "json-map": # other limit for map
row_nb = settings.ISHTAR_MAP_MAX_ITEMS
- if no_limit or (data_type == 'json-map' and
- request_items.get('no_limit', False)):
+ if no_limit or (
+ data_type == "json-map" and request_items.get("no_limit", False)
+ ):
row_nb = None
dct_request_items = {}
@@ -1587,8 +1708,8 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
if k in EXCLUDED_FIELDS:
continue
key = k[:]
- if key.startswith('searchprefix_'):
- key = key[len('searchprefix_'):]
+ if key.startswith("searchprefix_"):
+ key = key[len("searchprefix_") :]
dct_request_items[key] = request_items[k]
request_items = dct_request_items
@@ -1603,19 +1724,19 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
and_reqs, or_reqs = [], []
exc_and_reqs, exc_or_reqs = [], []
distinct_queries = []
- dct['extras'], dct['and_reqs'], dct['exc_and_reqs'] = [], [], []
+ dct["extras"], dct["and_reqs"], dct["exc_and_reqs"] = [], [], []
- if full == 'shortcut':
+ if full == "shortcut":
if model.SLUG == "warehouse":
- key = 'name__icontains'
+ key = "name__icontains"
else:
- key = 'cached_label__icontains'
- dct[key] = request.GET.get('term', None)
+ key = "cached_label__icontains"
+ dct[key] = request.GET.get("term", None)
try:
- old = 'old' in request_items and int(request_items['old'])
+ old = "old" in request_items and int(request_items["old"])
except ValueError:
- return HttpResponse('[]', content_type='text/plain')
+ return HttpResponse("[]", content_type="text/plain")
for k in request_keys:
val = request_items.get(k)
@@ -1644,21 +1765,28 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
and_reqs.append(reqs)
pinned_search = ""
- base_keys = ['extras', 'and_reqs', 'exc_and_reqs']
+ base_keys = ["extras", "and_reqs", "exc_and_reqs"]
if my_base_request:
base_keys += list(my_base_request)
- has_a_search = any(
- k for k in dct.keys() if k not in my_base_request)
+ has_a_search = any(k for k in dct.keys() if k not in my_base_request)
# manage default and pinned search and not bookmark
- if not has_a_search and not request_items.get("search_vector", "") \
- and full != 'shortcut':
- if data_type == 'csv' and func_name in request.session:
+ if (
+ not has_a_search
+ and not request_items.get("search_vector", "")
+ and full != "shortcut"
+ ):
+ if data_type == "csv" and func_name in request.session:
dct = request.session[func_name]
else:
# default search
dct, pinned_search = _manage_default_search(
- dct, request, model, default_name, my_base_request,
- my_relative_session_names)
+ dct,
+ request,
+ model,
+ default_name,
+ my_base_request,
+ my_relative_session_names,
+ )
elif func_name and request:
request.session[func_name] = dct
@@ -1667,16 +1795,20 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
query_parameters[k] = SearchAltName(k, request_keys[k])
dct, excluded_dct, distinct_queries = _search_manage_search_vector(
- model, dct, excluded_dct, distinct_queries, query_parameters,
+ model,
+ dct,
+ excluded_dct,
+ distinct_queries,
+ query_parameters,
)
search_vector = ""
- if 'search_vector' in dct:
- search_vector = dct.pop('search_vector')
+ if "search_vector" in dct:
+ search_vector = dct.pop("search_vector")
# manage relations types
- if 'relation_types' not in my_relation_types_prefix:
- my_relation_types_prefix['relation_types'] = ''
+ if "relation_types" not in my_relation_types_prefix:
+ my_relation_types_prefix["relation_types"] = ""
relation_types = {}
for rtype_key in my_relation_types_prefix:
relation_types[my_relation_types_prefix[rtype_key]] = set()
@@ -1690,18 +1822,20 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
dct.pop(k)
)
- _manage_bool_fields(model, my_bool_fields, my_reversed_bool_fields,
- dct, or_reqs)
- _manage_bool_fields(model, my_bool_fields, my_reversed_bool_fields,
- excluded_dct, exc_or_reqs)
+ _manage_bool_fields(
+ model, my_bool_fields, my_reversed_bool_fields, dct, or_reqs
+ )
+ _manage_bool_fields(
+ model, my_bool_fields, my_reversed_bool_fields, excluded_dct, exc_or_reqs
+ )
tmp_excluded = {}
_manage_many_counted_fields(
- many_counted_fields, reversed_many_counted_fields,
- dct, tmp_excluded)
+ many_counted_fields, reversed_many_counted_fields, dct, tmp_excluded
+ )
_manage_many_counted_fields(
- many_counted_fields, reversed_many_counted_fields,
- excluded_dct, dct)
+ many_counted_fields, reversed_many_counted_fields, excluded_dct, dct
+ )
if tmp_excluded:
excluded_dct.update(tmp_excluded)
@@ -1715,12 +1849,12 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
_manage_facet_search(model, excluded_dct, exc_and_reqs)
extras = []
- if 'extras' in dct:
- extras = dct.pop('extras')
- if 'and_reqs' in dct:
- and_reqs += dct.pop('and_reqs')
- if 'exc_and_reqs' in dct:
- exc_and_reqs += dct.pop('exc_and_reqs')
+ if "extras" in dct:
+ extras = dct.pop("extras")
+ if "and_reqs" in dct:
+ and_reqs += dct.pop("and_reqs")
+ if "exc_and_reqs" in dct:
+ exc_and_reqs += dct.pop("exc_and_reqs")
_manage_clean_search_field(dct, excluded_dct)
_manage_clean_search_field(excluded_dct, dct)
@@ -1729,23 +1863,23 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
exc_query = None
if excluded_dct or exc_and_reqs or exc_or_reqs:
exc_query = _construct_query(
- relation_types, excluded_dct, exc_or_reqs, exc_and_reqs)
+ relation_types, excluded_dct, exc_or_reqs, exc_and_reqs
+ )
if query_own:
query = query & query_own
# manage hierarchic in shortcut menu
- if full == 'shortcut':
+ if full == "shortcut":
ASSOCIATED_ITEMS = {
- Operation: (File, 'associated_file__pk'),
- ContextRecord: (Operation, 'operation__pk'),
- Find: (ContextRecord, 'base_finds__context_record__pk'),
+ Operation: (File, "associated_file__pk"),
+ ContextRecord: (Operation, "operation__pk"),
+ Find: (ContextRecord, "base_finds__context_record__pk"),
}
if model in ASSOCIATED_ITEMS:
upper_model, upper_key = ASSOCIATED_ITEMS[model]
model_name = upper_model.SLUG
- current = model_name in request.session \
- and request.session[model_name]
+ current = model_name in request.session and request.session[model_name]
if current:
dct = {upper_key: current}
query &= Q(**dct)
@@ -1768,7 +1902,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
items = items.distinct()
try:
- items_nb = items.values('pk').aggregate(Count('pk'))['pk__count']
+ items_nb = items.values("pk").aggregate(Count("pk"))["pk__count"]
except ProgrammingError:
items_nb = 0
if count:
@@ -1776,21 +1910,27 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
# print(str(items.values("id").query).encode('utf-8'))
if search_vector: # for serialization
- dct['search_vector'] = search_vector
+ dct["search_vector"] = search_vector
# table cols
if own_table_cols:
table_cols = own_table_cols
else:
if full:
- table_cols = [field.name for field in model._meta.fields
- if field.name not in PRIVATE_FIELDS]
- table_cols += [field.name for field in model._meta.many_to_many
- if field.name not in PRIVATE_FIELDS]
- if hasattr(model, 'EXTRA_FULL_FIELDS'):
+ table_cols = [
+ field.name
+ for field in model._meta.fields
+ if field.name not in PRIVATE_FIELDS
+ ]
+ table_cols += [
+ field.name
+ for field in model._meta.many_to_many
+ if field.name not in PRIVATE_FIELDS
+ ]
+ if hasattr(model, "EXTRA_FULL_FIELDS"):
table_cols += model.EXTRA_FULL_FIELDS
else:
- tb_key = (getattr(model, 'SLUG', None), 'TABLE_COLS')
+ tb_key = (getattr(model, "SLUG", None), "TABLE_COLS")
if tb_key in settings.TABLE_COLS:
table_cols = settings.TABLE_COLS[tb_key]
else:
@@ -1803,20 +1943,27 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
elif data_type == "json-stats":
stats_modality_1 = request_items.get("stats_modality_1", None)
stats_modality_2 = request_items.get("stats_modality_2", None)
- if not stats_modality_1 or \
- stats_modality_1 not in model.STATISTIC_MODALITIES:
+ if (
+ not stats_modality_1
+ or stats_modality_1 not in model.STATISTIC_MODALITIES
+ ):
stats_modality_1 = model.STATISTIC_MODALITIES[0]
if stats_modality_2 not in model.STATISTIC_MODALITIES:
stats_modality_2 = None
- stats_sum_variable = request_items.get('stats_sum_variable', None)
+ stats_sum_variable = request_items.get("stats_sum_variable", None)
stats_sum_variable_keys = list(model.STATISTIC_SUM_VARIABLE.keys())
- if not stats_sum_variable or \
- stats_sum_variable not in stats_sum_variable_keys:
+ if (
+ not stats_sum_variable
+ or stats_sum_variable not in stats_sum_variable_keys
+ ):
stats_sum_variable = stats_sum_variable_keys[0]
multiply = model.STATISTIC_SUM_VARIABLE[stats_sum_variable][1]
return _get_json_stats(
- items, stats_sum_variable, stats_modality_1, stats_modality_2,
- multiply=multiply
+ items,
+ stats_sum_variable,
+ stats_modality_1,
+ stats_modality_2,
+ multiply=multiply,
)
query_table_cols = []
@@ -1824,52 +1971,53 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
if type(cols) not in (list, tuple):
cols = [cols]
for col in cols:
- query_table_cols += col.split('|')
+ query_table_cols += col.split("|")
# contextual (full, simple, etc.) col
- contxt = full and 'full' or 'simple'
- if hasattr(model, 'CONTEXTUAL_TABLE_COLS') and \
- contxt in model.CONTEXTUAL_TABLE_COLS:
+ contxt = full and "full" or "simple"
+ if (
+ hasattr(model, "CONTEXTUAL_TABLE_COLS")
+ and contxt in model.CONTEXTUAL_TABLE_COLS
+ ):
for idx, col in enumerate(table_cols):
if col in model.CONTEXTUAL_TABLE_COLS[contxt]:
- query_table_cols[idx] = \
- model.CONTEXTUAL_TABLE_COLS[contxt][col]
+ query_table_cols[idx] = model.CONTEXTUAL_TABLE_COLS[contxt][col]
- if data_type in ('json-image', 'json-map') or full == 'shortcut':
+ if data_type in ("json-image", "json-map") or full == "shortcut":
if model.SLUG == "warehouse":
- query_table_cols.append('name')
- table_cols.append('name')
+ query_table_cols.append("name")
+ table_cols.append("name")
else:
- query_table_cols.append('cached_label')
- table_cols.append('cached_label')
- if data_type == 'json-image':
- query_table_cols.append('main_image__thumbnail')
- table_cols.append('main_image__thumbnail')
- query_table_cols.append('main_image__image')
- table_cols.append('main_image__image')
- elif data_type == 'json-map':
+ query_table_cols.append("cached_label")
+ table_cols.append("cached_label")
+ if data_type == "json-image":
+ query_table_cols.append("main_image__thumbnail")
+ table_cols.append("main_image__thumbnail")
+ query_table_cols.append("main_image__image")
+ table_cols.append("main_image__image")
+ elif data_type == "json-map":
if model.SLUG == "find":
- query_table_cols.append('base_finds__point_2d')
- table_cols.append('base_finds__point_2d')
+ query_table_cols.append("base_finds__point_2d")
+ table_cols.append("base_finds__point_2d")
else:
- query_table_cols.append('point_2d')
- table_cols.append('point_2d')
+ query_table_cols.append("point_2d")
+ table_cols.append("point_2d")
# manage sort tables
manual_sort_key = None
sorts = {}
for k in request_items:
- if not k.startswith('order['):
+ if not k.startswith("order["):
continue
- num = int(k.split(']')[0][len("order["):])
+ num = int(k.split("]")[0][len("order[") :])
if num not in sorts:
- sorts[num] = ['', ''] # sign, col_num
- if k.endswith('[dir]'):
+ sorts[num] = ["", ""] # sign, col_num
+ if k.endswith("[dir]"):
order = request_items[k]
- sign = order and order == 'desc' and "-" or ''
+ sign = order and order == "desc" and "-" or ""
sorts[num][0] = sign
- if k.endswith('[column]'):
+ if k.endswith("[column]"):
sorts[num][1] = request_items[k]
sign = ""
if not sorts and model._meta.ordering:
@@ -1890,14 +2038,16 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
ks = [ks]
for k in ks:
if k.endswith("__pk"):
- k = k[:-len("__pk")] + "__label"
+ k = k[: -len("__pk")] + "__label"
if k.endswith("towns"):
k = k + "__cached_label"
- if k.endswith("__icontains") or \
- k.endswith("__contains") or \
- k.endswith("__iexact") or \
- k.endswith("__exact"):
- k = '__'.join(k.split('__')[:-1])
+ if (
+ k.endswith("__icontains")
+ or k.endswith("__contains")
+ or k.endswith("__iexact")
+ or k.endswith("__exact")
+ ):
+ k = "__".join(k.split("__")[:-1])
# if '__' in k:
# k = k.split('__')[0]
orders.append(signe + k)
@@ -1909,7 +2059,9 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
manual_sort_key = k
logger.warning(
"**WARN get_item - {}**: manual sort key '{}'".format(
- func_name, k))
+ func_name, k
+ )
+ )
break
if not manual_sort_key:
items = items.order_by(*orders)
@@ -1919,14 +2071,14 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
page_nb = 1
if row_nb and data_type.startswith("json"):
try:
- start = int(request_items.get('start'))
+ start = int(request_items.get("start"))
page_nb = start // row_nb + 1
assert page_nb >= 1
except (TypeError, ValueError, AssertionError):
start = 0
page_nb = 1
end = int(page_nb * row_nb)
- if full == 'shortcut':
+ if full == "shortcut":
start = 0
end = 20
@@ -1938,19 +2090,21 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
if old:
items = [item.get_previous(old) for item in items]
- if data_type == 'json-map':
+ if data_type == "json-map":
point_field = query_table_cols.pop()
datas = _get_data_from_query(
- items, query_table_cols, my_extra_request_keys,
- point_field=point_field)
- elif data_type != "csv" and getattr(
- model, "NEW_QUERY_ENGINE", False):
- datas = _get_data_from_query(
- items, query_table_cols, my_extra_request_keys)
+ items, query_table_cols, my_extra_request_keys, point_field=point_field
+ )
+ elif data_type != "csv" and getattr(model, "NEW_QUERY_ENGINE", False):
+ datas = _get_data_from_query(items, query_table_cols, my_extra_request_keys)
else:
datas = _get_data_from_query_old(
- items, query_table_cols, request, my_extra_request_keys,
- do_not_deduplicate)
+ items,
+ query_table_cols,
+ request,
+ my_extra_request_keys,
+ do_not_deduplicate,
+ )
if manual_sort_key:
# +1 because the id is added as a first col
@@ -1959,56 +2113,60 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
idx_col = query_table_cols.index(manual_sort_key) + 1
else:
for idx, col in enumerate(query_table_cols):
- if type(col) in (list, tuple) and \
- manual_sort_key in col:
+ if type(col) in (list, tuple) and manual_sort_key in col:
idx_col = idx + 1
if idx_col is not None:
datas = sorted(datas, key=lambda x: x[idx_col])
- if sign == '-':
+ if sign == "-":
datas = reversed(datas)
datas = list(datas)[start:end]
- link_template = \
- "<a class='display_details' href='#' " \
- "onclick='load_window(\"{}\")'>" \
- "<i class=\"fa fa-info-circle\" aria-hidden=\"true\"></i><lock></a>"
+ link_template = (
+ "<a class='display_details' href='#' "
+ "onclick='load_window(\"{}\")'>"
+ '<i class="fa fa-info-circle" aria-hidden="true"></i><lock></a>'
+ )
link_ext_template = '<a href="{}" target="_blank">{}</a>'
lock = '&nbsp;<i class="fa fa-lock text-danger" aria-hidden="true"></i>'
- own_lock = '&nbsp;<i class="fa fa-lock text-success" ' \
- 'aria-hidden="true"></i>'
+ own_lock = '&nbsp;<i class="fa fa-lock text-success" ' 'aria-hidden="true"></i>'
has_locks = hasattr(model, "locked")
current_user_id = request.user and request.user.id
if data_type.startswith("json"):
rows = []
- if data_type == 'json-map':
+ if data_type == "json-map":
lnk = link_template.format(
- reverse('show-' + default_name, args=[999999, '']),
+ reverse("show-" + default_name, args=[999999, ""]),
)
- lnk = lnk.replace('999999', "<pk>")
+ lnk = lnk.replace("999999", "<pk>")
if not has_locks:
- lnk = lnk.replace('<lock>', "")
+ lnk = lnk.replace("<lock>", "")
data = json.dumps(_format_geojson(datas, lnk))
- return HttpResponse(data, content_type='application/json')
+ return HttpResponse(data, content_type="application/json")
for data in datas:
res = {
- 'id': data[0],
+ "id": data[0],
}
if not no_link:
try:
lnk_template = link_template
lnk = lnk_template.format(
- reverse('show-' + default_name, args=[data[0], '']))
+ reverse("show-" + default_name, args=[data[0], ""])
+ )
if has_locks and data[-2]:
if data[-1] == current_user_id:
- lnk = lnk.replace('<lock>', own_lock)
+ lnk = lnk.replace("<lock>", own_lock)
else:
- lnk = lnk.replace('<lock>', lock)
+ lnk = lnk.replace("<lock>", lock)
else:
- lnk = lnk.replace('<lock>', "")
+ lnk = lnk.replace("<lock>", "")
except NoReverseMatch:
logger.warning(
- '**WARN "show-' + default_name + '" args ('
- + str(data[0]) + ") url not available")
- lnk = ''
+ '**WARN "show-'
+ + default_name
+ + '" args ('
+ + str(data[0])
+ + ") url not available"
+ )
+ lnk = ""
res["link"] = lnk
for idx, value in enumerate(data[1:]):
if not value or idx >= len(table_cols):
@@ -2019,55 +2177,59 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
tab_cols = []
# foreign key may be divided by "." or "__"
for tc in table_col:
- if '.' in tc:
- tab_cols += tc.split('.')
- elif '__' in tc:
- tab_cols += tc.split('__')
+ if "." in tc:
+ tab_cols += tc.split(".")
+ elif "__" in tc:
+ tab_cols += tc.split("__")
else:
tab_cols.append(tc)
k = "__".join(tab_cols)
if k.endswith("__image") or k.endswith("__thumbnail"):
- if not value.startswith(settings.MEDIA_ROOT) and not \
- value.startswith("http://") and not \
- value.startswith("https://"):
+ if (
+ not value.startswith(settings.MEDIA_ROOT)
+ and not value.startswith("http://")
+ and not value.startswith("https://")
+ ):
value = settings.MEDIA_URL + value
- if hasattr(model, 'COL_LINK') and k in model.COL_LINK:
+ if hasattr(model, "COL_LINK") and k in model.COL_LINK:
value = link_ext_template.format(value, value)
if isinstance(value, datetime.date):
- value = value.strftime('%Y-%m-%d')
+ value = value.strftime("%Y-%m-%d")
if isinstance(value, datetime.datetime):
- value = value.strftime('%Y-%m-%d %H:%M:%S')
+ value = value.strftime("%Y-%m-%d %H:%M:%S")
res[k] = value
- if full == 'shortcut':
- if 'cached_label' in res:
- res['value'] = res.pop('cached_label')
- elif 'name' in res:
- res['value'] = res.pop('name')
+ if full == "shortcut":
+ if "cached_label" in res:
+ res["value"] = res.pop("cached_label")
+ elif "name" in res:
+ res["value"] = res.pop("name")
rows.append(res)
- if full == 'shortcut':
+ if full == "shortcut":
data = json.dumps(rows)
else:
total = (
- items_nb // row_nb + (1 if items_nb % row_nb else 0)
- ) if row_nb else items_nb
- data = json.dumps({
- "recordsTotal": items_nb,
- "recordsFiltered": items_nb,
- "rows": rows,
- "table-cols": table_cols,
- "pinned-search": pinned_search,
- "page": page_nb,
- "total": total,
- })
- return HttpResponse(data, content_type='application/json')
+ (items_nb // row_nb + (1 if items_nb % row_nb else 0))
+ if row_nb
+ else items_nb
+ )
+ data = json.dumps(
+ {
+ "recordsTotal": items_nb,
+ "recordsFiltered": items_nb,
+ "rows": rows,
+ "table-cols": table_cols,
+ "pinned-search": pinned_search,
+ "page": page_nb,
+ "total": total,
+ }
+ )
+ return HttpResponse(data, content_type="application/json")
elif data_type == "csv":
- response = HttpResponse(content_type='text/csv', charset=ENCODING)
+ response = HttpResponse(content_type="text/csv", charset=ENCODING)
n = datetime.datetime.now()
- filename = '%s_%s.csv' % (
- default_name, n.strftime('%Y%m%d-%H%M%S'))
- response['Content-Disposition'] = 'attachment; filename=%s' \
- % filename
+ filename = "%s_%s.csv" % (default_name, n.strftime("%Y%m%d-%H%M%S"))
+ response["Content-Disposition"] = "attachment; filename=%s" % filename
writer = csv.writer(response, **CSV_OPTIONS)
if col_names:
col_names = [name for name in col_names]
@@ -2076,8 +2238,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
for field_name in table_cols:
if type(field_name) in (list, tuple):
field_name = " & ".join(field_name)
- if hasattr(model, 'COL_LABELS') and \
- field_name in model.COL_LABELS:
+ if hasattr(model, "COL_LABELS") and field_name in model.COL_LABELS:
field = model.COL_LABELS[field_name]
col_names.append(str(field))
continue
@@ -2090,7 +2251,8 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
"**WARN get_item - csv export**: no col name "
"for {}\nadd explicit label to "
"COL_LABELS attribute of "
- "{}".format(field_name, model))
+ "{}".format(field_name, model)
+ )
continue
col_names.append(str(field.verbose_name))
writer.writerow(col_names)
@@ -2102,8 +2264,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
break
val = data[1:][idx + delta]
if col_name and "|" in col_name[0]:
- for delta_idx in range(
- len(col_name[0].split('|')) - 1):
+ for delta_idx in range(len(col_name[0].split("|")) - 1):
delta += 1
val += data[1:][idx + delta]
row.append(val)
@@ -2115,10 +2276,9 @@ def get_item(model, func_name, default_name, extra_request_keys=None,
try:
vals.append(v.encode(ENCODING).decode(ENCODING))
except UnicodeEncodeError:
- vals.append(unidecode(v).encode(ENCODING).decode(
- ENCODING))
+ vals.append(unidecode(v).encode(ENCODING).decode(ENCODING))
writer.writerow(vals)
return response
- return HttpResponse('{}', content_type='text/plain')
+ return HttpResponse("{}", content_type="text/plain")
return func
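
For orientation (not part of the diff above): get_item is a view factory — the "return func" at the end of this hunk hands back a Django view that answers the DataTables-style JSON/CSV requests handled throughout this file. Below is a minimal, hedged sketch of how such a factory-built view could be wired up; only the get_item(model, func_name, default_name) call shape and the "show-" + default_name URL reversal come from the code shown here, while the concrete names ("get_find", "get-find"), the URL pattern, and the Django 1.x-style URL conf are illustrative assumptions.

# sketch only -- illustrative wiring of the get_item view factory
from django.conf.urls import url  # assumes a Django 1.x project

from archaeological_finds.models import Find
from ishtar_common.views_item import get_item

# get_item(model, func_name, default_name, ...) builds and returns the inner
# "func" view; default_name is also used to reverse "show-<default_name>"
# detail links, as seen in the link_template handling above
get_find = get_item(Find, "get_find", "find")

urlpatterns = [
    # hypothetical route name; the real project defines its own URL conf
    url(r"^get-find/$", get_find, name="get-find"),
]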