diff options
Diffstat (limited to 'ishtar_common/views_item.py')
-rw-r--r-- | ishtar_common/views_item.py | 1392 |
1 files changed, 776 insertions, 616 deletions
diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py index 0b803b00e..31a16b672 100644 --- a/ishtar_common/views_item.py +++ b/ishtar_common/views_item.py @@ -18,55 +18,74 @@ from django.contrib.staticfiles.templatetags.staticfiles import static from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist from django.core.urlresolvers import reverse, NoReverseMatch -from django.db.models import Q, Count, Sum, ImageField, Func, \ - ExpressionWrapper, FloatField, FileField +from django.db.models import ( + Q, + Count, + Sum, + ImageField, + Func, + ExpressionWrapper, + FloatField, + FileField, +) from django.db.models.fields import FieldDoesNotExist from django.db.utils import ProgrammingError from django.forms.models import model_to_dict from django.http import HttpResponse from django.shortcuts import render from django.template import loader -from django.utils.translation import ugettext, ugettext_lazy as _, \ - activate, deactivate, pgettext_lazy +from django.utils.translation import ( + ugettext, + ugettext_lazy as _, + activate, + deactivate, + pgettext_lazy, +) from tidylib import tidy_document as tidy from unidecode import unidecode from weasyprint import HTML, CSS from weasyprint.fonts import FontConfiguration -from ishtar_common.utils import check_model_access_control, CSV_OPTIONS, \ - get_all_field_names, Round, PRIVATE_FIELDS -from ishtar_common.models import get_current_profile, \ - GeneralType, SearchAltName +from ishtar_common.utils import ( + check_model_access_control, + CSV_OPTIONS, + get_all_field_names, + Round, + PRIVATE_FIELDS, +) +from ishtar_common.models import get_current_profile, GeneralType, SearchAltName from ishtar_common.models_common import HistoryError from .menus import Menu from . import models from archaeological_files.models import File -from archaeological_operations.models import Operation, ArchaeologicalSite, \ - AdministrativeAct +from archaeological_operations.models import ( + Operation, + ArchaeologicalSite, + AdministrativeAct, +) from archaeological_context_records.models import ContextRecord -from archaeological_finds.models import Find, FindBasket, Treatment, \ - TreatmentFile +from archaeological_finds.models import Find, FindBasket, Treatment, TreatmentFile from archaeological_warehouse.models import Warehouse logger = logging.getLogger(__name__) -ENCODING = settings.ENCODING or 'utf-8' +ENCODING = settings.ENCODING or "utf-8" CURRENT_ITEM_KEYS = ( - ('file', File), - ('operation', Operation), - ('site', ArchaeologicalSite), - ('contextrecord', ContextRecord), - ('warehouse', Warehouse), - ('find', Find), - ('treatmentfile', TreatmentFile), - ('treatment', Treatment), - ('administrativeact', AdministrativeAct), - ('administrativeactop', AdministrativeAct), - ('administrativeactfile', AdministrativeAct), - ('administrativeacttreatment', AdministrativeAct), - ('administrativeacttreatmentfile', AdministrativeAct), + ("file", File), + ("operation", Operation), + ("site", ArchaeologicalSite), + ("contextrecord", ContextRecord), + ("warehouse", Warehouse), + ("find", Find), + ("treatmentfile", TreatmentFile), + ("treatment", Treatment), + ("administrativeact", AdministrativeAct), + ("administrativeactop", AdministrativeAct), + ("administrativeactfile", AdministrativeAct), + ("administrativeacttreatment", AdministrativeAct), + ("administrativeacttreatmentfile", AdministrativeAct), ) CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS) @@ -74,14 +93,15 @@ CURRENT_ITEM_KEYS_DICT = dict(CURRENT_ITEM_KEYS) def get_autocomplete_queries(request, label_attributes, extra=None): if not label_attributes: return [Q(pk__isnull=True)] - base_q = request.GET.get('term') or "" + base_q = request.GET.get("term") or "" queries = [] - splited_q = base_q.split(' ') + splited_q = base_q.split(" ") for value_prefix, query_suffix, query_endswith in ( - ('', '__startswith', True), # starts with - (' ', '__icontains', True), # contain a word which starts with - ('', '__endswith', False), # ends with - ('', '__icontains', False)): # contains + ("", "__startswith", True), # starts with + (" ", "__icontains", True), # contain a word which starts with + ("", "__endswith", False), # ends with + ("", "__icontains", False), + ): # contains alt_queries = [None] if len(splited_q) == 1 and query_endswith: alt_queries = ["__endswith", None] @@ -92,15 +112,11 @@ def get_autocomplete_queries(request, label_attributes, extra=None): for q in splited_q: if not q: continue - sub_q = Q( - **{label_attributes[0] + query_suffix: value_prefix + q}) + sub_q = Q(**{label_attributes[0] + query_suffix: value_prefix + q}) if alt_query: - sub_q &= Q( - **{label_attributes[0] + alt_query: q} - ) + sub_q &= Q(**{label_attributes[0] + alt_query: q}) for other_label in label_attributes[1:]: - sub_q = sub_q | Q( - **{other_label + query_suffix: value_prefix + q}) + sub_q = sub_q | Q(**{other_label + query_suffix: value_prefix + q}) query = query & sub_q queries.append(query) return queries @@ -112,10 +128,8 @@ def get_autocomplete_item(model, extra=None): def func(request, current_right=None, limit=20): result = OrderedDict() - for query in get_autocomplete_queries(request, ['cached_label'], - extra=extra): - objects = model.objects.filter(query).values( - 'cached_label', 'id')[:limit] + for query in get_autocomplete_queries(request, ["cached_label"], extra=extra): + objects = model.objects.filter(query).values("cached_label", "id")[:limit] for obj in objects: if obj["id"] not in list(result.keys()): result[obj["id"]] = obj["cached_label"] @@ -124,9 +138,11 @@ def get_autocomplete_item(model, extra=None): break if not limit: break - data = json.dumps([{'id': obj[0], 'value': obj[1]} - for obj in list(result.items())]) - return HttpResponse(data, content_type='text/plain') + data = json.dumps( + [{"id": obj[0], "value": obj[1]} for obj in list(result.items())] + ) + return HttpResponse(data, content_type="text/plain") + return func @@ -138,17 +154,20 @@ def check_permission(request, action_slug, obj_id=None): return True if obj_id: return MAIN_MENU.items[action_slug].is_available( - request.user, obj_id, session=request.session) + request.user, obj_id, session=request.session + ) return MAIN_MENU.items[action_slug].can_be_available( - request.user, session=request.session) + request.user, session=request.session + ) -def new_qa_item(model, frm, many=False, - template="ishtar/forms/qa_new_item.html", page_name=""): - def func(request, parent_name, limits=''): +def new_qa_item( + model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="" +): + def func(request, parent_name, limits=""): model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") - if not check_permission(request, 'add_' + model_name.lower()): + if not check_permission(request, "add_" + model_name.lower()): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": @@ -156,29 +175,32 @@ def new_qa_item(model, frm, many=False, url_slug = "new-" + slug current_page_name = page_name[:] if not current_page_name: - current_page_name = _('New %s' % model_name.lower()) - dct = {'page_name': str(current_page_name), - 'url': reverse(url_slug, args=[parent_name]), - 'slug': slug, - 'parent_name': parent_name, - 'many': many} - if request.method == 'POST': - dct['form'] = frm(request.POST, limits=limits) - if dct['form'].is_valid(): - new_item = dct['form'].save(request.user) + current_page_name = _("New %s" % model_name.lower()) + dct = { + "page_name": str(current_page_name), + "url": reverse(url_slug, args=[parent_name]), + "slug": slug, + "parent_name": parent_name, + "many": many, + } + if request.method == "POST": + dct["form"] = frm(request.POST, limits=limits) + if dct["form"].is_valid(): + new_item = dct["form"].save(request.user) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() - dct['new_item_label'] = lbl - dct['new_item_pk'] = new_item.pk - dct['parent_pk'] = parent_name - if dct['parent_pk'] and '_select_' in dct['parent_pk']: - parents = dct['parent_pk'].split('_') - dct['parent_pk'] = "_".join([parents[0]] + parents[2:]) + dct["new_item_label"] = lbl + dct["new_item_pk"] = new_item.pk + dct["parent_pk"] = parent_name + if dct["parent_pk"] and "_select_" in dct["parent_pk"]: + parents = dct["parent_pk"].split("_") + dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) return render(request, template, dct) else: - dct['form'] = frm(limits=limits) + dct["form"] = frm(limits=limits) return render(request, template, dct) + return func @@ -186,8 +208,7 @@ def get_short_html_detail(model): def func(request, pk): model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") - if not check_permission(request, 'view_' + model_name.lower(), - pk): + if not check_permission(request, "view_" + model_name.lower(), pk): return HttpResponse(not_permitted_msg) try: item = model.objects.get(pk=pk) @@ -195,6 +216,7 @@ def get_short_html_detail(model): return HttpResponse(not_permitted_msg) html = item.get_short_html_detail() return HttpResponse(html) + return func @@ -203,8 +225,7 @@ def modify_qa_item(model, frm): template = "ishtar/forms/qa_new_item.html" model_name = model._meta.object_name not_permitted_msg = ugettext("Operation not permitted.") - if not check_permission(request, 'change_' + model_name.lower(), - pk): + if not check_permission(request, "change_" + model_name.lower(), pk): return HttpResponse(not_permitted_msg) slug = model.SLUG if model.SLUG == "site": @@ -214,43 +235,46 @@ def modify_qa_item(model, frm): except model.DoesNotExist: return HttpResponse(not_permitted_msg) url_slug = "modify-" + slug - dct = {'page_name': str(_('Modify a %s' % model_name.lower())), - 'url': reverse(url_slug, args=[parent_name, pk]), - 'slug': slug, - "modify": True, - 'parent_name': parent_name} - if request.method == 'POST': - dct['form'] = frm(request.POST) - if dct['form'].is_valid(): - new_item = dct['form'].save(request.user, item) + dct = { + "page_name": str(_("Modify a %s" % model_name.lower())), + "url": reverse(url_slug, args=[parent_name, pk]), + "slug": slug, + "modify": True, + "parent_name": parent_name, + } + if request.method == "POST": + dct["form"] = frm(request.POST) + if dct["form"].is_valid(): + new_item = dct["form"].save(request.user, item) lbl = str(new_item) if not lbl and hasattr(new_item, "_generate_cached_label"): lbl = new_item._generate_cached_label() - dct['new_item_label'] = lbl - dct['new_item_pk'] = new_item.pk - dct['parent_pk'] = parent_name - if dct['parent_pk'] and '_select_' in dct['parent_pk']: - parents = dct['parent_pk'].split('_') - dct['parent_pk'] = "_".join([parents[0]] + parents[2:]) + dct["new_item_label"] = lbl + dct["new_item_pk"] = new_item.pk + dct["parent_pk"] = parent_name + if dct["parent_pk"] and "_select_" in dct["parent_pk"]: + parents = dct["parent_pk"].split("_") + dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) return render(request, template, dct) else: data = model_to_dict(item) for k in list(data.keys()): - if data[k] and isinstance(data[k], list) and hasattr( - data[k][0], "pk"): + if data[k] and isinstance(data[k], list) and hasattr(data[k][0], "pk"): data[k] = [i.pk for i in data[k]] - dct['form'] = frm(initial=data) + dct["form"] = frm(initial=data) return render(request, template, dct) + return func def display_item(model, extra_dct=None, show_url=None): def func(request, pk, **dct): if show_url: - dct['show_url'] = "/{}{}/".format(show_url, pk) + dct["show_url"] = "/{}{}/".format(show_url, pk) else: - dct['show_url'] = "/show-{}/{}/".format(model.SLUG, pk) - return render(request, 'ishtar/display_item.html', dct) + dct["show_url"] = "/show-{}/{}/".format(model.SLUG, pk) + return render(request, "ishtar/display_item.html", dct) + return func @@ -261,38 +285,43 @@ def show_item(model, name, extra_dct=None, model_for_perms=None): check_model = model_for_perms allowed, own = check_model_access_control(request, check_model) if not allowed: - return HttpResponse('', content_type="application/xhtml") + return HttpResponse("", content_type="application/xhtml") q = model.objects if own: - if not hasattr(request.user, 'ishtaruser'): - return HttpResponse('') + if not hasattr(request.user, "ishtaruser"): + return HttpResponse("") query_own = model.get_query_owns(request.user.ishtaruser) if query_own: q = q.filter(query_own).distinct() try: item = q.get(pk=pk) except (ObjectDoesNotExist, ValueError): - return HttpResponse('') - doc_type = 'type' in dct and dct.pop('type') - url_name = "/".join(reverse('show-' + name, args=['0', ''] - ).split('/')[:-2]) + "/" + return HttpResponse("") + doc_type = "type" in dct and dct.pop("type") + url_name = ( + "/".join(reverse("show-" + name, args=["0", ""]).split("/")[:-2]) + "/" + ) profile = get_current_profile() - dct['PROFILE'] = profile - dct['CURRENCY'] = profile.currency - dct['ENCODING'] = settings.ENCODING - dct['DOT_GENERATION'] = settings.DOT_BINARY and profile.relation_graph - dct['current_window_url'] = url_name + dct["PROFILE"] = profile + dct["CURRENCY"] = profile.currency + dct["ENCODING"] = settings.ENCODING + dct["DOT_GENERATION"] = settings.DOT_BINARY and profile.relation_graph + dct["current_window_url"] = url_name date = None - if 'date' in dct: - date = dct.pop('date') - dct['sheet_id'] = "%s-%d" % (name, item.pk) - dct['window_id'] = "%s-%d-%s" % ( - name, item.pk, datetime.datetime.now().strftime('%M%s')) + if "date" in dct: + date = dct.pop("date") + dct["sheet_id"] = "%s-%d" % (name, item.pk) + dct["window_id"] = "%s-%d-%s" % ( + name, + item.pk, + datetime.datetime.now().strftime("%M%s"), + ) # list current perms - if hasattr(request.user, 'ishtaruser') and request.user.ishtaruser: + if hasattr(request.user, "ishtaruser") and request.user.ishtaruser: cache_key = "{}-{}-{}".format( - settings.PROJECT_SLUG, "current-perms", + settings.PROJECT_SLUG, + "current-perms", request.session.session_key, ) permissions = cache.get(cache_key) @@ -308,80 +337,96 @@ def show_item(model, name, extra_dct=None, model_for_perms=None): for perm in permissions: dct["permission_" + perm] = True - if hasattr(item, 'history') and request.user.is_superuser: + if hasattr(item, "history") and request.user.is_superuser: if date: try: - date = datetime.datetime.strptime(date, - '%Y-%m-%dT%H:%M:%S.%f') + date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") if item.get_last_history_date() != date: item = item.get_previous(date=date) assert item is not None - dct['previous'] = item._previous - dct['next'] = item._next + dct["previous"] = item._previous + dct["next"] = item._next else: date = None except (ValueError, AssertionError): - return HttpResponse('', content_type='text/plain') + return HttpResponse("", content_type="text/plain") if not date: historized = item.history.all() if historized: item.history_date = historized[0].history_date if len(historized) > 1: - dct['previous'] = historized[1].history_date - if doc_type in ("odt", "pdf") and hasattr(item, 'qrcode') \ - and (not item.qrcode or not item.qrcode.name): + dct["previous"] = historized[1].history_date + if ( + doc_type in ("odt", "pdf") + and hasattr(item, "qrcode") + and (not item.qrcode or not item.qrcode.name) + ): item.generate_qrcode(request=request) - dct['item'], dct['item_name'] = item, name + dct["item"], dct["item_name"] = item, name # add context if extra_dct: dct.update(extra_dct(request, item)) context_instance = deepcopy(dct) - context_instance['output'] = 'html' - if hasattr(item, 'history_object'): + context_instance["output"] = "html" + if hasattr(item, "history_object"): filename = item.history_object.associated_filename else: filename = item.associated_filename if doc_type == "odt" and settings.ODT_TEMPLATE: - tpl = loader.get_template('ishtar/sheet_%s.html' % name) - context_instance['output'] = 'ODT' + tpl = loader.get_template("ishtar/sheet_%s.html" % name) + context_instance["output"] = "ODT" content = tpl.render(context_instance, request) - tidy_options = {'output-xhtml': 1, 'indent': 1, - 'tidy-mark': 0, 'doctype': 'auto', - 'add-xml-decl': 1, 'wrap': 1} + tidy_options = { + "output-xhtml": 1, + "indent": 1, + "tidy-mark": 0, + "doctype": "auto", + "add-xml-decl": 1, + "wrap": 1, + } html, errors = tidy(content, options=tidy_options) html = html.replace(" ", " ") - html = re.sub('<pre([^>]*)>\n', '<pre\\1>', html) + html = re.sub("<pre([^>]*)>\n", "<pre\\1>", html) odt = NamedTemporaryFile() html_source = NamedTemporaryFile() - with open(html_source.name, 'w') as html_file: + with open(html_source.name, "w") as html_file: html_file.write(html) - pandoc_args = ["pandoc", "-f", "html", "-t", "odt", - "-o", odt.name, html_source.name] + pandoc_args = [ + "pandoc", + "-f", + "html", + "-t", + "odt", + "-o", + odt.name, + html_source.name, + ] try: - subprocess.check_call(pandoc_args, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) + subprocess.check_call( + pandoc_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) except subprocess.CalledProcessError: - return HttpResponse(content, - content_type="application/xhtml") + return HttpResponse(content, content_type="application/xhtml") response = HttpResponse( - content_type='application/vnd.oasis.opendocument.text') - response['Content-Disposition'] = \ - 'attachment; filename={}.odt'.format(filename) - with open(odt.name, 'rb') as odt_file: + content_type="application/vnd.oasis.opendocument.text" + ) + response["Content-Disposition"] = "attachment; filename={}.odt".format( + filename + ) + with open(odt.name, "rb") as odt_file: response.write(odt_file.read()) return response - elif doc_type == 'pdf': - base_url = "/".join( - request.build_absolute_uri().split("/")[0:3] - ) + elif doc_type == "pdf": + base_url = "/".join(request.build_absolute_uri().split("/")[0:3]) - tpl = loader.get_template('ishtar/sheet_%s_pdf.html' % name) - context_instance['output'] = 'PDF' + tpl = loader.get_template("ishtar/sheet_%s_pdf.html" % name) + context_instance["output"] = "PDF" html = tpl.render(context_instance, request) font_config = FontConfiguration() - css = CSS(string=''' + css = CSS( + string=""" @font-face { font-family: Gentium; src: url(%s); @@ -389,20 +434,21 @@ def show_item(model, name, extra_dct=None, model_for_perms=None): body{ font-family: Gentium } - ''' % (base_url + static("gentium/GentiumPlus-R.ttf"))) - css2 = CSS(filename=settings.STATIC_ROOT + '/media/style_basic.css') - pdf = HTML( - string=html, base_url=base_url - ).write_pdf( - stylesheets=[css, css2], font_config=font_config) - response = HttpResponse(pdf, content_type='application/pdf') - response['Content-Disposition'] = 'attachment; filename=%s.pdf' % \ - filename + """ + % (base_url + static("gentium/GentiumPlus-R.ttf")) + ) + css2 = CSS(filename=settings.STATIC_ROOT + "/media/style_basic.css") + pdf = HTML(string=html, base_url=base_url).write_pdf( + stylesheets=[css, css2], font_config=font_config + ) + response = HttpResponse(pdf, content_type="application/pdf") + response["Content-Disposition"] = "attachment; filename=%s.pdf" % filename return response else: - tpl = loader.get_template('ishtar/sheet_%s_window.html' % name) + tpl = loader.get_template("ishtar/sheet_%s_window.html" % name) content = tpl.render(context_instance, request) return HttpResponse(content, content_type="application/xhtml") + return func @@ -410,21 +456,29 @@ def revert_item(model): def func(request, pk, date, **dct): try: item = model.objects.get(pk=pk) - date = datetime.datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%f') + date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") item.rollback(date) except (ObjectDoesNotExist, ValueError, HistoryError): - return HttpResponse(None, content_type='text/plain') - return HttpResponse("True", content_type='text/plain') + return HttpResponse(None, content_type="text/plain") + return HttpResponse("True", content_type="text/plain") + return func HIERARCHIC_LEVELS = 5 -HIERARCHIC_FIELDS = ['periods', 'period', 'unit', 'material_types', - 'material_type', 'conservatory_state', 'object_types'] +HIERARCHIC_FIELDS = [ + "periods", + "period", + "unit", + "material_types", + "material_type", + "conservatory_state", + "object_types", +] def _get_values(request, val): - if hasattr(val, 'all'): # manage related objects + if hasattr(val, "all"): # manage related objects vals = list(val.all()) else: vals = [val] @@ -433,9 +487,13 @@ def _get_values(request, val): if callable(v): v = v() try: - if hasattr(v, 'url') and v.url: - v = (request.is_secure() and - 'https' or 'http') + '://' + request.get_host() + v.url + if hasattr(v, "url") and v.url: + v = ( + (request.is_secure() and "https" or "http") + + "://" + + request.get_host() + + v.url + ) except ValueError: pass new_vals.append(v) @@ -453,8 +511,11 @@ def _push_to_list(obj, current_group, depth): except IndexError: # tolerant to parentheses mismatch pass - if current_group and type(obj) in (str, str) and \ - type(current_group[-1]) in (str, str): + if ( + current_group + and type(obj) in (str, str) + and type(current_group[-1]) in (str, str) + ): current_group[-1] += obj else: current_group.append(obj) @@ -488,10 +549,10 @@ def _parse_parentheses(s): if char == '"': inside_quote = not inside_quote if not inside_quote: - if char == '(': + if char == "(": _push_to_list([], groups, depth) depth += 1 - elif char == ')': + elif char == ")": if depth > 0: depth -= 1 else: @@ -507,8 +568,9 @@ RESERVED_CHAR = ["|", "&"] RE_FACET = re.compile('([-a-zA-Z]+)="([^"]+)"(?:;"([^"]+)")*') -def _parse_query_string(string, query_parameters, current_dct, exc_dct, - extra_distinct_q): +def _parse_query_string( + string, query_parameters, current_dct, exc_dct, extra_distinct_q +): string = string.strip().lower() match = RE_FACET.search(string) @@ -541,21 +603,20 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct, is_true = not is_true cfltr, cexclude, cextra = term(is_true=is_true) if cfltr: - if 'and_reqs' not in dct: - dct['and_reqs'] = [] - dct['and_reqs'].append(cfltr) + if "and_reqs" not in dct: + dct["and_reqs"] = [] + dct["and_reqs"].append(cfltr) if cexclude: - if 'exc_and_reqs' not in dct: - dct['exc_and_reqs'] = [] - dct['exc_and_reqs'].append(cexclude) + if "exc_and_reqs" not in dct: + dct["exc_and_reqs"] = [] + dct["exc_and_reqs"].append(cexclude) if cextra: - dct['extras'].append(cextra) + dct["extras"].append(cextra) else: if query_parameters[base_term].distinct_query: extra_distinct_q.append({}) dct = extra_distinct_q[-1] - if not query_parameters[base_term].distinct_query and \ - excluded: + if not query_parameters[base_term].distinct_query and excluded: dct = exc_dct if query_parameters[base_term].extra_query: dct.update(query_parameters[base_term].extra_query) @@ -565,11 +626,10 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct, dct[term] = query if query_parameters[base_term].distinct_query: for k in dct: # clean " - dct[k] = dct[k].replace('"', '') + dct[k] = dct[k].replace('"', "") # distinct query wait for a query _manage_clean_search_field(dct) - extra_distinct_q[-1] = \ - ~Q(**dct) if excluded else Q(**dct) + extra_distinct_q[-1] = ~Q(**dct) if excluded else Q(**dct) return "" for reserved_char in FORBIDDEN_CHAR: string = string.replace(reserved_char, "") @@ -578,22 +638,23 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct, string = string.replace(reserved_char, "") if not string: return "" - if string.endswith('*'): + if string.endswith("*"): if len(string.strip()) == 1: return "" - string = string[:-1] + ':*' + string = string[:-1] + ":*" elif string not in ("&", "|", "!", "-"): # like search by default - string = string + ':*' - if string.startswith('-'): + string = string + ":*" + if string.startswith("-"): if len(string.strip()) == 1: return "" string = "!" + string[1:] return string -def _parse_parentheses_groups(groups, query_parameters, current_dct=None, - exc_dct=None, extra_distinct_q=None): +def _parse_parentheses_groups( + groups, query_parameters, current_dct=None, exc_dct=None, extra_distinct_q=None +): """ Transform parentheses groups to query @@ -612,8 +673,7 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None, extra_distinct_q = [] if type(groups) is not list: string = groups.strip() - if string.startswith('"') and string.endswith('"') and \ - string.count('"') == 2: + if string.startswith('"') and string.endswith('"') and string.count('"') == 2: string = string[1:-1] # split into many groups if spaces @@ -624,9 +684,11 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None, previous_quote = None while found != -1: if previous_quote is not None: - string = string[0:previous_quote] + \ - string[previous_quote:found].replace(' ', SEP) + \ - string[found:] + string = ( + string[0:previous_quote] + + string[previous_quote:found].replace(" ", SEP) + + string[found:] + ) previous_quote = None # SEP is larger than a space found = string.find('"', current_index) @@ -637,20 +699,29 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None, string_groups = [gp.replace(SEP, " ") for gp in string.split(" ")] if len(string_groups) == 1: - return _parse_query_string( - string_groups[0], query_parameters, current_dct, exc_dct, - extra_distinct_q - ), current_dct, exc_dct, extra_distinct_q + return ( + _parse_query_string( + string_groups[0], + query_parameters, + current_dct, + exc_dct, + extra_distinct_q, + ), + current_dct, + exc_dct, + extra_distinct_q, + ) return _parse_parentheses_groups( - string_groups, query_parameters, current_dct, exc_dct, - extra_distinct_q) + string_groups, query_parameters, current_dct, exc_dct, extra_distinct_q + ) if not groups: # empty list return "", current_dct, exc_dct, extra_distinct_q query = "(" previous_sep, has_item = None, False for item in groups: q, current_dct, exc_dct, extra_distinct_q = _parse_parentheses_groups( - item, query_parameters, current_dct, exc_dct, extra_distinct_q) + item, query_parameters, current_dct, exc_dct, extra_distinct_q + ) q = q.strip() if not q: continue @@ -671,16 +742,22 @@ def _parse_parentheses_groups(groups, query_parameters, current_dct=None, return query, current_dct, exc_dct, extra_distinct_q -def _search_manage_search_vector(model, dct, exc_dct, distinct_queries, - query_parameters): - if 'search_vector' not in dct \ - or not model._meta.managed: # is a view - no search_vector +def _search_manage_search_vector( + model, dct, exc_dct, distinct_queries, query_parameters +): + if ( + "search_vector" not in dct or not model._meta.managed + ): # is a view - no search_vector return dct, exc_dct, distinct_queries - search_vector = dct['search_vector'] + search_vector = dct["search_vector"] parentheses_groups = _parse_parentheses(search_vector) - search_query, extra_dct, extra_exc_dct, extra_distinct_q = \ - _parse_parentheses_groups(parentheses_groups, query_parameters) + ( + search_query, + extra_dct, + extra_exc_dct, + extra_distinct_q, + ) = _parse_parentheses_groups(parentheses_groups, query_parameters) dct.update(extra_dct) distinct_queries += extra_distinct_q @@ -688,15 +765,18 @@ def _search_manage_search_vector(model, dct, exc_dct, distinct_queries, if not search_query: return dct, exc_dct, distinct_queries # remove inside parenthesis - search_query = search_query.replace('(', '').replace(')', '').strip() + search_query = search_query.replace("(", "").replace(")", "").strip() if search_query: - if 'extras' not in dct: - dct['extras'] = [] - dct['extras'].append( - {'where': [model._meta.db_table + - ".search_vector @@ (to_tsquery(%s, %s)) = true"], - 'params': [settings.ISHTAR_SEARCH_LANGUAGE, - search_query]} + if "extras" not in dct: + dct["extras"] = [] + dct["extras"].append( + { + "where": [ + model._meta.db_table + + ".search_vector @@ (to_tsquery(%s, %s)) = true" + ], + "params": [settings.ISHTAR_SEARCH_LANGUAGE, search_query], + } ) return dct, exc_dct, distinct_queries @@ -709,7 +789,7 @@ def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs): elif dct[k] == "1": dct.pop(k) continue - dct[k] = dct[k].replace('"', '') + dct[k] = dct[k].replace('"', "") if dct[k] in ["2", "yes", str(_("Yes")).lower(), "True"]: dct[k] = True else: @@ -717,7 +797,7 @@ def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs): if k in reversed_bool_fields: dct[k] = not dct[k] # check also for empty value with image field - field_names = k.split('__') + field_names = k.split("__") # TODO: can be improved in later version of Django try: c_field = model._meta.get_field(field_names[0]) @@ -725,15 +805,15 @@ def _manage_bool_fields(model, bool_fields, reversed_bool_fields, dct, or_reqs): if not hasattr(c_field, "related_model"): return c_field = c_field.related_model._meta.get_field(field_name) - if k.endswith('__isnull') and \ - (isinstance(c_field, (ImageField, FileField)) - or field_names[-2] == "associated_url"): - key = "__".join(k.split('__')[:-1]) + if k.endswith("__isnull") and ( + isinstance(c_field, (ImageField, FileField)) + or field_names[-2] == "associated_url" + ): + key = "__".join(k.split("__")[:-1]) if dct[k]: - or_reqs.append( - (k, {key + '__exact': ''})) + or_reqs.append((k, {key + "__exact": ""})) else: - dct[key + '__regex'] = '.{1}.*' + dct[key + "__regex"] = ".{1}.*" except FieldDoesNotExist: pass @@ -746,7 +826,7 @@ def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct): elif dct[k] == "1": dct.pop(k) continue - dct[k] = dct[k].replace('"', '') + dct[k] = dct[k].replace('"', "") dct[k] = True if dct[k] in ["2", "yes", str(_("Yes")).lower()] else None if reversed_fields and k in reversed_fields: dct[k] = True if not dct[k] else None @@ -758,7 +838,7 @@ def _manage_many_counted_fields(fields, reversed_fields, dct, excluded_dct): today_lbl = pgettext_lazy("key for text search", "today") -TODAYS = ['today'] +TODAYS = ["today"] for language_code, language_lbl in settings.LANGUAGES: activate(language_code) @@ -776,12 +856,12 @@ def _manage_dated_fields(dated_fields, dct): if not dct[k]: dct.pop(k) continue - value = dct[k].replace('"', '').strip() + value = dct[k].replace('"', "").strip() has_today = False for today in TODAYS: if value.startswith(today): base_date = datetime.date.today() - value = value[len(today):].replace(' ', '') + value = value[len(today) :].replace(" ", "") if value and value[0] in ("-", "+"): sign = value[0] try: @@ -790,27 +870,26 @@ def _manage_dated_fields(dated_fields, dct): days = 0 if days: if sign == "-": - base_date = base_date - datetime.timedelta( - days=days) + base_date = base_date - datetime.timedelta(days=days) else: - base_date = base_date + datetime.timedelta( - days=days) - dct[k] = base_date.strftime('%Y-%m-%d') + base_date = base_date + datetime.timedelta(days=days) + dct[k] = base_date.strftime("%Y-%m-%d") has_today = True break if has_today: continue items = [] if "/" in value: - items = list(reversed(value.split('/'))) + items = list(reversed(value.split("/"))) elif "-" in value: # already date formated - items = value.split('-') + items = value.split("-") if len(items) != 3: dct.pop(k) return try: - dct[k] = datetime.datetime( - *map(lambda x: int(x), items)).strftime('%Y-%m-%d') + dct[k] = datetime.datetime(*map(lambda x: int(x), items)).strftime( + "%Y-%m-%d" + ) except ValueError: dct.pop(k) @@ -838,8 +917,7 @@ def _manage_facet_search(model, dct, and_reqs): k = base_k else: k = base_k + "__pk" - if k not in dct or not dct[k].startswith('"') \ - or not dct[k].startswith('"'): + if k not in dct or not dct[k].startswith('"') or not dct[k].startswith('"'): continue val = _clean_type_val(dct.pop(k)) if '";"' in val: @@ -858,9 +936,12 @@ def _manage_facet_search(model, dct, and_reqs): lbl_name = "__cached_label__" except: pass - suffix = "{}icontains".format(lbl_name) if "%" in val else \ - "{}iexact".format(lbl_name) - query = val[1:-1].replace('*', "") + suffix = ( + "{}icontains".format(lbl_name) + if "%" in val + else "{}iexact".format(lbl_name) + ) + query = val[1:-1].replace("*", "") if not reqs: reqs = Q(**{base_k + suffix: query}) else: @@ -868,13 +949,12 @@ def _manage_facet_search(model, dct, and_reqs): if reqs: and_reqs.append(reqs) - POST_PROCESS_REQUEST = getattr(model, 'POST_PROCESS_REQUEST', None) + POST_PROCESS_REQUEST = getattr(model, "POST_PROCESS_REQUEST", None) if not POST_PROCESS_REQUEST: return for k in dct: if k in POST_PROCESS_REQUEST and dct[k]: - dct[k] = getattr(model, POST_PROCESS_REQUEST[k])( - dct[k].replace('"', '')) + dct[k] = getattr(model, POST_PROCESS_REQUEST[k])(dct[k].replace('"', "")) def _manage_hierarchic_fields(model, dct, and_reqs): @@ -885,12 +965,11 @@ def _manage_hierarchic_fields(model, dct, and_reqs): if type(reqs) not in (list, tuple): reqs = [reqs] for req in reqs: - if req.endswith('areas__pk') \ - or req.endswith('areas__label__iexact'): - if req.endswith('pk'): - suffix = 'pk' - elif req.endswith('label__iexact'): - suffix = 'label__iexact' + if req.endswith("areas__pk") or req.endswith("areas__label__iexact"): + if req.endswith("pk"): + suffix = "pk" + elif req.endswith("label__iexact"): + suffix = "label__iexact" else: continue @@ -900,38 +979,41 @@ def _manage_hierarchic_fields(model, dct, and_reqs): reqs = Q(**{req: val}) for idx in range(HIERARCHIC_LEVELS): - req = req[:-(len(suffix))] + 'parent__' + suffix + req = req[: -(len(suffix))] + "parent__" + suffix q = Q(**{req: val}) reqs |= q and_reqs.append(reqs) # TODO: improve query with "IN ()"? continue - if req.endswith('town__pk') or req.endswith('towns__pk') \ - or req.endswith('town__cached_label__iexact') \ - or req.endswith('towns__cached_label__iexact'): - - if req.endswith('pk'): - suffix = 'pk' - elif req.endswith('cached_label__iexact'): - suffix = 'cached_label__iexact' + if ( + req.endswith("town__pk") + or req.endswith("towns__pk") + or req.endswith("town__cached_label__iexact") + or req.endswith("towns__cached_label__iexact") + ): + + if req.endswith("pk"): + suffix = "pk" + elif req.endswith("cached_label__iexact"): + suffix = "cached_label__iexact" else: continue val = _clean_type_val(dct.pop(req)).strip('"') if val.startswith('"') and val.endswith('"'): val = val[1:-1] - vals = [v.replace('"', '') for v in val.split(';')] + vals = [v.replace('"', "") for v in val.split(";")] main_req = None for val in vals: reqs = Q(**{req: val}) nreq = base_req = req[:] for idx in range(HIERARCHIC_LEVELS): - nreq = nreq[:-(len(suffix))] + 'parents__' + suffix + nreq = nreq[: -(len(suffix))] + "parents__" + suffix q = Q(**{nreq: val}) reqs |= q nreq = base_req[:] for idx in range(HIERARCHIC_LEVELS): - nreq = nreq[:-(len(suffix))] + 'children__' + suffix + nreq = nreq[: -(len(suffix))] + "children__" + suffix q = Q(**{nreq: val}) reqs |= q if not main_req: @@ -946,8 +1028,7 @@ def _manage_hierarchic_fields(model, dct, and_reqs): lbl_name = "label" try: rel = getattr(model, k_hr).field.related_model - if not hasattr(rel, "label") and hasattr(rel, - "cached_label"): + if not hasattr(rel, "label") and hasattr(rel, "cached_label"): lbl_name = "cached_label" except: pass @@ -963,8 +1044,9 @@ def _manage_hierarchic_fields(model, dct, and_reqs): q |= Q(**{r: val}) and_reqs.append(q) break - elif req.endswith(k_hr + '__pk') \ - or req.endswith(k_hr + '__{}__iexact'.format(lbl_name)): + elif req.endswith(k_hr + "__pk") or req.endswith( + k_hr + "__{}__iexact".format(lbl_name) + ): val = _clean_type_val(dct.pop(req)) if '";"' in val: @@ -974,9 +1056,9 @@ def _manage_hierarchic_fields(model, dct, and_reqs): values = [val] base_req = req[:] reqs = None - if req.endswith('pk'): + if req.endswith("pk"): base_suffix = "pk" - elif req.endswith('{}__iexact'.format(lbl_name)): + elif req.endswith("{}__iexact".format(lbl_name)): base_suffix = lbl_name + "__iexact" else: continue @@ -988,17 +1070,17 @@ def _manage_hierarchic_fields(model, dct, and_reqs): # manage search text by label if "*" in val: suffix = lbl_name + "__icontains" - val = val.replace('*', "") + val = val.replace("*", "") else: suffix = lbl_name + "__iexact" - req = req[:-(len(base_suffix))] + suffix + req = req[: -(len(base_suffix))] + suffix if not reqs: reqs = Q(**{req: val}) else: reqs |= Q(**{req: val}) for idx in range(HIERARCHIC_LEVELS): - req = req[:-(len(suffix))] + 'parent__' + suffix + req = req[: -(len(suffix))] + "parent__" + suffix q = Q(**{req: val}) reqs |= q # TODO: improve query with "IN ()"? @@ -1012,20 +1094,20 @@ def _manage_clean_search_field(dct, exclude=None): # clean quoted search field if type(dct[k]) != str: continue - dct[k] = dct[k].replace('"', '') + dct[k] = dct[k].replace('"', "") dct[k] = _clean_type_val(dct[k]) - if '*' not in dct[k] or not k.endswith('__iexact'): + if "*" not in dct[k] or not k.endswith("__iexact"): continue value = dct.pop(k).strip() if value.startswith("*"): value = value[1:] if value.endswith("*"): value = value[:-1] - base_key = k[:-len('__iexact')] + base_key = k[: -len("__iexact")] if value: - dct[base_key + '__icontains'] = value + dct[base_key + "__icontains"] = value elif exclude is not None: - exclude[base_key + '__exact'] = "" + exclude[base_key + "__exact"] = "" def _manage_relation_types(relation_types, dct, query, or_reqs): @@ -1033,11 +1115,12 @@ def _manage_relation_types(relation_types, dct, query, or_reqs): vals = relation_types[rtype_prefix] if not vals: continue - vals = list(vals)[0].split(';') + vals = list(vals)[0].split(";") for v in vals: alt_dct = { - rtype_prefix + 'right_relations__relation_type__label__iexact': - v.replace('"', '')} + rtype_prefix + + "right_relations__relation_type__label__iexact": v.replace('"', "") + } for k in dct: val = dct[k] if rtype_prefix: @@ -1046,12 +1129,13 @@ def _manage_relation_types(relation_types, dct, query, or_reqs): continue # tricky: reconstruct the key to make sense - remove the # prefix from the key - k = k[0:k.index(rtype_prefix)] + \ - k[k.index(rtype_prefix) + len(rtype_prefix):] - if k.endswith('year'): - k += '__exact' - alt_dct[rtype_prefix + 'right_relations__right_record__' + k] = \ - val + k = ( + k[0 : k.index(rtype_prefix)] + + k[k.index(rtype_prefix) + len(rtype_prefix) :] + ) + if k.endswith("year"): + k += "__exact" + alt_dct[rtype_prefix + "right_relations__right_record__" + k] = val if not dct: # fake condition to trick Django (1.4): without it only the # alt_dct is managed @@ -1062,11 +1146,9 @@ def _manage_relation_types(relation_types, dct, query, or_reqs): altor_dct.pop(k) for j in or_req: val = or_req[j] - if j == 'year': - j = 'year__exact' - altor_dct[ - rtype_prefix + 'right_relations__right_record__' + j] = \ - val + if j == "year": + j = "year__exact" + altor_dct[rtype_prefix + "right_relations__right_record__" + j] = val query |= Q(**altor_dct) return query @@ -1076,7 +1158,7 @@ def _construct_query(relation_types, dct, or_reqs, and_reqs): # manage multi value not already managed for key in list(dct.keys()): if type(dct[key]) == str and ";" in dct[key]: - values = [v for v in dct[key].split(';') if v] + values = [v for v in dct[key].split(";") if v] if not values: dct.pop(key) continue @@ -1084,9 +1166,7 @@ def _construct_query(relation_types, dct, or_reqs, and_reqs): if len(values) == 1: continue for v in values[1:]: - or_reqs.append( - (key, {key: v}) - ) + or_reqs.append((key, {key: v})) for k in list(dct.keys()): if type(k) not in (list, tuple): @@ -1116,44 +1196,45 @@ def _construct_query(relation_types, dct, or_reqs, and_reqs): return query -def _manage_default_search(dct, request, model, default_name, my_base_request, - my_relative_session_names): +def _manage_default_search( + dct, request, model, default_name, my_base_request, my_relative_session_names +): pinned_search = "" pin_key = "pin-search-" + default_name - if pin_key in request.session and \ - request.session[pin_key]: # a search is pinned + if pin_key in request.session and request.session[pin_key]: # a search is pinned pinned_search = request.session[pin_key] - dct = {'search_vector': request.session[pin_key]} - elif default_name in request.session and \ - request.session[default_name]: # an item is pinned + dct = {"search_vector": request.session[pin_key]} + elif ( + default_name in request.session and request.session[default_name] + ): # an item is pinned value = request.session[default_name] - if 'basket-' in value: + if "basket-" in value: try: - dct = { - "basket__pk": request.session[default_name].split('-')[-1]} - pinned_search = str(FindBasket.objects.get( - pk=dct["basket__pk"])) + dct = {"basket__pk": request.session[default_name].split("-")[-1]} + pinned_search = str(FindBasket.objects.get(pk=dct["basket__pk"])) except FindBasket.DoesNotExist: pass else: try: dct = {"pk": request.session[default_name]} - pinned_search = '"{}"'.format( - model.objects.get(pk=dct["pk"]) - ) + pinned_search = '"{}"'.format(model.objects.get(pk=dct["pk"])) except model.DoesNotExist: pass elif dct == (my_base_request or {}): - if not hasattr(model, 'UP_MODEL_QUERY'): + if not hasattr(model, "UP_MODEL_QUERY"): logger.warning( "**WARN get_item**: - UP_MODEL_QUERY not defined for " - "'{}'".format(model)) + "'{}'".format(model) + ) else: # a parent item may be selected in the default menu for name, key in my_relative_session_names: - if name in request.session and request.session[name] \ - and 'basket-' not in request.session[name] \ - and name in CURRENT_ITEM_KEYS_DICT: + if ( + name in request.session + and request.session[name] + and "basket-" not in request.session[name] + and name in CURRENT_ITEM_KEYS_DICT + ): up_model = CURRENT_ITEM_KEYS_DICT[name] try: dct.update({key: request.session[name]}) @@ -1161,15 +1242,12 @@ def _manage_default_search(dct, request, model, default_name, my_base_request, if up_item.SLUG not in model.UP_MODEL_QUERY: logger.warning( "**WARN get_item**: - {} not in " - "UP_MODEL_QUERY for {}'".format( - up_item.SLUG, - model)) + "UP_MODEL_QUERY for {}'".format(up_item.SLUG, model) + ) else: - req_key, up_attr = model.UP_MODEL_QUERY[ - up_item.SLUG] + req_key, up_attr = model.UP_MODEL_QUERY[up_item.SLUG] pinned_search = '{}="{}"'.format( - req_key, - getattr(up_item, up_attr) + req_key, getattr(up_item, up_attr) ) break except up_model.DoesNotExist: @@ -1190,39 +1268,30 @@ def _format_val(val): def _format_geojson(rows, link_template): data = { - 'type': 'FeatureCollection', - 'crs': { - 'type': 'name', - 'properties': { - 'name': 'EPSG:4326' - } - }, - 'link_template': link_template, - 'features': [], - 'no-geo': [] + "type": "FeatureCollection", + "crs": {"type": "name", "properties": {"name": "EPSG:4326"}}, + "link_template": link_template, + "features": [], + "no-geo": [], } if not rows: return data for row in rows: - feat = {'id': row[0], 'name': row[1]} + feat = {"id": row[0], "name": row[1]} x, y = row[2], row[3] if not x or not y or x < -180 or x > 180 or y < -90 or y > 90: - data['no-geo'].append(feat) + data["no-geo"].append(feat) continue feature = { - 'type': 'Feature', - 'properties': feat, - 'geometry': { - 'type': 'Point', - 'coordinates': [x, y] - } + "type": "Feature", + "properties": feat, + "geometry": {"type": "Point", "coordinates": [x, y]}, } - data['features'].append(feature) + data["features"].append(feature) return data -def _get_data_from_query(items, query_table_cols, extra_request_keys, - point_field=None): +def _get_data_from_query(items, query_table_cols, extra_request_keys, point_field=None): for query_keys in query_table_cols: if not isinstance(query_keys, (tuple, list)): query_keys = [query_keys] @@ -1233,39 +1302,43 @@ def _get_data_from_query(items, query_table_cols, extra_request_keys, # only manage one level for display query_key = query_key[0] # clean - for filtr in ('__icontains', '__contains', '__iexact', - '__exact'): + for filtr in ("__icontains", "__contains", "__iexact", "__exact"): if query_key.endswith(filtr): - query_key = query_key[:len(query_key) - len(filtr)] + query_key = query_key[: len(query_key) - len(filtr)] query_key.replace(".", "__") # class style to query - values = ['id'] + query_table_cols + values = ["id"] + query_table_cols if point_field: profile = get_current_profile() precision = profile.point_precision if precision is not None: exp_x = ExpressionWrapper( - Round(Func(point_field, function='ST_X'), precision), - output_field=FloatField()) + Round(Func(point_field, function="ST_X"), precision), + output_field=FloatField(), + ) exp_y = ExpressionWrapper( - Round(Func(point_field, function='ST_Y'), precision), - output_field=FloatField()) + Round(Func(point_field, function="ST_Y"), precision), + output_field=FloatField(), + ) else: exp_x = ExpressionWrapper( - Func(point_field, function='ST_X'), output_field=FloatField()) + Func(point_field, function="ST_X"), output_field=FloatField() + ) exp_y = ExpressionWrapper( - Func(point_field, function='ST_Y'), output_field=FloatField()) + Func(point_field, function="ST_Y"), output_field=FloatField() + ) items = items.annotate(point_x=exp_x) items = items.annotate(point_y=exp_y) - values += ['point_x', 'point_y'] + values += ["point_x", "point_y"] if hasattr(items.model, "locked"): values.append("locked") values.append("lock_user_id") return items.values_list(*values) -def _get_data_from_query_old(items, query_table_cols, request, - extra_request_keys, do_not_deduplicate=False): +def _get_data_from_query_old( + items, query_table_cols, request, extra_request_keys, do_not_deduplicate=False +): c_ids, datas = [], [] has_lock = items and hasattr(items[0], "locked") @@ -1284,22 +1357,21 @@ def _get_data_from_query_old(items, query_table_cols, request, k = extra_request_keys[k] if type(k) in (list, tuple): k = k[0] - for filtr in ('__icontains', '__contains', '__iexact', - '__exact'): + for filtr in ("__icontains", "__contains", "__iexact", "__exact"): if k.endswith(filtr): - k = k[:len(k) - len(filtr)] + k = k[: len(k) - len(filtr)] vals = [item] # foreign key may be divided by "." or "__" splitted_k = [] - for ky in k.split('.'): - if '__' in ky: - splitted_k += ky.split('__') + for ky in k.split("."): + if "__" in ky: + splitted_k += ky.split("__") else: splitted_k.append(ky) for ky in splitted_k: new_vals = [] for val in vals: - if hasattr(val, 'all'): # manage related objects + if hasattr(val, "all"): # manage related objects val = list(val.all()) for v in val: v = getattr(v, ky) @@ -1313,7 +1385,7 @@ def _get_data_from_query_old(items, query_table_cols, request, pass vals = new_vals # manage last related objects - if vals and hasattr(vals[0], 'all'): + if vals and hasattr(vals[0], "all"): new_vals = [] for val in vals: new_vals += list(val.all()) @@ -1324,12 +1396,12 @@ def _get_data_from_query_old(items, query_table_cols, request, new_vals = [] if not vals: for idx, my_v in enumerate(my_vals): - new_vals.append("{}{}{}".format( - my_v, ' - ', '')) + new_vals.append("{}{}{}".format(my_v, " - ", "")) else: for idx, v in enumerate(vals): - new_vals.append("{}{}{}".format( - vals[idx], ' - ', _format_val(v))) + new_vals.append( + "{}{}{}".format(vals[idx], " - ", _format_val(v)) + ) my_vals = new_vals[:] data.append(" & ".join(my_vals) or "") if has_lock: @@ -1347,14 +1419,15 @@ def _format_modality(value): return value -def _get_json_stats(items, stats_sum_variable, stats_modality_1, - stats_modality_2, multiply=1): +def _get_json_stats( + items, stats_sum_variable, stats_modality_1, stats_modality_2, multiply=1 +): if stats_modality_2: q = items.values(stats_modality_1, stats_modality_2) else: q = items.values(stats_modality_1) - if stats_sum_variable == 'pk': - q = q.annotate(sum=Count('pk')) + if stats_sum_variable == "pk": + q = q.annotate(sum=Count("pk")) else: q = q.annotate(sum=Sum(stats_sum_variable)) data = [] @@ -1365,32 +1438,47 @@ def _get_json_stats(items, stats_sum_variable, stats_modality_1, if not data or data[-1][0] != modality_1: data.append([modality_1, []]) data[-1][1].append( - (_format_modality(values[stats_modality_2]), - int((values["sum"] or 0) * multiply)) + ( + _format_modality(values[stats_modality_2]), + int((values["sum"] or 0) * multiply), + ) ) else: q = q.order_by(stats_modality_1) for values in q.all(): modality_1 = values[stats_modality_1] - data.append([_format_modality(modality_1), - int((values["sum"] or 0) * multiply)]) + data.append( + [_format_modality(modality_1), int((values["sum"] or 0) * multiply)] + ) data = json.dumps({"data": data}) - return HttpResponse(data, content_type='application/json') + return HttpResponse(data, content_type="application/json") DEFAULT_ROW_NUMBER = 10 # length is used by ajax DataTables requests -EXCLUDED_FIELDS = ['length'] -BASE_DATED_FIELDS = ['last_modified'] - - -def get_item(model, func_name, default_name, extra_request_keys=None, - base_request=None, bool_fields=None, reversed_bool_fields=None, - dated_fields=None, associated_models=None, - relative_session_names=None, specific_perms=None, - own_table_cols=None, relation_types_prefix=None, - do_not_deduplicate=False, model_for_perms=None, - alt_query_own=None, search_form=None): +EXCLUDED_FIELDS = ["length"] +BASE_DATED_FIELDS = ["last_modified"] + + +def get_item( + model, + func_name, + default_name, + extra_request_keys=None, + base_request=None, + bool_fields=None, + reversed_bool_fields=None, + dated_fields=None, + associated_models=None, + relative_session_names=None, + specific_perms=None, + own_table_cols=None, + relation_types_prefix=None, + do_not_deduplicate=False, + model_for_perms=None, + alt_query_own=None, + search_form=None, +): """ Generic treatment of tables @@ -1415,26 +1503,34 @@ def get_item(model, func_name, default_name, extra_request_keys=None, :param search_form: associated search form to manage JSON query keys :return: """ - def func(request, data_type='json', full=False, force_own=False, - col_names=None, no_link=False, no_limit=False, return_query=False, - **dct): + + def func( + request, + data_type="json", + full=False, + force_own=False, + col_names=None, + no_link=False, + no_limit=False, + return_query=False, + **dct + ): available_perms = [] if specific_perms: available_perms = specific_perms[:] - EMPTY = '' - if 'type' in dct: - data_type = dct.pop('type') + EMPTY = "" + if "type" in dct: + data_type = dct.pop("type") if not data_type: - data_type = 'json' + data_type = "json" if "json" in data_type: - EMPTY = '[]' + EMPTY = "[]" - if data_type not in ('json', 'csv', 'json-image', 'json-map', - 'json-stats'): - return HttpResponse(EMPTY, content_type='text/plain') + if data_type not in ("json", "csv", "json-image", "json-map", "json-stats"): + return HttpResponse(EMPTY, content_type="text/plain") - if data_type == 'json-stats' and len(model.STATISTIC_MODALITIES) < 2: - return HttpResponse(EMPTY, content_type='text/plain') + if data_type == "json-stats" and len(model.STATISTIC_MODALITIES) < 2: + return HttpResponse(EMPTY, content_type="text/plain") model_to_check = model if model_for_perms: @@ -1443,22 +1539,26 @@ def get_item(model, func_name, default_name, extra_request_keys=None, if return_query: allowed, own = True, False else: - allowed, own = check_model_access_control(request, model_to_check, - available_perms) + allowed, own = check_model_access_control( + request, model_to_check, available_perms + ) if not allowed: - return HttpResponse(EMPTY, content_type='text/plain') + return HttpResponse(EMPTY, content_type="text/plain") if force_own: own = True - if full == 'shortcut' and 'SHORTCUT_SEARCH' in request.session and \ - request.session['SHORTCUT_SEARCH'] == 'own': + if ( + full == "shortcut" + and "SHORTCUT_SEARCH" in request.session + and request.session["SHORTCUT_SEARCH"] == "own" + ): own = True query_own = None if own: q = models.IshtarUser.objects.filter(user_ptr=request.user) if not q.count(): - return HttpResponse(EMPTY, content_type='text/plain') + return HttpResponse(EMPTY, content_type="text/plain") if alt_query_own: query_own = getattr(model, alt_query_own)(q.all()[0]) else: @@ -1466,7 +1566,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None, query_parameters = {} - if hasattr(model, 'get_query_parameters'): + if hasattr(model, "get_query_parameters"): query_parameters = model.get_query_parameters() # get defaults from model @@ -1476,7 +1576,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None, my_extra_request_keys[key] = query_parameters[key].search_query else: my_extra_request_keys = copy(extra_request_keys or {}) - if base_request is None and hasattr(model, 'BASE_REQUEST'): + if base_request is None and hasattr(model, "BASE_REQUEST"): if callable(model.BASE_REQUEST): my_base_request = model.BASE_REQUEST(request) else: @@ -1485,50 +1585,55 @@ def get_item(model, func_name, default_name, extra_request_keys=None, my_base_request = copy(base_request) else: my_base_request = {} - if not bool_fields and hasattr(model, 'BOOL_FIELDS'): + if not bool_fields and hasattr(model, "BOOL_FIELDS"): my_bool_fields = model.BOOL_FIELDS[:] else: my_bool_fields = bool_fields[:] if bool_fields else [] - if not reversed_bool_fields and hasattr(model, 'REVERSED_BOOL_FIELDS'): + if not reversed_bool_fields and hasattr(model, "REVERSED_BOOL_FIELDS"): my_reversed_bool_fields = model.REVERSED_BOOL_FIELDS[:] else: - my_reversed_bool_fields = reversed_bool_fields[:] \ - if reversed_bool_fields else [] + my_reversed_bool_fields = ( + reversed_bool_fields[:] if reversed_bool_fields else [] + ) many_counted_fields = getattr(model, "MANY_COUNTED_FIELDS", None) reversed_many_counted_fields = getattr( - model, "REVERSED_MANY_COUNTED_FIELDS", None) + model, "REVERSED_MANY_COUNTED_FIELDS", None + ) - if not dated_fields and hasattr(model, 'DATED_FIELDS'): + if not dated_fields and hasattr(model, "DATED_FIELDS"): my_dated_fields = model.DATED_FIELDS[:] else: my_dated_fields = dated_fields[:] if dated_fields else [] my_dated_fields += BASE_DATED_FIELDS - if not associated_models and hasattr(model, 'ASSOCIATED_MODELS'): + if not associated_models and hasattr(model, "ASSOCIATED_MODELS"): my_associated_models = model.ASSOCIATED_MODELS[:] else: - my_associated_models = associated_models[:] \ - if associated_models else [] - if not relative_session_names and hasattr(model, - 'RELATIVE_SESSION_NAMES'): + my_associated_models = associated_models[:] if associated_models else [] + if not relative_session_names and hasattr(model, "RELATIVE_SESSION_NAMES"): my_relative_session_names = model.RELATIVE_SESSION_NAMES[:] else: - my_relative_session_names = relative_session_names[:] \ - if relative_session_names else [] - if not relation_types_prefix and hasattr(model, - 'RELATION_TYPES_PREFIX'): + my_relative_session_names = ( + relative_session_names[:] if relative_session_names else [] + ) + if not relation_types_prefix and hasattr(model, "RELATION_TYPES_PREFIX"): my_relation_types_prefix = copy(model.RELATION_TYPES_PREFIX) else: - my_relation_types_prefix = copy(relation_types_prefix) \ - if relation_types_prefix else {} + my_relation_types_prefix = ( + copy(relation_types_prefix) if relation_types_prefix else {} + ) fields = [model._meta.get_field(k) for k in get_all_field_names(model)] - request_keys = dict([ - (field.name, - field.name + (hasattr(field, 'rel') and field.rel and '__pk' - or '')) - for field in fields]) + request_keys = dict( + [ + ( + field.name, + field.name + (hasattr(field, "rel") and field.rel and "__pk" or ""), + ) + for field in fields + ] + ) # add keys of associated models to available request key for associated_model, key in my_associated_models: @@ -1538,19 +1643,34 @@ def get_item(model, func_name, default_name, extra_request_keys=None, associated_model = globals()[associated_model] associated_fields = [ associated_model._meta.get_field(k) - for k in get_all_field_names(associated_model)] + for k in get_all_field_names(associated_model) + ] request_keys.update( - dict([(key + "__" + field.name, - key + "__" + field.name + - (hasattr(field, 'rel') and field.rel and '__pk' or '')) - for field in associated_fields])) + dict( + [ + ( + key + "__" + field.name, + key + + "__" + + field.name + + (hasattr(field, "rel") and field.rel and "__pk" or ""), + ) + for field in associated_fields + ] + ) + ) request_keys.update(my_extra_request_keys) # manage search on json fields and excluded fields - if search_form and request and request.user and getattr( - request.user, 'ishtaruser', None): - available, excluded_fields, json_fields = \ - search_form.check_custom_form(request.user.ishtaruser) + if ( + search_form + and request + and request.user + and getattr(request.user, "ishtaruser", None) + ): + available, excluded_fields, json_fields = search_form.check_custom_form( + request.user.ishtaruser + ) # for now no manage on excluded_fields: should we prevent search on # some fields regarding the user concerned? if available: @@ -1561,23 +1681,24 @@ def get_item(model, func_name, default_name, extra_request_keys=None, if "query" in dct: request_items = dct["query"] request_items["submited"] = True - elif request.method == 'POST': + elif request.method == "POST": request_items = request.POST else: request_items = request.GET - count = dct.get('count', False) + count = dct.get("count", False) # pager try: - row_nb = int(request_items.get('length')) + row_nb = int(request_items.get("length")) except (ValueError, TypeError): row_nb = DEFAULT_ROW_NUMBER - if data_type == 'json-map': # other limit for map + if data_type == "json-map": # other limit for map row_nb = settings.ISHTAR_MAP_MAX_ITEMS - if no_limit or (data_type == 'json-map' and - request_items.get('no_limit', False)): + if no_limit or ( + data_type == "json-map" and request_items.get("no_limit", False) + ): row_nb = None dct_request_items = {} @@ -1587,8 +1708,8 @@ def get_item(model, func_name, default_name, extra_request_keys=None, if k in EXCLUDED_FIELDS: continue key = k[:] - if key.startswith('searchprefix_'): - key = key[len('searchprefix_'):] + if key.startswith("searchprefix_"): + key = key[len("searchprefix_") :] dct_request_items[key] = request_items[k] request_items = dct_request_items @@ -1603,19 +1724,19 @@ def get_item(model, func_name, default_name, extra_request_keys=None, and_reqs, or_reqs = [], [] exc_and_reqs, exc_or_reqs = [], [] distinct_queries = [] - dct['extras'], dct['and_reqs'], dct['exc_and_reqs'] = [], [], [] + dct["extras"], dct["and_reqs"], dct["exc_and_reqs"] = [], [], [] - if full == 'shortcut': + if full == "shortcut": if model.SLUG == "warehouse": - key = 'name__icontains' + key = "name__icontains" else: - key = 'cached_label__icontains' - dct[key] = request.GET.get('term', None) + key = "cached_label__icontains" + dct[key] = request.GET.get("term", None) try: - old = 'old' in request_items and int(request_items['old']) + old = "old" in request_items and int(request_items["old"]) except ValueError: - return HttpResponse('[]', content_type='text/plain') + return HttpResponse("[]", content_type="text/plain") for k in request_keys: val = request_items.get(k) @@ -1644,21 +1765,28 @@ def get_item(model, func_name, default_name, extra_request_keys=None, and_reqs.append(reqs) pinned_search = "" - base_keys = ['extras', 'and_reqs', 'exc_and_reqs'] + base_keys = ["extras", "and_reqs", "exc_and_reqs"] if my_base_request: base_keys += list(my_base_request) - has_a_search = any( - k for k in dct.keys() if k not in my_base_request) + has_a_search = any(k for k in dct.keys() if k not in my_base_request) # manage default and pinned search and not bookmark - if not has_a_search and not request_items.get("search_vector", "") \ - and full != 'shortcut': - if data_type == 'csv' and func_name in request.session: + if ( + not has_a_search + and not request_items.get("search_vector", "") + and full != "shortcut" + ): + if data_type == "csv" and func_name in request.session: dct = request.session[func_name] else: # default search dct, pinned_search = _manage_default_search( - dct, request, model, default_name, my_base_request, - my_relative_session_names) + dct, + request, + model, + default_name, + my_base_request, + my_relative_session_names, + ) elif func_name and request: request.session[func_name] = dct @@ -1667,16 +1795,20 @@ def get_item(model, func_name, default_name, extra_request_keys=None, query_parameters[k] = SearchAltName(k, request_keys[k]) dct, excluded_dct, distinct_queries = _search_manage_search_vector( - model, dct, excluded_dct, distinct_queries, query_parameters, + model, + dct, + excluded_dct, + distinct_queries, + query_parameters, ) search_vector = "" - if 'search_vector' in dct: - search_vector = dct.pop('search_vector') + if "search_vector" in dct: + search_vector = dct.pop("search_vector") # manage relations types - if 'relation_types' not in my_relation_types_prefix: - my_relation_types_prefix['relation_types'] = '' + if "relation_types" not in my_relation_types_prefix: + my_relation_types_prefix["relation_types"] = "" relation_types = {} for rtype_key in my_relation_types_prefix: relation_types[my_relation_types_prefix[rtype_key]] = set() @@ -1690,18 +1822,20 @@ def get_item(model, func_name, default_name, extra_request_keys=None, dct.pop(k) ) - _manage_bool_fields(model, my_bool_fields, my_reversed_bool_fields, - dct, or_reqs) - _manage_bool_fields(model, my_bool_fields, my_reversed_bool_fields, - excluded_dct, exc_or_reqs) + _manage_bool_fields( + model, my_bool_fields, my_reversed_bool_fields, dct, or_reqs + ) + _manage_bool_fields( + model, my_bool_fields, my_reversed_bool_fields, excluded_dct, exc_or_reqs + ) tmp_excluded = {} _manage_many_counted_fields( - many_counted_fields, reversed_many_counted_fields, - dct, tmp_excluded) + many_counted_fields, reversed_many_counted_fields, dct, tmp_excluded + ) _manage_many_counted_fields( - many_counted_fields, reversed_many_counted_fields, - excluded_dct, dct) + many_counted_fields, reversed_many_counted_fields, excluded_dct, dct + ) if tmp_excluded: excluded_dct.update(tmp_excluded) @@ -1715,12 +1849,12 @@ def get_item(model, func_name, default_name, extra_request_keys=None, _manage_facet_search(model, excluded_dct, exc_and_reqs) extras = [] - if 'extras' in dct: - extras = dct.pop('extras') - if 'and_reqs' in dct: - and_reqs += dct.pop('and_reqs') - if 'exc_and_reqs' in dct: - exc_and_reqs += dct.pop('exc_and_reqs') + if "extras" in dct: + extras = dct.pop("extras") + if "and_reqs" in dct: + and_reqs += dct.pop("and_reqs") + if "exc_and_reqs" in dct: + exc_and_reqs += dct.pop("exc_and_reqs") _manage_clean_search_field(dct, excluded_dct) _manage_clean_search_field(excluded_dct, dct) @@ -1729,23 +1863,23 @@ def get_item(model, func_name, default_name, extra_request_keys=None, exc_query = None if excluded_dct or exc_and_reqs or exc_or_reqs: exc_query = _construct_query( - relation_types, excluded_dct, exc_or_reqs, exc_and_reqs) + relation_types, excluded_dct, exc_or_reqs, exc_and_reqs + ) if query_own: query = query & query_own # manage hierarchic in shortcut menu - if full == 'shortcut': + if full == "shortcut": ASSOCIATED_ITEMS = { - Operation: (File, 'associated_file__pk'), - ContextRecord: (Operation, 'operation__pk'), - Find: (ContextRecord, 'base_finds__context_record__pk'), + Operation: (File, "associated_file__pk"), + ContextRecord: (Operation, "operation__pk"), + Find: (ContextRecord, "base_finds__context_record__pk"), } if model in ASSOCIATED_ITEMS: upper_model, upper_key = ASSOCIATED_ITEMS[model] model_name = upper_model.SLUG - current = model_name in request.session \ - and request.session[model_name] + current = model_name in request.session and request.session[model_name] if current: dct = {upper_key: current} query &= Q(**dct) @@ -1768,7 +1902,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None, items = items.distinct() try: - items_nb = items.values('pk').aggregate(Count('pk'))['pk__count'] + items_nb = items.values("pk").aggregate(Count("pk"))["pk__count"] except ProgrammingError: items_nb = 0 if count: @@ -1776,21 +1910,27 @@ def get_item(model, func_name, default_name, extra_request_keys=None, # print(str(items.values("id").query).encode('utf-8')) if search_vector: # for serialization - dct['search_vector'] = search_vector + dct["search_vector"] = search_vector # table cols if own_table_cols: table_cols = own_table_cols else: if full: - table_cols = [field.name for field in model._meta.fields - if field.name not in PRIVATE_FIELDS] - table_cols += [field.name for field in model._meta.many_to_many - if field.name not in PRIVATE_FIELDS] - if hasattr(model, 'EXTRA_FULL_FIELDS'): + table_cols = [ + field.name + for field in model._meta.fields + if field.name not in PRIVATE_FIELDS + ] + table_cols += [ + field.name + for field in model._meta.many_to_many + if field.name not in PRIVATE_FIELDS + ] + if hasattr(model, "EXTRA_FULL_FIELDS"): table_cols += model.EXTRA_FULL_FIELDS else: - tb_key = (getattr(model, 'SLUG', None), 'TABLE_COLS') + tb_key = (getattr(model, "SLUG", None), "TABLE_COLS") if tb_key in settings.TABLE_COLS: table_cols = settings.TABLE_COLS[tb_key] else: @@ -1803,20 +1943,27 @@ def get_item(model, func_name, default_name, extra_request_keys=None, elif data_type == "json-stats": stats_modality_1 = request_items.get("stats_modality_1", None) stats_modality_2 = request_items.get("stats_modality_2", None) - if not stats_modality_1 or \ - stats_modality_1 not in model.STATISTIC_MODALITIES: + if ( + not stats_modality_1 + or stats_modality_1 not in model.STATISTIC_MODALITIES + ): stats_modality_1 = model.STATISTIC_MODALITIES[0] if stats_modality_2 not in model.STATISTIC_MODALITIES: stats_modality_2 = None - stats_sum_variable = request_items.get('stats_sum_variable', None) + stats_sum_variable = request_items.get("stats_sum_variable", None) stats_sum_variable_keys = list(model.STATISTIC_SUM_VARIABLE.keys()) - if not stats_sum_variable or \ - stats_sum_variable not in stats_sum_variable_keys: + if ( + not stats_sum_variable + or stats_sum_variable not in stats_sum_variable_keys + ): stats_sum_variable = stats_sum_variable_keys[0] multiply = model.STATISTIC_SUM_VARIABLE[stats_sum_variable][1] return _get_json_stats( - items, stats_sum_variable, stats_modality_1, stats_modality_2, - multiply=multiply + items, + stats_sum_variable, + stats_modality_1, + stats_modality_2, + multiply=multiply, ) query_table_cols = [] @@ -1824,52 +1971,53 @@ def get_item(model, func_name, default_name, extra_request_keys=None, if type(cols) not in (list, tuple): cols = [cols] for col in cols: - query_table_cols += col.split('|') + query_table_cols += col.split("|") # contextual (full, simple, etc.) col - contxt = full and 'full' or 'simple' - if hasattr(model, 'CONTEXTUAL_TABLE_COLS') and \ - contxt in model.CONTEXTUAL_TABLE_COLS: + contxt = full and "full" or "simple" + if ( + hasattr(model, "CONTEXTUAL_TABLE_COLS") + and contxt in model.CONTEXTUAL_TABLE_COLS + ): for idx, col in enumerate(table_cols): if col in model.CONTEXTUAL_TABLE_COLS[contxt]: - query_table_cols[idx] = \ - model.CONTEXTUAL_TABLE_COLS[contxt][col] + query_table_cols[idx] = model.CONTEXTUAL_TABLE_COLS[contxt][col] - if data_type in ('json-image', 'json-map') or full == 'shortcut': + if data_type in ("json-image", "json-map") or full == "shortcut": if model.SLUG == "warehouse": - query_table_cols.append('name') - table_cols.append('name') + query_table_cols.append("name") + table_cols.append("name") else: - query_table_cols.append('cached_label') - table_cols.append('cached_label') - if data_type == 'json-image': - query_table_cols.append('main_image__thumbnail') - table_cols.append('main_image__thumbnail') - query_table_cols.append('main_image__image') - table_cols.append('main_image__image') - elif data_type == 'json-map': + query_table_cols.append("cached_label") + table_cols.append("cached_label") + if data_type == "json-image": + query_table_cols.append("main_image__thumbnail") + table_cols.append("main_image__thumbnail") + query_table_cols.append("main_image__image") + table_cols.append("main_image__image") + elif data_type == "json-map": if model.SLUG == "find": - query_table_cols.append('base_finds__point_2d') - table_cols.append('base_finds__point_2d') + query_table_cols.append("base_finds__point_2d") + table_cols.append("base_finds__point_2d") else: - query_table_cols.append('point_2d') - table_cols.append('point_2d') + query_table_cols.append("point_2d") + table_cols.append("point_2d") # manage sort tables manual_sort_key = None sorts = {} for k in request_items: - if not k.startswith('order['): + if not k.startswith("order["): continue - num = int(k.split(']')[0][len("order["):]) + num = int(k.split("]")[0][len("order[") :]) if num not in sorts: - sorts[num] = ['', ''] # sign, col_num - if k.endswith('[dir]'): + sorts[num] = ["", ""] # sign, col_num + if k.endswith("[dir]"): order = request_items[k] - sign = order and order == 'desc' and "-" or '' + sign = order and order == "desc" and "-" or "" sorts[num][0] = sign - if k.endswith('[column]'): + if k.endswith("[column]"): sorts[num][1] = request_items[k] sign = "" if not sorts and model._meta.ordering: @@ -1890,14 +2038,16 @@ def get_item(model, func_name, default_name, extra_request_keys=None, ks = [ks] for k in ks: if k.endswith("__pk"): - k = k[:-len("__pk")] + "__label" + k = k[: -len("__pk")] + "__label" if k.endswith("towns"): k = k + "__cached_label" - if k.endswith("__icontains") or \ - k.endswith("__contains") or \ - k.endswith("__iexact") or \ - k.endswith("__exact"): - k = '__'.join(k.split('__')[:-1]) + if ( + k.endswith("__icontains") + or k.endswith("__contains") + or k.endswith("__iexact") + or k.endswith("__exact") + ): + k = "__".join(k.split("__")[:-1]) # if '__' in k: # k = k.split('__')[0] orders.append(signe + k) @@ -1909,7 +2059,9 @@ def get_item(model, func_name, default_name, extra_request_keys=None, manual_sort_key = k logger.warning( "**WARN get_item - {}**: manual sort key '{}'".format( - func_name, k)) + func_name, k + ) + ) break if not manual_sort_key: items = items.order_by(*orders) @@ -1919,14 +2071,14 @@ def get_item(model, func_name, default_name, extra_request_keys=None, page_nb = 1 if row_nb and data_type.startswith("json"): try: - start = int(request_items.get('start')) + start = int(request_items.get("start")) page_nb = start // row_nb + 1 assert page_nb >= 1 except (TypeError, ValueError, AssertionError): start = 0 page_nb = 1 end = int(page_nb * row_nb) - if full == 'shortcut': + if full == "shortcut": start = 0 end = 20 @@ -1938,19 +2090,21 @@ def get_item(model, func_name, default_name, extra_request_keys=None, if old: items = [item.get_previous(old) for item in items] - if data_type == 'json-map': + if data_type == "json-map": point_field = query_table_cols.pop() datas = _get_data_from_query( - items, query_table_cols, my_extra_request_keys, - point_field=point_field) - elif data_type != "csv" and getattr( - model, "NEW_QUERY_ENGINE", False): - datas = _get_data_from_query( - items, query_table_cols, my_extra_request_keys) + items, query_table_cols, my_extra_request_keys, point_field=point_field + ) + elif data_type != "csv" and getattr(model, "NEW_QUERY_ENGINE", False): + datas = _get_data_from_query(items, query_table_cols, my_extra_request_keys) else: datas = _get_data_from_query_old( - items, query_table_cols, request, my_extra_request_keys, - do_not_deduplicate) + items, + query_table_cols, + request, + my_extra_request_keys, + do_not_deduplicate, + ) if manual_sort_key: # +1 because the id is added as a first col @@ -1959,56 +2113,60 @@ def get_item(model, func_name, default_name, extra_request_keys=None, idx_col = query_table_cols.index(manual_sort_key) + 1 else: for idx, col in enumerate(query_table_cols): - if type(col) in (list, tuple) and \ - manual_sort_key in col: + if type(col) in (list, tuple) and manual_sort_key in col: idx_col = idx + 1 if idx_col is not None: datas = sorted(datas, key=lambda x: x[idx_col]) - if sign == '-': + if sign == "-": datas = reversed(datas) datas = list(datas)[start:end] - link_template = \ - "<a class='display_details' href='#' " \ - "onclick='load_window(\"{}\")'>" \ - "<i class=\"fa fa-info-circle\" aria-hidden=\"true\"></i><lock></a>" + link_template = ( + "<a class='display_details' href='#' " + "onclick='load_window(\"{}\")'>" + '<i class="fa fa-info-circle" aria-hidden="true"></i><lock></a>' + ) link_ext_template = '<a href="{}" target="_blank">{}</a>' lock = ' <i class="fa fa-lock text-danger" aria-hidden="true"></i>' - own_lock = ' <i class="fa fa-lock text-success" ' \ - 'aria-hidden="true"></i>' + own_lock = ' <i class="fa fa-lock text-success" ' 'aria-hidden="true"></i>' has_locks = hasattr(model, "locked") current_user_id = request.user and request.user.id if data_type.startswith("json"): rows = [] - if data_type == 'json-map': + if data_type == "json-map": lnk = link_template.format( - reverse('show-' + default_name, args=[999999, '']), + reverse("show-" + default_name, args=[999999, ""]), ) - lnk = lnk.replace('999999', "<pk>") + lnk = lnk.replace("999999", "<pk>") if not has_locks: - lnk = lnk.replace('<lock>', "") + lnk = lnk.replace("<lock>", "") data = json.dumps(_format_geojson(datas, lnk)) - return HttpResponse(data, content_type='application/json') + return HttpResponse(data, content_type="application/json") for data in datas: res = { - 'id': data[0], + "id": data[0], } if not no_link: try: lnk_template = link_template lnk = lnk_template.format( - reverse('show-' + default_name, args=[data[0], ''])) + reverse("show-" + default_name, args=[data[0], ""]) + ) if has_locks and data[-2]: if data[-1] == current_user_id: - lnk = lnk.replace('<lock>', own_lock) + lnk = lnk.replace("<lock>", own_lock) else: - lnk = lnk.replace('<lock>', lock) + lnk = lnk.replace("<lock>", lock) else: - lnk = lnk.replace('<lock>', "") + lnk = lnk.replace("<lock>", "") except NoReverseMatch: logger.warning( - '**WARN "show-' + default_name + '" args (' - + str(data[0]) + ") url not available") - lnk = '' + '**WARN "show-' + + default_name + + '" args (' + + str(data[0]) + + ") url not available" + ) + lnk = "" res["link"] = lnk for idx, value in enumerate(data[1:]): if not value or idx >= len(table_cols): @@ -2019,55 +2177,59 @@ def get_item(model, func_name, default_name, extra_request_keys=None, tab_cols = [] # foreign key may be divided by "." or "__" for tc in table_col: - if '.' in tc: - tab_cols += tc.split('.') - elif '__' in tc: - tab_cols += tc.split('__') + if "." in tc: + tab_cols += tc.split(".") + elif "__" in tc: + tab_cols += tc.split("__") else: tab_cols.append(tc) k = "__".join(tab_cols) if k.endswith("__image") or k.endswith("__thumbnail"): - if not value.startswith(settings.MEDIA_ROOT) and not \ - value.startswith("http://") and not \ - value.startswith("https://"): + if ( + not value.startswith(settings.MEDIA_ROOT) + and not value.startswith("http://") + and not value.startswith("https://") + ): value = settings.MEDIA_URL + value - if hasattr(model, 'COL_LINK') and k in model.COL_LINK: + if hasattr(model, "COL_LINK") and k in model.COL_LINK: value = link_ext_template.format(value, value) if isinstance(value, datetime.date): - value = value.strftime('%Y-%m-%d') + value = value.strftime("%Y-%m-%d") if isinstance(value, datetime.datetime): - value = value.strftime('%Y-%m-%d %H:%M:%S') + value = value.strftime("%Y-%m-%d %H:%M:%S") res[k] = value - if full == 'shortcut': - if 'cached_label' in res: - res['value'] = res.pop('cached_label') - elif 'name' in res: - res['value'] = res.pop('name') + if full == "shortcut": + if "cached_label" in res: + res["value"] = res.pop("cached_label") + elif "name" in res: + res["value"] = res.pop("name") rows.append(res) - if full == 'shortcut': + if full == "shortcut": data = json.dumps(rows) else: total = ( - items_nb // row_nb + (1 if items_nb % row_nb else 0) - ) if row_nb else items_nb - data = json.dumps({ - "recordsTotal": items_nb, - "recordsFiltered": items_nb, - "rows": rows, - "table-cols": table_cols, - "pinned-search": pinned_search, - "page": page_nb, - "total": total, - }) - return HttpResponse(data, content_type='application/json') + (items_nb // row_nb + (1 if items_nb % row_nb else 0)) + if row_nb + else items_nb + ) + data = json.dumps( + { + "recordsTotal": items_nb, + "recordsFiltered": items_nb, + "rows": rows, + "table-cols": table_cols, + "pinned-search": pinned_search, + "page": page_nb, + "total": total, + } + ) + return HttpResponse(data, content_type="application/json") elif data_type == "csv": - response = HttpResponse(content_type='text/csv', charset=ENCODING) + response = HttpResponse(content_type="text/csv", charset=ENCODING) n = datetime.datetime.now() - filename = '%s_%s.csv' % ( - default_name, n.strftime('%Y%m%d-%H%M%S')) - response['Content-Disposition'] = 'attachment; filename=%s' \ - % filename + filename = "%s_%s.csv" % (default_name, n.strftime("%Y%m%d-%H%M%S")) + response["Content-Disposition"] = "attachment; filename=%s" % filename writer = csv.writer(response, **CSV_OPTIONS) if col_names: col_names = [name for name in col_names] @@ -2076,8 +2238,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None, for field_name in table_cols: if type(field_name) in (list, tuple): field_name = " & ".join(field_name) - if hasattr(model, 'COL_LABELS') and \ - field_name in model.COL_LABELS: + if hasattr(model, "COL_LABELS") and field_name in model.COL_LABELS: field = model.COL_LABELS[field_name] col_names.append(str(field)) continue @@ -2090,7 +2251,8 @@ def get_item(model, func_name, default_name, extra_request_keys=None, "**WARN get_item - csv export**: no col name " "for {}\nadd explicit label to " "COL_LABELS attribute of " - "{}".format(field_name, model)) + "{}".format(field_name, model) + ) continue col_names.append(str(field.verbose_name)) writer.writerow(col_names) @@ -2102,8 +2264,7 @@ def get_item(model, func_name, default_name, extra_request_keys=None, break val = data[1:][idx + delta] if col_name and "|" in col_name[0]: - for delta_idx in range( - len(col_name[0].split('|')) - 1): + for delta_idx in range(len(col_name[0].split("|")) - 1): delta += 1 val += data[1:][idx + delta] row.append(val) @@ -2115,10 +2276,9 @@ def get_item(model, func_name, default_name, extra_request_keys=None, try: vals.append(v.encode(ENCODING).decode(ENCODING)) except UnicodeEncodeError: - vals.append(unidecode(v).encode(ENCODING).decode( - ENCODING)) + vals.append(unidecode(v).encode(ENCODING).decode(ENCODING)) writer.writerow(vals) return response - return HttpResponse('{}', content_type='text/plain') + return HttpResponse("{}", content_type="text/plain") return func |