#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2025 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

# See the file COPYING for details.

import csv
import datetime
import importlib
from jinja2 import TemplateSyntaxError
import json
import logging
import os
import re
import unicodedata
import urllib.parse

try:
    from registration import views as registration_views  # debian
except ModuleNotFoundError:
    from django_registration import views as registration_views  # pip

from django.apps import apps
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.auth.decorators import login_required
from django.contrib.auth import views as auth_view
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.core.cache import cache
from django.db.models import Q
from django.template import loader
from django.forms.models import modelformset_factory
from django.http import (
    HttpResponse,
    Http404,
    HttpResponseRedirect,
    HttpResponseBadRequest,
    JsonResponse,
)
from django.shortcuts import redirect, render, get_object_or_404
from django.urls import reverse, NoReverseMatch
from django.utils import translation
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext, ugettext_lazy as _
from django.views.generic import ListView, TemplateView, View
from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView
from extra_views import ModelFormSetView
from markdown import markdown

from . import models
from archaeological_context_records.models import ContextRecord
from archaeological_files.models import File
from archaeological_finds.models import Find, Treatment, TreatmentFile
from archaeological_operations.models import Operation, ArchaeologicalSite
from archaeological_warehouse.models import Warehouse
from ishtar_common import forms_common as forms
from ishtar_common import wizards
from ishtar_common.data_importer import ImporterError
from ishtar_common.forms import FinalForm, FinalDeleteForm, reverse_lazy
from ishtar_common.models import get_current_profile
from ishtar_common.models_common import QuickAction
from ishtar_common.templatetags.link_to_window import simple_link_to_window
from ishtar_common.utils_migrations import HOMEPAGE_TITLE
from ishtar_common.utils import (
    clean_session_cache,
    CSV_OPTIONS,
    get_current_item_keys,
    get_current_item_keys_dict,
    get_field_labels_from_path,
    get_ishtaruser_gdpr_log,
    get_person_gdpr_log,
    get_random_item_image_link,
    get_news_feed,
    shortify,
    dict_to_tuple,
    put_session_message,
    get_model_by_slug,
    human_date,
)
from ishtar_common.widgets import JQueryAutoComplete
from ishtar_common import tasks

from .views_item import (
    check_permission,
    display_item,
    get_autocomplete_query,
    get_item,
    get_short_html_detail,
    modify_qa_item,
    new_qa_item,
    show_item,
)

# only import the converter when LibreOffice support is enabled in settings
convert_document = None
if settings.USE_LIBREOFFICE:
    from ishtar_common.libreoffice import convert_document

logger = logging.getLogger(__name__)


def status(request):
    """Return a plain "OK" response."""
    return HttpResponse("OK")


def raise_error(request):
    """Deliberately raise a ZeroDivisionError."""
    return 1 / 0


def raise_task_error(request):
    """Queue the error-raising task when background tasks are enabled."""
    if settings.USE_BACKGROUND_TASK:
        tasks.trigger_error.delay()
    return HttpResponse("OK")


def wizard_is_available(wizard, request, model, pk):
    """
    Check that a wizard can be run on the object `pk` of `model`.

    Return the object when available. Return None (after queuing a warning
    session message) when calling the wizard raises IndexError.
    Raise Http404 when the object does not exist or is locked for the user.
    """
    try:
        wizard(request)
    except IndexError:
        # no step available
        put_session_message(
            request.session.session_key,
            _("You don't have sufficient permissions to do this action."),
            "warning",
        )
        return
    # a single query both tests for existence and fetches the object
    obj = model.objects.filter(pk=pk).first()
    if obj is None:
        raise Http404()
    if hasattr(obj, "is_locked") and obj.is_locked(request.user):
        raise Http404()
    return obj


def tiny_redirect(request, url_id):
    """Redirect to the link stored in the TinyUrl matching `url_id`."""
    db_id = models.TinyUrl.decode_id(url_id)
    link_db = get_object_or_404(models.TinyUrl, id=db_id)
    return redirect(link_db.link)


def _count_stats(model):
    """Return the number of `model` rows, cached for CACHE_TIMEOUT."""
    cache_key = f"{settings.PROJECT_SLUG}-stats-{model.__name__}"
    value = cache.get(cache_key)
    # a cached count of 0 is a valid hit: only None means "not cached"
    if value is not None:
        return value
    value = model.objects.count()
    cache.set(cache_key, value, settings.CACHE_TIMEOUT)
    return value


def _get_statistics(profile):
    """
    Build the list of (label, count) displayed on the home page,
    restricted to the modules enabled on `profile`.
    """
    stats = [
        (
            _("Operations"),
            _count_stats(apps.get_model("archaeological_operations", "Operation")),
        )
    ]
    if profile.archaeological_site:
        stats.append(
            (
                _("Archaeological sites"),
                _count_stats(
                    apps.get_model("archaeological_operations", "ArchaeologicalSite")
                ),
            )
        )
    if profile.context_record:
        stats.append(
            (
                _("Context records"),
                _count_stats(
                    apps.get_model("archaeological_context_records", "ContextRecord")
                ),
            )
        )
    if profile.find:
        stats.append(
            (
                _("Finds"),
                _count_stats(apps.get_model("archaeological_finds", "Find")),
            )
        )
    if profile.warehouse:
        stats.append(
            (
                _("Warehouses"),
                _count_stats(apps.get_model("archaeological_warehouse", "Warehouse")),
            )
        )
        stats.append(
            (
                _("Containers"),
                _count_stats(apps.get_model("archaeological_warehouse", "Container")),
            )
        )
    if profile.files:
        stats.append(
            (
                _("Archaeological files"),
                _count_stats(apps.get_model("archaeological_files", "File")),
            )
        )
    # NOTE(review): nesting reconstructed from flattened source - confirm that
    # administrative acts are not meant to depend on `profile.files`
    stats.append(
        (
            _("Administrative acts"),
            _count_stats(
                apps.get_model("archaeological_operations", "AdministrativeAct")
            ),
        )
    )
    return stats


def index(request):
    """
    Main page
    """
    profile = get_current_profile()
    welcome_title = profile.homepage_title or str(HOMEPAGE_TITLE)
    dct = {
        "warnings": [],
        "extra_form_modals": wizards.EXTRA_FORM_MODALS,
        "welcome_title": welcome_title,
    }
    if settings.PROJECT_SLUG == "default":
        dct["warnings"].append(
            _(
                'PROJECT_SLUG is set to "default". Change it in your '
                "local_settings (or ask your admin to do it)."
            )
        )
    if profile.slug == "default":
        dct["warnings"].append(
            _(
                'The slug of your current profile is set to "default". Change it '
                "on the administration page (or ask your admin to do it)."
            )
        )
    authenticated = request.user.is_authenticated
    dct["news_feed"] = get_news_feed()
    display_random_image = authenticated and profile.homepage_random_image_available
    if display_random_image:
        dct["display_random_image"] = True
        dct["random_image"] = get_random_item_image_link(request)
    if profile.homepage:
        dct["homepage"] = markdown(profile.homepage)
        # remove old hardcoded "{random_image}" from custom templates
        if "{random_image}" in dct["homepage"]:
            dct["homepage"] = dct["homepage"].replace("{random_image}", "")
    else:
        # the default home page is rendered with the site language
        old_language = translation.get_language()
        translation.activate(settings.LANGUAGE_CODE)
        try:
            dct["homepage"] = loader.render_to_string("ishtar/homepage_default.html")
        finally:
            # always restore the language, even when rendering fails
            translation.activate(old_language)
    # NOTE(review): nesting reconstructed from flattened source - confirm the
    # version substitution is meant to apply to custom home pages too
    dct["homepage"] = dct["homepage"].replace(
        "ISHTAR_DOCUMENT_VERSION", settings.ISHTAR_DOCUMENT_VERSION
    )
    display_statistics = profile.homepage_statistics_available and (
        authenticated or profile.homepage_statistics_available_offline
    )
    if display_statistics:
        dct["display_statistics"] = True
        dct["statistics"] = _get_statistics(profile)
    try:
        return render(request, "index.html", dct)
    except NoReverseMatch:
        # probably rights exception (rights revoked)
        logout(request)
        return render(request, "index.html", dct)


def display_news_feed(request):
    """
    Display news feed
    """
    news_feed = ""
    if (
        hasattr(request.user, "ishtaruser")
        and request.user.ishtaruser
        and request.user.ishtaruser.display_forum_entries
    ):
        news_feed = get_news_feed()
    return render(request, "ishtar/blocks/news_feed.html", {"news_feed": news_feed})


class LoginView(auth_view.LoginView):
    """Login view exposing the REGISTRATION_OPEN setting to the template."""

    form_class = forms.AuthenticationForm

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        context["registration_open"] = settings.REGISTRATION_OPEN
        return context


class LogoutView(auth_view.LogoutView):
    """Logout view clearing the per-user "password expired" cache key."""

    def get(self, request, *args, **kwargs):
        # clear cache
        keys = []
        if request.user and hasattr(request.user, "pk") and request.user.pk:
            keys.append(f"{settings.PROJECT_SLUG}-password_expired-{request.user.pk}")
        for key in keys:
            cache.delete(key)
        return super().get(request, *args, **kwargs)


def update_password_last_update(user):
    """
    Set the password update date of the IshtarUser matching `user` to today
    and drop its "password expired" cache key. Do nothing when no IshtarUser
    matches.
    """
    try:
        ishtar_user = models.IshtarUser.objects.get(pk=user.pk)
    except models.IshtarUser.DoesNotExist:
        return
    ishtar_user.password_last_update = datetime.date.today()
    ishtar_user.save()
    key = f"{settings.PROJECT_SLUG}-password_expired-{user.pk}"
    cache.delete(key)


class PasswordChangeView(auth_view.PasswordChangeView):
    """Password change recording the update date on success."""

    form_class = forms.PasswordChangeForm
    success_url = reverse_lazy('start')
    template_name = 'registration/form.html'

    def form_valid(self, form):
        returned = super().form_valid(form)
        update_password_last_update(form.user)
        messages.add_message(self.request, messages.INFO, _("Password changed"))
        return returned

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        context["page_title"] = _("Change password")
        return context


class PasswordResetConfirmView(auth_view.PasswordResetConfirmView):
    """Password reset confirmation recording the update date on success."""

    form_class = forms.SetPasswordForm
    success_url = reverse_lazy('login')

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        context["page_title"] = _("Password reset")
        return context

    def form_valid(self, form):
        returned = super().form_valid(form)
        update_password_last_update(form.user)
        messages.add_message(self.request, messages.INFO, _("Password changed"))
        return returned


class RegistrationView(registration_views.RegistrationView):
    """Registration form page; the registration itself is not implemented."""

    template_name = 'registration/form.html'

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        context["page_title"] = _("Register")
        return context

    def register(self, form):
        # TODO
        raise NotImplementedError


class PasswordResetView(auth_view.PasswordResetView):
    """Password reset request notifying the user that an email was sent."""

    template_name = 'registration/form.html'
    success_url = reverse_lazy('start')

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        context["page_title"] = _("Reset password")
        return context

    def form_valid(self, form):
        returned = super().form_valid(form)
        messages.add_message(
            self.request,
            messages.INFO,
            _("Email with password reset instructions has been sent.")
        )
        return returned


# Person wizards

person_search_wizard = wizards.PersonSearch.as_view(
    [("general-person_search", forms.PersonFormSelection)],
    label=_("Person search"),
    url_name="person_search",
)

person_creation_wizard = wizards.PersonWizard.as_view(
    [
        ("identity-person_creation", forms.SimplePersonForm),
        ("person_type-person_creation", forms.PersonTypeForm),
        ("final-person_creation", FinalForm),
    ],
    label=_("New person"),
    url_name="person_creation",
)

person_modification_wizard = wizards.PersonModifWizard.as_view(
    [
        ("selec-person_modification", forms.PersonFormSelection),
        ("identity-person_modification", forms.SimplePersonForm),
        ("person_type-person_creation", forms.PersonTypeForm),
        ("final-person_modification", FinalForm),
    ],
    label=_("Person modification"),
    url_name="person_modification",
)


def person_modify(request, pk):
    """Preselect person `pk` and redirect to the modification wizard."""
    if not wizard_is_available(person_modification_wizard, request, models.Person, pk):
        return HttpResponseRedirect("/")
    wizards.PersonModifWizard.session_set_value(
        request, "selec-person_modification", "pk", pk, reset=True
    )
    return redirect(
        reverse("person_modification", kwargs={"step": "identity-person_modification"})
    )


person_deletion_wizard = wizards.PersonDeletionWizard.as_view(
    [
        ("selec-person_deletion", forms.PersonFormMultiSelection),
        ("final-person_deletion", FinalDeleteForm),
    ],
    label=_("Person deletion"),
    url_name="person_deletion",
)


def person_delete(request, pk):
    """Preselect person `pk` and redirect to the deletion wizard."""
    if not wizard_is_available(person_deletion_wizard, request, models.Person, pk):
        return HttpResponseRedirect("/")
    wizards.PersonDeletionWizard.session_set_value(
        request, "selec-person_deletion", "pks", pk, reset=True
    )
    return redirect(
        reverse("person_deletion", kwargs={"step": "final-person_deletion"})
    )


# Organization wizards

organization_search_wizard = wizards.OrganizationSearch.as_view(
    [("general-organization_search", forms.OrganizationFormSelection)],
    label=_("Organization search"),
    url_name="organization_search",
)

organization_creation_wizard = wizards.OrganizationWizard.as_view(
    [
        ("identity-organization_creation", forms.OrganizationForm),
        ("final-organization_creation", FinalForm),
    ],
    label=_("New organization"),
    url_name="organization_creation",
)

organization_modification_wizard = wizards.OrganizationModifWizard.as_view(
    [
        ("selec-organization_modification", forms.OrganizationFormSelection),
        ("identity-organization_modification", forms.OrganizationForm),
        ("final-organization_modification", FinalForm),
    ],
    label=_("Organization modification"),
    url_name="organization_modification",
)


def organization_modify(request, pk):
    """Preselect organization `pk` and redirect to the modification wizard."""
    if not wizard_is_available(
        organization_modification_wizard, request, models.Organization, pk
    ):
        return HttpResponseRedirect("/")
    wizards.OrganizationModifWizard.session_set_value(
        request, "selec-organization_modification", "pk", pk, reset=True
    )
    return redirect(
        reverse(
            "organization_modification",
            kwargs={"step": "identity-organization_modification"},
        )
    )


organization_deletion_wizard = wizards.OrganizationDeletionWizard.as_view(
    [
        ("selec-organization_deletion", forms.OrganizationFormMultiSelection),
        ("final-organization_deletion", FinalDeleteForm),
    ],
    label=_("Organization deletion"),
    url_name="organization_deletion",
)


def organization_delete(request, pk):
    """Preselect organization `pk` and redirect to the deletion wizard."""
    if not wizard_is_available(
        organization_deletion_wizard, request, models.Organization, pk
    ):
        return HttpResponseRedirect("/")
    wizard_url = "organization_deletion"
    wizards.OrganizationDeletionWizard.session_set_value(
        request, "selec-" + wizard_url, "pks", pk, reset=True
    )
    return redirect(reverse(wizard_url, kwargs={"step": "final-" + wizard_url}))


# Account wizards

account_wizard_steps = [
    ("selec-account_management", forms.PersonUserFormSelection),
    ("account-account_management", forms.AccountForm),
    ("profile-account_management", forms.ProfileFormset),
    ("final-account_management", forms.FinalAccountForm),
]

account_management_wizard = wizards.AccountWizard.as_view(
    account_wizard_steps,
    label=_("Account management"),
    url_name="account_management",
)

account_deletion_wizard = wizards.IshtarUserDeletionWizard.as_view(
    [
        ("selec-account_deletion", forms.AccountFormMultiSelection),
        ("final-account_deletion", FinalDeleteForm),
    ],
    label=_("Account deletion"),
    url_name="account_deletion",
)


def account_manage(request, pk):
    """Preselect person `pk` and redirect to the account management wizard."""
    if not wizard_is_available(account_management_wizard, request, models.Person, pk):
        return HttpResponseRedirect("/")
    wizards.AccountWizard.session_set_value(
        request, "selec-account_management", "pk", pk, reset=True
    )
    return redirect(
        reverse("account_management", kwargs={"step": "account-account_management"})
    )


def get_autocomplete_generic(model, extra=None):
    """
    Build an autocomplete view for `model`, searching on its `label`
    (and on the labels of up to three parent levels when the model has a
    `parent` attribute).

    `extra` is a dict of filters always applied; it defaults to
    {"available": True} when empty or not provided.
    The view returns at most 20 items: exact matches first, then partial ones.
    """
    if not extra:
        extra = {"available": True}

    def get_label(obj):
        return obj.full_label() if hasattr(obj, "full_label") else str(obj)

    def func(request):
        term = request.GET.get("term") or ""
        query = Q(**extra)
        objects = []
        has_parent = hasattr(model, "parent")
        if term:
            objects = list(model.objects.filter(query & Q(label__iexact=term)))
        for word in term.split(" "):
            if not word:
                continue
            sub_q = Q(label__icontains=word)
            if has_parent:
                sub_q |= Q(parent__label__icontains=word)
                sub_q |= Q(parent__parent__label__icontains=word)
                sub_q |= Q(parent__parent__parent__label__icontains=word)
            query = query & sub_q
        limit = 20 - len(objects)
        if limit > 0:
            sort = ["label"]
            if has_parent:
                sort = ["parent__label", "label"]
            partial = model.objects.filter(query)
            if objects:
                # exact matches also match "icontains": do not list them twice
                partial = partial.exclude(pk__in=[obj.pk for obj in objects])
            objects += list(partial.order_by(*sort).distinct()[:limit])
        data = json.dumps([{"id": obj.pk, "value": get_label(obj)} for obj in objects])
        return HttpResponse(data, content_type="text/plain")

    return func


def hide_shortcut_menu(request):
    """Store "off" for SHORTCUT_SHOW in the session."""
    request.session["SHORTCUT_SHOW"] = "off"
    return HttpResponse("OK", content_type="text/plain")


def show_shortcut_menu(request):
    """Store "on" for SHORTCUT_SHOW in the session."""
    request.session["SHORTCUT_SHOW"] = "on"
    return HttpResponse("OK", content_type="text/plain")


def activate_all_search(request):
    """Store "all" for SHORTCUT_SEARCH in the session."""
    request.session["SHORTCUT_SEARCH"] = "all"
    return HttpResponse("OK", content_type="text/plain")


def activate_own_search(request):
    """Store "own" for SHORTCUT_SEARCH in the session."""
    request.session["SHORTCUT_SEARCH"] = "own"
    return HttpResponse("OK", content_type="text/plain")


def activate_advanced_shortcut_menu(request):
    """Enable the advanced shortcut menu; "KO" when no ishtaruser."""
    if not hasattr(request.user, "ishtaruser"):
        return HttpResponse("KO", content_type="text/plain")
    request.user.ishtaruser.advanced_shortcut_menu = True
    request.user.ishtaruser.save()
    return HttpResponse("OK", content_type="text/plain")


def activate_simple_shortcut_menu(request):
    """Disable the advanced shortcut menu; "KO" when no ishtaruser."""
    if not hasattr(request.user, "ishtaruser"):
        return HttpResponse("KO", content_type="text/plain")
    request.user.ishtaruser.advanced_shortcut_menu = False
    request.user.ishtaruser.save()
    return HttpResponse("OK", content_type="text/plain")


def settings_js(request):
    """Render the "ishtar/settings.js" template as javascript."""
    return render(request, "ishtar/settings.js", content_type="text/javascript")


def shortcut_menu(request):
    """
    Render the shortcut menu (advanced or simple, depending on the user
    preference) for the item types enabled on the current profile.
    """
    profile = get_current_profile()
    CURRENT_ITEMS = []
    if profile.files:
        CURRENT_ITEMS.append((_("Archaeological file"), File))
    CURRENT_ITEMS.append((_("Operation"), Operation))
    if profile.archaeological_site:
        CURRENT_ITEMS.append((profile.get_site_label(), ArchaeologicalSite))
    if profile.context_record:
        CURRENT_ITEMS.append((_("Context record"), ContextRecord))
    if profile.warehouse:
        CURRENT_ITEMS.append((_("Warehouse"), Warehouse))
    if profile.find:
        CURRENT_ITEMS.append((_("Find"), Find))
    # NOTE(review): nesting reconstructed from flattened source - confirm both
    # treatment entries depend on `profile.warehouse` only
    if profile.warehouse:
        CURRENT_ITEMS.append((_("Treatment request"), TreatmentFile))
        CURRENT_ITEMS.append((_("Treatment"), Treatment))
    if (
        hasattr(request.user, "ishtaruser")
        and request.user.ishtaruser.advanced_shortcut_menu
    ):
        dct = {
            "current_menu": [],
            "menu": [],
            "SHORTCUT_SEARCH": request.session["SHORTCUT_SEARCH"]
            if "SHORTCUT_SEARCH" in request.session
            else "own",
            "SHORTCUT_SHOW": request.session["SHORTCUT_SHOW"]
            if "SHORTCUT_SHOW" in request.session
            else "on",
        }
        current_selected_labels = []
        for lbl, model in CURRENT_ITEMS:
            model_name = model.SLUG
            current = model_name in request.session and request.session[model_name]
            if current:
                try:
                    item = model.objects.get(pk=int(current))
                    item_label = shortify(str(item), 60)
                    current_selected_labels.append(item_label)
                except (model.DoesNotExist, ValueError):
                    # ValueError: the session value is not an integer
                    # (the simple menu can store "basket-..." keys)
                    pass
            dct["menu"].append(
                (
                    lbl,
                    model_name,
                    current or 0,
                    JQueryAutoComplete(
                        reverse("get-" + model.SLUG + "-shortcut"), model
                    ).render(
                        model.SLUG + "-shortcut",
                        value=current,
                        attrs={"id": "current_" + model.SLUG},
                    ),
                )
            )
        dct["current_selected_labels"] = current_selected_labels
        return render(request, "ishtar/blocks/advanced_shortcut_menu.html", dct)
    dct = {
        "current_menu": [],
        "SHORTCUT_SHOW": request.session["SHORTCUT_SHOW"]
        if "SHORTCUT_SHOW" in request.session
        else "off",
    }
    current_selected_labels = []
    current_selected_item = {}
    labels = {}
    for lbl, model in CURRENT_ITEMS:
        new_selected_item = None
        model_name = model.SLUG
        cls = ""
        current = model_name in request.session and request.session[model_name]
        items = []
        current_items = set()
        labels[model_name] = {}
        lbl_key = "cached_label"
        if model.SLUG == "warehouse":
            lbl_key = "name"
        values = ["id", lbl_key]
        owns = (
            model.get_owns(
                request.user,
                menu_filtr=current_selected_item,
                limit=100,
                values=values,
                get_short_menu_class=True,
            )
            or []
        )
        for item, shortmenu_class in owns:
            pk = str(item["id"])
            if shortmenu_class == "basket":
                pk = "basket-" + pk
            # prevent duplicates
            if pk in current_items:
                continue
            current_items.add(pk)
            selected = pk == current
            item_label = shortify(item[lbl_key], 60)
            if selected:
                cls = shortmenu_class
                new_selected_item = pk
            labels[model_name][str(pk)] = item_label
            items.append((pk, item_label, selected, shortmenu_class))
        # selected is not in owns - add it to the list
        if not new_selected_item and current:
            try:
                item = model.objects.get(pk=int(current))
                new_selected_item = item.pk
                item_label = shortify(str(item), 60)
                labels[model_name][str(item.pk)] = item_label
                items.append(
                    (item.pk, item_label, True, item.get_short_menu_class(item.pk))
                )
            except (model.DoesNotExist, ValueError):
                pass
        if items:
            dct["current_menu"].append((lbl, model_name, cls, items))
        if new_selected_item:
            current_selected_item[model_name] = new_selected_item
            if str(new_selected_item) in labels[model_name]:
                current_selected_labels.append(
                    labels[model_name][str(new_selected_item)]
                )
    dct["current_selected_labels"] = current_selected_labels
    return render(request, "ishtar/blocks/shortcut_menu.html", dct)


def get_current_items(request):
    """
    Return a dict mapping each current item key to the object pinned in the
    session, or None when nothing valid is pinned for this key.
    """
    currents = {}
    for key, model in get_current_item_keys():
        currents[key] = None
        if key in request.session and request.session[key]:
            try:
                currents[key] = model.objects.get(pk=int(request.session[key]))
            except (ValueError, model.DoesNotExist):
                continue
    return currents


def unpin(request, item_type, cascade=False):
    """
    Unpin `item_type` from the session.

    Session keys are emptied in a fixed order, from "treatment" to "file":
    every key listed before `item_type` is emptied too. With `cascade`, all
    the keys are emptied. Administrative act keys are emptied on their own.
    Return "nok" for an unknown `item_type`, "ok" otherwise.
    """
    current_item_keys_dict = get_current_item_keys_dict()
    if item_type not in current_item_keys_dict:
        logger.warning("unpin unknow type: {}".format(item_type))
        return HttpResponse("nok")
    if "administrativeact" in item_type:
        request.session[item_type] = ""
        return HttpResponse("ok")
    unpin_order = (
        "treatment",
        "treatmentfile",
        "find",
        "warehouse",
        "contextrecord",
        "site",
        "operation",
        "file",
    )
    for key in unpin_order:
        request.session[key] = ""
        if item_type == key and not cascade:
            break
    # always answer: this function is also reachable with cascade=True
    return HttpResponse("ok")


def update_current_item(request, item_type=None, pk=None):
    """
    Pin an item in the session and keep the other pinned items consistent.

    The item comes from `item_type`/`pk` when both are given, otherwise from
    the POST fields "item" and "value". Descending items that do not match
    the new item are unpinned, ascending items are pinned accordingly.
    """
    # NOTE(review): if/else pairing reconstructed from flattened source -
    # confirm the `else` belongs to the outer test
    if not item_type or not pk:
        if not request.is_ajax() and not request.method == "POST":
            raise Http404
        item_type = request.POST["item"]
        if "value" in request.POST and "item" in request.POST:
            request.session[item_type] = request.POST["value"]
    else:
        request.session[item_type] = str(pk)
    request.session["SHORTCUT_SEARCH"] = "all"

    currents = get_current_items(request)
    # re-init when descending item are not relevant
    if (
        item_type == "file"
        and currents["file"]
        and currents["operation"]
        and currents["operation"].associated_file != currents["file"]
    ):
        request.session["operation"] = ""
        currents["operation"] = None

    if (
        item_type in ("operation", "file")
        and currents["contextrecord"]
        and (
            not request.session.get("operation", None)
            or currents["contextrecord"].operation != currents["operation"]
        )
    ):
        request.session["contextrecord"] = ""
        currents["contextrecord"] = None

    from archaeological_finds.models import Find

    if (
        item_type in ("contextrecord", "operation", "file")
        and currents["find"]
        and (
            not request.session.get("contextrecord", None)
            or not Find.objects.filter(
                downstream_treatment__isnull=True,
                base_finds__context_record__pk=request.session["contextrecord"],
                pk=currents["find"].pk,
            ).count()
        )
    ):
        request.session["find"] = ""
        currents["find"] = None

    # re-init ascending item with relevant values
    if item_type == "find" and currents["find"]:
        from archaeological_context_records.models import ContextRecord

        q = ContextRecord.objects.filter(base_finds__find__pk=currents["find"].pk)
        if q.count():
            currents["contextrecord"] = q.all()[0]
            request.session["contextrecord"] = str(currents["contextrecord"].pk)
    if item_type in ("find", "contextrecord") and currents["contextrecord"]:
        currents["operation"] = currents["contextrecord"].operation
        request.session["operation"] = str(currents["operation"].pk)
    if item_type in ("find", "contextrecord", "operation") and currents["operation"]:
        currents["file"] = currents["operation"].associated_file
        request.session["file"] = str(currents["file"].pk) if currents["file"] else None
    return HttpResponse("ok")


def pin_search(request, item_type):
    """
    Store the posted search "value" in the session for `item_type` and unpin
    the matching items (all of them when the value is empty).
    Raise Http404 when the request is not an AJAX POST with a "value".
    """
    if item_type.startswith("administrativeact"):
        item_type = "administrativeact"
    key = "pin-search-" + item_type
    if not item_type or not (
        request.is_ajax() and request.method == "POST" and "value" in request.POST
    ):
        raise Http404
    request.session[key] = request.POST["value"]
    if not request.POST["value"]:
        # empty all
        unpin(request, item_type, cascade=True)
    else:
        unpin(request, item_type)
    return HttpResponse("ok")


def get_by_importer(
    request, slug, data_type="json", full=False, force_own=False, **dct
):
    """
    Return items of the importer type `slug` using the importer columns.
    An unknown slug gives an empty response ("{}" for json).
    """
    importer = models.ImporterType.objects.filter(slug=slug).first()
    if importer is None:
        res = ""
        if data_type == "json":
            res = "{}"
        return HttpResponse(res, content_type="text/plain")
    importer_class = importer.get_importer_class()
    cols, col_names = importer.get_columns(importer_class=importer_class)
    if data_type == "csv" or dct.get("type", "") == "csv":
        obj_name = importer.name
    else:
        obj_name = importer_class.OBJECT_CLS.__name__.lower()
    return get_item(
        importer_class.OBJECT_CLS, "get_" + obj_name, obj_name, own_table_cols=cols
    )(request, data_type, full, force_own, col_names=col_names, **dct)


def autocomplete_person_permissive(
    request, person_types=None, attached_to=None, is_ishtar_user=None
):
    """Person autocomplete also searching on the raw name."""
    return autocomplete_person(
        request,
        person_types=person_types,
        attached_to=attached_to,
        is_ishtar_user=is_ishtar_user,
        permissive=True,
    )


def autocomplete_user(request):
    """Autocomplete on users having an ishtaruser, by person or user name."""
    query = get_autocomplete_query(request, "ishtar_common", "person")
    # same guard as the other autocomplete views: None means "no result"
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    q = request.GET.get("term", "")
    limit = request.GET.get("limit", 20)
    try:
        limit = int(limit)
    except ValueError:
        return HttpResponseBadRequest()
    for word in q.split(" "):
        qu = (
            Q(ishtaruser__person__name__icontains=word)
            | Q(ishtaruser__person__surname__icontains=word)
            | Q(first_name__icontains=word)
            | Q(last_name__icontains=word)
        )
        query = query & qu
    users = models.User.objects.filter(query).distinct()[:limit]
    values = []
    for user in users:
        try:
            if user and user.ishtaruser:
                values.append({"id": user.pk, "value": str(user.ishtaruser)})
        except models.User.ishtaruser.RelatedObjectDoesNotExist:
            pass
    data = json.dumps(values)
    return HttpResponse(data, content_type="text/plain")


def autocomplete_ishtaruser(request):
    """Autocomplete on Ishtar users by person name, surname or raw name."""
    query = get_autocomplete_query(request, "ishtar_common", "person")
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    q = request.GET.get("term", "")
    limit = request.GET.get("limit", 20)
    try:
        limit = int(limit)
    except ValueError:
        return HttpResponseBadRequest()
    for word in q.split(" "):
        qu = (
            Q(person__name__unaccent__icontains=word)
            | Q(person__surname__unaccent__icontains=word)
            | Q(person__raw_name__unaccent__icontains=word)
        )
        query = query & qu
    users = models.IshtarUser.objects.filter(query).distinct()[:limit]
    data = json.dumps([{"id": user.pk, "value": str(user)} for user in users])
    return HttpResponse(data, content_type="text/plain")


def autocomplete_person(
    request, person_types=None, attached_to=None, is_ishtar_user=None, permissive=False
):
    """
    Autocomplete on persons by name, surname, email or organization name.

    `person_types` and `attached_to` are "_" separated lists of pks used as
    filters; `is_ishtar_user` restricts to persons with an account;
    `permissive` also searches on the raw name.
    """
    query = get_autocomplete_query(request, "ishtar_common", "person")
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    # a missing "term" is handled as an empty search, not as an error
    q = request.GET.get("term", "")
    limit = request.GET.get("limit", 20)
    try:
        limit = int(limit)
    except ValueError:
        return HttpResponseBadRequest()
    for word in q.split(" "):
        qu = (
            Q(name__unaccent__icontains=word)
            | Q(surname__unaccent__icontains=word)
            | Q(email__unaccent__icontains=word)
            | Q(attached_to__name__unaccent__icontains=word)
        )
        if permissive:
            qu = qu | Q(raw_name__unaccent__icontains=word)
        query = query & qu
    if attached_to:
        query = query & Q(attached_to__pk__in=attached_to.split("_"))

    if person_types and str(person_types) != "0":
        try:
            typs = [int(tp) for tp in person_types.split("_") if tp]
            typ = models.PersonType.objects.filter(pk__in=typs).all()
            query = query & Q(person_types__in=typ)
        except (ValueError, ObjectDoesNotExist):
            pass
    if is_ishtar_user:
        query = query & Q(ishtaruser__isnull=False)
    persons = models.Person.objects.filter(query).distinct()[:limit]
    data = json.dumps(
        [{"id": person.pk, "value": str(person)} for person in persons if person]
    )
    return HttpResponse(data, content_type="text/plain")


def autocomplete_import(request):
    """Autocomplete on imports by name or group name."""
    query = get_autocomplete_query(request, "ishtar_common", "import")
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    # a missing "term" is handled as an empty search, not as an error
    q = request.GET.get("term", "")
    limit = request.GET.get("limit", 20)
    try:
        limit = int(limit)
    except ValueError:
        return HttpResponseBadRequest()
    for word in q.split(" "):
        query = query & (
            Q(name__unaccent__icontains=word)
            | Q(group__name__unaccent__icontains=word)
        )
    items = models.Import.objects.filter(query).distinct()[:limit]
    data = [{"id": item.pk, "value": item.name} for item in items if item]
    return HttpResponse(json.dumps(data), content_type="text/plain")


def autocomplete_area(request):
    """Autocomplete on areas by label (ASCII-folded term, 20 results)."""
    if not request.GET.get("term"):
        return HttpResponse("[]", content_type="text/plain")
    q = request.GET.get("term")
    q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode()
    query = Q()
    for word in q.split(" "):
        extra = Q(label__icontains=word)
        query = query & extra
    limit = 20
    areas = models.Area.objects.filter(query).distinct()[:limit]
    data = json.dumps(
        [{"id": area.pk, "value": str(area)} for area in areas]
    )
    return HttpResponse(data, content_type="text/plain")


def autocomplete_department(request):
    """Autocomplete on departments by label or number (20 results)."""
    if not request.GET.get("term"):
        return HttpResponse("[]", content_type="text/plain")
    q = request.GET.get("term")
    q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode()
    query = Q()
    for word in q.split(" "):
        extra = Q(label__icontains=word) | Q(number__istartswith=word)
        query = query & extra
    limit = 20
    departments = models.Department.objects.filter(query).distinct()[:limit]
    data = json.dumps(
        [{"id": department.pk, "value": str(department)} for department in departments]
    )
    return HttpResponse(data, content_type="text/plain")


def autocomplete_town(request):
    """
    Autocomplete on towns by name (and INSEE number when COUNTRY is "fr").
    An empty term gives an empty response.
    """
    if not request.GET.get("term"):
        return HttpResponse(content_type="text/plain")
    q = request.GET.get("term")
    q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode()
    query = Q()
    for word in q.split(" "):
        extra = Q(name__unaccent__icontains=word)
        if settings.COUNTRY == "fr":
            extra |= Q(numero_insee__istartswith=word)
        query &= extra
    limit = 20
    towns = models.Town.objects.filter(query).distinct()[:limit]
    data = json.dumps([{"id": town.pk, "value": str(town)} for town in towns])
    return HttpResponse(data, content_type="text/plain")


def autocomplete_advanced_town(request, department_id=None, state_id=None):
    """
    Autocomplete on towns, optionally restricted to a department number
    and/or a state number. An empty term gives an empty response.
    """
    if not request.GET.get("term"):
        return HttpResponse(content_type="text/plain")
    q = request.GET.get("term")
    q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode()
    query = Q()
    for word in q.split(" "):
        extra = Q(name__icontains=word)
        if settings.COUNTRY == "fr":
            extra = extra | Q(numero_insee__istartswith=word)
            # NOTE(review): nesting reconstructed from flattened source -
            # confirm the department label search is specific to "fr"
            if not department_id:
                extra = extra | Q(departement__label__istartswith=word)
        query = query & extra
    if department_id:
        query = query & Q(departement__number__iexact=department_id)
    if state_id:
        query = query & Q(departement__state__number__iexact=state_id)
    limit = 20
    towns = models.Town.objects.filter(query).distinct()[:limit]
    result = []
    for town in towns:
        val = town.name
        if hasattr(town, "numero_insee"):
            val += " (%s)" % town.numero_insee
        result.append({"id": town.pk, "value": val})
    data = json.dumps(result)
    return HttpResponse(data, content_type="text/plain")


def autocomplete_document(request):
    """
    Autocomplete on documents with a title, searching on title, references,
    ISBN and authors (ASCII-folded term, 20 results).
    """
    query = get_autocomplete_query(request, "ishtar_common", "document")
    if query is None:
        return HttpResponse(content_type="text/plain")
    # a missing "term" is handled as an empty search, not as an error
    q = request.GET.get("term", "")
    q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode()
    fields = [
        "title__icontains",
        "reference__icontains",
        "internal_reference__icontains",
        "isbn__icontains",
        "authors__person__cached_label__icontains",
        "authors_raw__icontains",
    ]
    for word in q.split(" "):
        qu = Q(**{fields[0]: word})
        for field in fields[1:]:
            qu |= Q(**{field: word})
        query = query & qu
    limit = 20
    items = models.Document.objects.filter(query).exclude(title="").distinct()[:limit]
    data = json.dumps([{"id": item.pk, "value": str(item)} for item in items])
    return HttpResponse(data, content_type="text/plain")


def department_by_state(request, state_id=""):
    """Return as JSON the departments whose state number is `state_id`."""
    result = []
    if state_id:
        departments = models.Department.objects.filter(state__number=state_id)
        result = [
            {
                "id": department.pk,
                "number": department.number,
                "value": str(department),
            }
            for department in departments
        ]
    # always serialize: an empty list must be sent as "[]", not as no content
    return HttpResponse(json.dumps(result), content_type="text/plain")


def autocomplete_organization(request, orga_type=None):
    """
    Autocomplete on organizations by cached label (15 results).
    `orga_type` is an optional "_" separated list of organization type pks.
    """
    query = get_autocomplete_query(request, "ishtar_common", "organization")
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    # a missing "term" is handled as an empty search, not as an error
    q = request.GET.get("term", "")
    for word in q.split(" "):
        extra = Q(cached_label__unaccent__icontains=word)
        query = query & extra
    if orga_type:
        try:
            typs = [int(tp) for tp in orga_type.split("_") if tp]
            typ = models.OrganizationType.objects.filter(pk__in=typs).all()
            query = query & Q(organization_type__in=typ)
        except (ValueError, ObjectDoesNotExist):
            pass
    limit = 15
    organizations = models.Organization.objects.filter(query).distinct()[:limit]
    data = json.dumps([{"id": org.pk, "value": str(org)} for org in organizations])
    return HttpResponse(data, content_type="text/plain")


def autocomplete_author(request):
    """Autocomplete on authors by person or author type (15 results)."""
    query = get_autocomplete_query(request, "ishtar_common", "author")
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    # a missing "term" is handled as an empty search, not as an error
    q = request.GET.get("term", "")
    for word in q.split(" "):
        extra = (
            Q(person__name__icontains=word)
            | Q(person__surname__icontains=word)
            | Q(person__email__icontains=word)
            | Q(author_type__label__icontains=word)
        )
        query = query & extra
    limit = 15
    authors = models.Author.objects.filter(query).distinct()[:limit]
    data = json.dumps([{"id": author.pk, "value": str(author)} for author in authors])
    return HttpResponse(data, content_type="text/plain")


def autocomplete_biographical_note(request):
    """Autocomplete on biographical notes by names or denomination."""
    query = get_autocomplete_query(request, "ishtar_common", "person")
    if query is None:
        return HttpResponse("[]", content_type="text/plain")
    q = request.GET.get("term", "")
    limit = request.GET.get("limit", 20)
    try:
        limit = int(limit)
    except ValueError:
        return HttpResponseBadRequest()
    for word in q.split(" "):
        qu = (
            Q(last_name__unaccent__icontains=word)
            | Q(first_name__unaccent__icontains=word)
            | Q(denomination__unaccent__icontains=word)
        )
        query = query & qu
    notes = models.BiographicalNote.objects.filter(query).distinct()[:limit]
    data = json.dumps([{"id": note.pk, "value": str(note)} for note in notes])
    return HttpResponse(data, content_type="text/plain")


new_person = new_qa_item(
    models.Person,
    forms.PersonForm,
    page_name=_("New person"),
    callback=get_person_gdpr_log,
)
modify_person = modify_qa_item(
    models.Person, forms.PersonForm, callback=get_person_gdpr_log
)
detail_person = get_short_html_detail(models.Person)
new_person_noorga = new_qa_item(
    models.Person,
    forms.NoOrgaPersonForm,
    page_name=_("New person"),
    callback=get_person_gdpr_log
)
new_organization = new_qa_item(
    models.Organization, forms.OrganizationForm, page_name=_("New organization")
)
show_organization = show_item(models.Organization, "organization")
get_organization = get_item(models.Organization, "get_organization", "organization")
modify_organization = modify_qa_item(models.Organization, forms.OrganizationForm)
detail_organization = get_short_html_detail(models.Organization)
new_author = new_qa_item(models.Author, forms.AuthorForm, page_name=_("New author"))
show_person = show_item(models.Person, "person", callback=get_person_gdpr_log)
get_person = get_item(models.Person, "get_person", "person", callback=get_person_gdpr_log) show_ishtaruser = show_item(models.IshtarUser, "ishtaruser", callback=get_ishtaruser_gdpr_log) show_biographical_note = show_item(models.BiographicalNote, "biographicalnote") new_biographical_note = new_qa_item( models.BiographicalNote, forms.BiographicalNoteForm, page_name=_("New biographical note") ) get_person_for_account = get_item( models.Person, "get_person", "person", own_table_cols=models.Person.TABLE_COLS_ACCOUNT, ) get_ishtaruser = get_item(models.IshtarUser, "get_ishtaruser", "ishtaruser") show_town = show_item(models.Town, "town") show_area = show_item(models.Area, "area") show_import = show_item(models.Import, "import") show_import_group = show_item(models.ImportGroup, "importgroup") ACTION_MODEL_DICT = { 'import': models.Import, 'account': models.IshtarUser, 'document': models.Document, 'person': models.Person, 'orga': models.Organization, 'organization': models.Organization, 'operation': apps.get_model("archaeological_operations", "Operation"), 'administrativact': apps.get_model( "archaeological_operations", "AdministrativeAct"), 'file': apps.get_model("archaeological_files", "File"), 'site': apps.get_model("archaeological_operations", "ArchaeologicalSite"), 'record': apps.get_model("archaeological_context_records", "ContextRecord"), 'find': apps.get_model("archaeological_finds", "Find"), 'treatment': apps.get_model("archaeological_finds", "Treatment"), 'treatmentfle': apps.get_model("archaeological_finds", "TreatmentFile"), 'exhibition': apps.get_model("archaeological_finds", "Exhibition"), 'container': apps.get_model("archaeological_warehouse", "Container"), 'warehouse': apps.get_model("archaeological_warehouse", "Warehouse"), } def action(request, action_slug, obj_id=None, *args, **kwargs): """ Action management """ if not check_permission(request, action_slug): not_permitted_msg = ugettext("Operation not permitted.") if obj_id: model_name = 
action.split('_')[0].split("-")[0].split("/")[0] if model_name not in ACTION_MODEL_DICT: print(f"ishtar_common/views - action: {model_name} not in ACTION_MODEL_DICT") return HttpResponse(not_permitted_msg) try: obj = ACTION_MODEL_DICT[model_name].objects.get(pk=obj_id) except ACTION_MODEL_DICT[model_name].DoesNotExist: return HttpResponse(not_permitted_msg) if not check_permission(request, action_slug, obj): return HttpResponse(not_permitted_msg) else: return HttpResponse(not_permitted_msg) request.session["CURRENT_ACTION"] = action_slug dct = {} globals_dct = globals() if action_slug in globals_dct: return globals_dct[action_slug](request, dct, obj_id, *args, **kwargs) return render(request, "index.html", dct) def reset_wizards(request): # dynamically execute each reset_wizards of each ishtar app for app in settings.INSTALLED_APPS: if app == "ishtar_common": # no need for infinite recursion continue try: module = __import__(app) except ImportError: continue if hasattr(module, "views") and hasattr(module.views, "reset_wizards"): module.views.reset_wizards(request) return redirect(reverse("start")) ITEM_PER_PAGE = 20 def merge_action(model, form, key, name_key="name", callback=None): def merge(request, page=1): current_url = key + "_merge" if not page: page = 1 page = int(page) FormSet = modelformset_factory( model.merge_candidate.through, form=form, formset=forms.MergeFormSet, extra=0, ) q = model.merge_candidate.through.objects count = q.count() max_page = count // ITEM_PER_PAGE if count % ITEM_PER_PAGE != 0: max_page += 1 context = { "current_url": current_url, "current_page": page, "max_page": max_page, } if page < max_page: context["next_page"] = page + 1 if page > 1: context["previous_page"] = page - 1 item_nb = (page - 1) * ITEM_PER_PAGE item_nb_1 = item_nb + ITEM_PER_PAGE from_key = "from_" + key to_key = "to_" + key queryset = q.all().order_by(from_key + "__" + name_key)[item_nb:item_nb_1] FormSet.from_key = from_key FormSet.to_key = to_key if 
request.method == "POST": context["formset"] = FormSet(request.POST, queryset=queryset) if context["formset"].is_valid(): context["formset"].merge(callback=callback, request=request) return redirect(reverse(current_url, kwargs={"page": page})) else: context["formset"] = FormSet(queryset=queryset) return render(request, "ishtar/merge_" + key + ".html", context) return merge def regenerate_external_id(request): if not request.user.ishtaruser.is_ishtaradmin: raise Http404() model = None for key in request.GET: model = get_model_by_slug(key) if model: break if not model: raise Http404() try: item = model.objects.get(pk=request.GET[model.SLUG]) except model.DoesNotExist: raise Http404() item.regenerate_external_id() return HttpResponseRedirect(reverse("success")) def regenerate_permissions(request, user_id): if not request.user.ishtaruser.is_ishtaradmin: raise Http404() try: ishtaruser = models.IshtarUser.objects.get(pk=user_id) except models.IshtarUser.DoesNotExist: raise Http404() ishtaruser.generate_permission() return HttpResponseRedirect(reverse("success")) person_merge = merge_action(models.Person, forms.MergePersonForm, "person", callback=get_person_gdpr_log) organization_merge = merge_action( models.Organization, forms.MergeOrganizationForm, "organization" ) class IshtarMixin: page_name = "" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["page_name"] = self.page_name return context class JSONResponseMixin: """ Render a JSON response. 
""" def render_to_response(self, context, **response_kwargs): return JsonResponse(self.get_data(context), **response_kwargs) def get_data(self, context): return context class LoginRequiredMixin(object): @method_decorator(login_required) def dispatch(self, request, *args, **kwargs): return super(LoginRequiredMixin, self).dispatch(request, *args, **kwargs) class AdminLoginRequiredMixin(LoginRequiredMixin): def dispatch(self, request, *args, **kwargs): if not request.user.is_staff: return redirect(reverse("start")) return super(AdminLoginRequiredMixin, self).dispatch(request, *args, **kwargs) class ChangelogView(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/changelog.html" page_name = _("Changelog") current_url = "changelog" def update_read_version(self): if not self.request.user or not hasattr(self.request.user, "ishtaruser") \ or not self.request.user.ishtaruser: return cache_key = f"{settings.PROJECT_SLUG}-news-version" current_version = cache.get(cache_key) if not current_version: return ishtar_user = models.IshtarUser.objects.get(pk=self.request.user.ishtaruser.pk) if not ishtar_user.latest_news_version \ or ishtar_user.latest_news_version != current_version: ishtar_user.latest_news_version = current_version ishtar_user.save() def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) changelog_dir = os.path.join( settings.SHARE_BASE_PATH, "changelog", settings.LANGUAGE_CODE.split('-')[0] ) if not os.path.exists(changelog_dir): raise Http404() page_number = int(kwargs.get("page", 0) or 0) or 1 if page_number == 1: self.update_read_version() list_dir = list([fle for fle in os.listdir(changelog_dir) if fle.startswith("changelog_") and fle.endswith(".md")]) current_file = "" for idx, fle in enumerate(reversed(sorted(list_dir))): if (idx + 1) == page_number: current_file = fle break if not current_file: raise Http404() changelog_file = os.path.join(changelog_dir, current_file) VERSION_RE = re.compile(r"

(.*) - (\d{4})-(\d{2})-(\d{2})

") with open(changelog_file, "r", encoding="utf-8") as changelog: changelog_base = markdown(changelog.read()) changelog_full = "" for line in changelog_base.split("\n"): line = line.strip() if not line: continue #

v4.0.42 - 2023-01-25

m = VERSION_RE.match(line) if not m: changelog_full += line continue version, year, month, day = m.groups() try: d = datetime.date(int(year), int(month), int(day)) except ValueError: changelog_full += line continue changelog_full += f"
" changelog_full += f"{human_date(d)}" changelog_full += f"{version}
" context["changelog"] = changelog_full if page_number > 1: context["next"] = page_number - 1 if len(list_dir) > page_number: context["previous"] = page_number + 1 return context class ProfileEdit(LoginRequiredMixin, FormView): template_name = "ishtar/forms/profile.html" form_class = forms.ProfilePersonForm def dispatch(self, request, *args, **kwargs): if kwargs.get("pk"): try: profile = models.UserProfile.objects.get( pk=kwargs["pk"], person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # cannot edit a profile that is not yours... return redirect(reverse("index")) current_changed = False # the profile edited became the current profile if not profile.current: current_changed = True profile.current = True # force post-save in case of many current profile profile.save() if current_changed: clean_session_cache(request.session) if "EXTERNAL_SOURCES" in request.session: del request.session["EXTERNAL_SOURCES"] return super(ProfileEdit, self).dispatch(request, *args, **kwargs) def get_success_url(self): return reverse("profile") def get_form_kwargs(self): kwargs = super(ProfileEdit, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def get_context_data(self, **kwargs): data = super(ProfileEdit, self).get_context_data(**kwargs) data["page_name"] = _("Current profile") return data def form_valid(self, form): form.save(self.request.session) return HttpResponseRedirect(self.get_success_url()) class DynamicModelView: def get_model(self, kwargs): app = kwargs.get("app").replace("-", "_") model_name = "".join( [part.capitalize() for part in kwargs.get("model_name").split("-")] ) try: return apps.get_model(app, model_name) except LookupError: raise Http404() class QRCodeView(DynamicModelView, IshtarMixin, LoginRequiredMixin, View): def get(self, request, *args, **kwargs): model = self.get_model(kwargs) try: item = model.objects.get(pk=kwargs.get("pk")) except model.DoesNotExist: raise Http404() if not hasattr(item, "qrcode"): 
raise Http404() if not item.qrcode or not item.qrcode.name: item.generate_qrcode(request=self.request) if not item.qrcode or not item.qrcode.name: # generation has failed raise Http404() with open(settings.MEDIA_ROOT + os.sep + item.qrcode.name, "rb") as f: return HttpResponse(f.read(), content_type="image/png") class GenerateView(IshtarMixin, LoginRequiredMixin, View): def get_template(self, template_slug): try: return models.DocumentTemplate.objects.get( slug=template_slug, available=True, for_labels=False ) except models.DocumentTemplate.DoesNotExist: raise Http404() def publish(self, tpl, objects): return tpl.publish(objects[0]) def get_items(self, request, model): item_pk = self.kwargs.get("item_pk") try: object = model.objects.get(pk=item_pk) if not object.can_view(request): raise Http404() except model.DoesNotExist: raise Http404() return [object] def get(self, request, *args, **kwargs): template_slug = kwargs.get("template_slug") tpl = self.get_template(template_slug) app, __, model_name = tpl.associated_model.klass.split(".") model = apps.get_model(app, model_name) objects = self.get_items(request, model) if not objects: return HttpResponse(content_type="text/plain") document = self.publish(tpl, objects) if not document: return HttpResponse(content_type="text/plain") base_extension = tpl.template.name.split(".")[-1].lower() extension = tpl.export_format if not extension: extension = base_extension if not settings.USE_LIBREOFFICE and extension not in ("ods", "odt"): return HttpResponseBadRequest("Invalid format type - need LibreOffice daemon") # bad configuration content_type = models.EXPORT_FORMATS_CONTENT_TYPE.get( extension, "application/vnd.oasis.opendocument.text" ) if tpl.export_format and convert_document: document = convert_document(document, tpl.export_format) with open(document, "rb") as f: response = HttpResponse(f.read(), content_type=content_type) response["Content-Disposition"] = "attachment; filename={}".format( document.split(os.sep)[-1] ) 
return response # TODO def generate_label_view(): pass class GenerateLabelView(GenerateView): def get_template(self, template_slug): try: return models.DocumentTemplate.objects.get( slug=template_slug, available=True, for_labels=True ) except models.DocumentTemplate.DoesNotExist: raise Http404() def publish(self, tpl, objects): return tpl.publish_labels(objects) def get_items(self, request, model): # rights are managed by get_item get_list = get_item(model, None, model.SLUG, own_table_cols=["id"])( request, no_link=True, no_limit=True ) item_list = json.loads(get_list.content.decode("utf-8"))["rows"] try: objects = [model.objects.get(pk=int(dct["id"])) for dct in item_list] except model.DoesNotExist: raise Http404() return objects """ # TODO v4: suppression class GlobalVarEdit(IshtarMixin, AdminLoginRequiredMixin, ModelFormSetView): template_name = "ishtar/formset.html" model = models.GlobalVar factory_kwargs = {"extra": 1, "can_delete": True} page_name = _("Global variables") fields = ["slug", "value", "description"] """ class BaseImportView(IshtarMixin, LoginRequiredMixin): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportForm def get_success_url(self): if self.object.has_pre_import_form: return reverse("import_pre_import_form", args=[self.object.pk]) return reverse("current_imports") def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["user"] = self.request.user return kwargs def form_valid(self, form): user = models.IshtarUser.objects.get(pk=self.request.user.pk) self.object = form.save(user=user) return HttpResponseRedirect(self.get_success_url()) class NewImportView(BaseImportView, CreateView): page_name = _("Import: create (table)") class ImportPermissionMixin: permission_full = "ishtar_common.change_import" permission_own = "ishtar_common.change_own_import" def dispatch(self, request, *args, **kwargs): import_pk = self.kwargs["pk"] user = request.user if not user or not user.ishtaruser: return 
redirect("/") ishtaruser = user.ishtaruser model = models.ImportGroup if self.kwargs.get("group", None) else models.Import q = model.query_can_access(user, perm=self.permission_full).filter(pk=import_pk) if not ishtaruser.has_permission("ishtaradmin") and \ not ishtaruser.has_permission(self.permission_full): if not ishtaruser.has_permission(self.permission_own): return redirect("/") q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk)) if not q.count(): return redirect("/") returned = super().dispatch(request, *args, **kwargs) return returned class EditImportView(ImportPermissionMixin, BaseImportView, UpdateView): page_name = _("Import: edit (table)") class NewImportGISView(NewImportView): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportGISForm page_name = _("Import: create (GIS)") class NewImportGroupView(NewImportView): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportGroupForm page_name = _("Import: create (group)") class ImportPreFormView(IshtarMixin, LoginRequiredMixin, FormView): template_name = "ishtar/form.html" form_class = forms.PreImportForm page_name = _("Import: pre-form") def get_success_url(self): return reverse("current_imports") def dispatch(self, request, *args, **kwargs): self.user = models.IshtarUser.objects.get(pk=self.request.user.pk) try: self.import_item = models.Import.objects.get(pk=self.kwargs["import_id"]) except models.Import.DoesNotExist: raise Http404() if not self.import_item.is_available(self.user): raise Http404() if not self.import_item.importer_type.columns.filter(col_number__lte=0).count(): # no pre-form fields raise Http404() return super().dispatch(request, *args, **kwargs) def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["import_item"] = self.import_item return kwargs def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) data["page_name"] = self.page_name return data def form_valid(self, form): 
self.object = form.save() return HttpResponseRedirect(self.get_success_url()) def get_permissions_for_actions( user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own): can_edit, can_delete = False, False is_own = None if can_edit_own or can_delete_own: # need to check owner if imprt.importer_type_id not in owns: # "is_own" only query once by importer type owns[imprt.importer_type.pk] = imprt.importer_type.is_own(user.ishtaruser) is_own = owns[imprt.importer_type_id] if can_edit_all or (can_edit_own and is_own): can_edit = True if can_delete_all or (can_delete_own and is_own): can_delete = True return can_edit, can_delete class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): template_name = "ishtar/import_list.html" model = models.Import page_name = _("Current imports") current_url = "current_imports" pagination = False page_step = 20 def _queryset_filter(self, query): return query.exclude(state="AC") def get_queryset(self): user = self.request.user if not user.pk or not user.ishtaruser: raise Http404() q1 = self._queryset_filter( self.model.query_can_access( user, ["ishtar_common.view_import", "ishtar_common.change_import"] ) ) q1 = q1.filter(group__isnull=True).order_by("-end_date", "-creation_date", "-pk") q2 = self._queryset_filter( models.ImportGroup.query_can_access( user, ["ishtar_common.view_import", "ishtar_common.change_import"] ) ) q2 = q2.order_by("-end_date", "-creation_date", "-pk") values = list(reversed( sorted( list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date) ) )) can_edit_all, can_delete_all, can_edit_own, can_delete_own = \ models.Import.get_permissions_for_actions(user) imports = [] owns = {} for imprt in values: can_edit, can_delete = get_permissions_for_actions( user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own ) imprt.action_list = imprt.get_actions( can_edit=can_edit, can_delete=can_delete ) imports.append(imprt) self.imports_len = len(imports) self.current_page = 
0 if self.imports_len > self.page_step and self.pagination: self.current_page = int(self.request.GET.get("page", 1)) self.page_number = int((self.imports_len - 1) / self.page_step) + 1 min_page = (self.current_page - 1) * self.page_step max_page = (self.current_page * self.page_step) imports = imports[min_page:max_page] return imports def post(self, request, *args, **kwargs): can_edit_all, can_delete_all, can_edit_own, can_delete_own = \ models.Import.get_permissions_for_actions(request.user) owns = {} for field in request.POST: if not field.startswith("import-action-") or not request.POST[field]: continue model = models.Import is_group = field.startswith("import-action-group-") if is_group: model = models.ImportGroup # prevent forged forms try: imprt = model.objects.get(pk=int(field.split("-")[-1])) except (models.Import.DoesNotExist, ValueError): continue can_edit, can_delete = get_permissions_for_actions( request.user, imprt, owns, can_edit_all, can_delete_all, can_edit_own, can_delete_own ) action = request.POST[field] if can_delete and action == "D": url = "import_group_delete" if is_group else "import_delete" return HttpResponseRedirect( reverse(url, kwargs={"pk": imprt.pk}) ) elif can_edit and action == "ED": url = "edit_import_group" if is_group else "edit_import" return HttpResponseRedirect( reverse(url, kwargs={"pk": imprt.pk}) ) elif can_edit and action == "A": imprt.initialize( user=self.request.user.ishtaruser, session_key=request.session.session_key, ) elif can_edit and action == "I": if settings.USE_BACKGROUND_TASK: imprt.delayed_importation(request, request.session.session_key) else: try: imprt.importation() except ImporterError as e: imprt.state = "FE" imprt.end_date = datetime.datetime.now() imprt.save() put_session_message( request.session.session_key, f"{imprt} - {e}", "warning", ) elif can_edit and action == "CH": if settings.USE_BACKGROUND_TASK: imprt.delayed_check_modified(request.session.session_key) else: imprt.check_modified() elif 
can_edit and action == "IS": if imprt.current_line is None: imprt.current_line = imprt.skip_lines imprt.save() return HttpResponseRedirect( reverse( "import_step_by_step", args=[imprt.pk, imprt.current_line + 1] ) ) elif can_edit and action == "AC": imprt.archive() elif can_edit and action in ("F", "FE"): imprt.unarchive(action) return HttpResponseRedirect(reverse(self.current_url)) def get_context_data(self, **kwargs): dct = super().get_context_data(**kwargs) if self.imports_len > self.page_step and self.pagination: dct["current_page"] = self.current_page dct["page_range"] = (n + 1 for n in range(self.page_number)) add_import_perm = self.request.user.ishtaruser.has_permission( "ishtar_common.add_import" ) import_type_table = models.ImporterType.objects.filter( available=True, is_import=True, type='tab' ) import_type_gis = models.ImporterType.objects.filter( available=True, is_import=True, type='gis' ) import_type_group = models.ImporterGroup.objects.filter(available=True) ishtaruser = self.request.user.ishtaruser if not add_import_perm and ishtaruser.has_permission( "ishtar_common.add_own_import"): import_type_table = import_type_table.filter( users__pk=self.request.user.ishtaruser.pk ) import_type_gis = import_type_gis.filter( users__pk=self.request.user.ishtaruser.pk ) import_type_group = import_type_group.filter( users__pk=self.request.user.ishtaruser.pk ) add_import_perm = True has_import_table, has_import_gis, has_import_group = False, False, False if add_import_perm: if import_type_table.count(): has_import_table = True if import_type_gis.count(): has_import_gis = True if import_type_group.count(): has_import_group = True dct.update({ "has_import_table": has_import_table, "has_import_gis": has_import_gis, "has_import_group": has_import_group, "can_create_import": has_import_table or has_import_gis or has_import_group, }) return dct class ImportStepByStepView(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/import_step_by_step.html" 
page_name = _("Import step by step") current_url = "import_step_by_step" def get_import(self): try: self.imprt_obj = models.Import.objects.get(pk=int(self.kwargs["pk"])) except (models.Import.DoesNotExist, ValueError): raise Http404 if not self.request.user.is_superuser: # user can only edit his own imports user = models.IshtarUser.objects.get(pk=self.request.user.pk) if self.imprt_obj.user != user: raise Http404 if not hasattr(self, "current_line_number"): self.current_line_number = int(self.kwargs["line_number"]) - 1 def update_csv(self, request): prefix = "col-" submited_line = [ (int(k[len(prefix) :]), request.POST[k]) for k in request.POST if k.startswith(prefix) ] updated_line = [value for line, value in sorted(submited_line)] filename = self.imprt_obj.imported_file.path with open(filename, "r", encoding=self.imprt_obj.encoding) as f: reader = csv.reader(f) lines = [] for idx, line in enumerate(reader): if idx == self.current_line_number: line = updated_line line = [ v.encode(self.imprt_obj.encoding, errors="replace") for v in line ] lines.append(line) with open(filename, "w") as f: writer = csv.writer(f, **CSV_OPTIONS) writer.writerows(lines) def import_line(self, request, *args, **kwargs): try: self.imprt, data = self.imprt_obj.importation( line_to_process=self.current_line_number, return_importer_and_data=True ) except IOError as e: self.errors = [str(e)] return super(ImportStepByStepView, self).get(request, *args, **kwargs) if self.imprt_obj.get_number_of_lines() >= self.current_line_number: self.current_line_number += 1 else: self.current_line_number = None self.imprt_obj.current_line = self.current_line_number self.imprt_obj.save() return self.current_line_number def post(self, request, *args, **kwargs): if not request.POST or request.POST.get("valid", None) not in ( "change-csv", "import", "change-page", ): return self.get(request, *args, **kwargs) self.get_import() if request.POST.get("valid") == "change-page": return HttpResponseRedirect( reverse( 
"import_step_by_step", args=[self.imprt_obj.pk, request.POST.get("line-to-go", None)], ) ) if request.POST.get("valid") == "change-csv": self.update_csv(request) return self.get(request, *args, **kwargs) if not self.import_line(request, *args, **kwargs): return HttpResponseRedirect(reverse("current_imports")) else: return HttpResponseRedirect( reverse( "import_step_by_step", args=[self.imprt_obj.pk, self.current_line_number + 1], ) ) def get(self, request, *args, **kwargs): self.get_import() self.imprt = None self.errors, self.new_data = None, None try: self.imprt, data = self.imprt_obj.importation( simulate=True, line_to_process=self.current_line_number, return_importer_and_data=True, ) except IOError as e: self.errors = [None, None, str(e)] return super(ImportStepByStepView, self).get(request, *args, **kwargs) if not data or not data[0]: self.errors = self.imprt.errors if not self.errors: self.errors = [("", "", _("No data provided"))] else: self.new_data = data[:] return super(ImportStepByStepView, self).get(request, *args, **kwargs) def get_pagination(self, dct): pagination_step = 5 only_modified = not self.kwargs.get("all_pages", False) dct["all"] = not only_modified dct["import_url"] = ( "import_step_by_step" if only_modified else "import_step_by_step_all" ) line_nb = self.imprt_obj.get_number_of_lines() total_line_nb = self.imprt_obj.skip_lines + line_nb delta = 0 already_imported = [] if self.imprt_obj.imported_line_numbers: already_imported = self.imprt_obj.imported_line_numbers.split(",") changes = [] if self.imprt_obj.changed_line_numbers: changes = self.imprt_obj.changed_line_numbers.split(",") dct["page_is_last"] = self.current_line_number == line_nb # label, number, enabled, is_imported, has_changes dct["page_numbers"] = [] # first pass for the delta current = 0 for idx in range(self.imprt_obj.skip_lines, total_line_nb): imported = str(idx) in already_imported changed = str(idx) in changes if only_modified and (imported or not changed): continue 
current += 1 if idx == self.current_line_number - 1: delta = int(current / pagination_step) current, has_next, previous = 0, False, None for idx in range(self.imprt_obj.skip_lines, total_line_nb): if current >= ((delta + 1) * pagination_step): has_next = idx break imported = str(idx) in already_imported changed = str(idx) in changes if only_modified and (imported or not changed): continue current += 1 if current <= (delta * pagination_step): previous = idx continue nb = idx + 1 dct["page_numbers"].append((nb, nb, True, imported, changed)) if previous: dct["page_numbers"].insert( 0, (_("Previous"), previous + 1, True, False, True) ) else: dct["page_numbers"].insert( 0, (_("Previous"), self.imprt_obj.skip_lines, False, False, True) ) if has_next: dct["page_numbers"].append((_("Next"), has_next + 1, True, False, True)) else: dct["page_numbers"].append((_("Next"), total_line_nb, False, False, True)) def get_context_data(self, **kwargs): dct = super(ImportStepByStepView, self).get_context_data(**kwargs) dct["import"] = self.imprt_obj dct["line_number_displayed"] = self.current_line_number + 1 dct["line_is_imported"] = self.imprt_obj.line_is_imported( self.current_line_number ) self.get_pagination(dct) dct["errors"] = self.errors if self.errors: if self.imprt.current_csv_line: headers = [ f.label if f else _("Not imported") for f in self.imprt.get_formaters() ] dct["values"] = zip( range(1, len(headers) + 1), headers, self.imprt.current_csv_line ) return dct headers, self.path_to_column, interpreted_values = [], {}, [] for idx, formater in enumerate(self.imprt.get_formaters()): if not formater: headers.append(_("Not imported")) interpreted_values.append("–") continue lbl = formater.label if formater.comment: lbl += ' ' ' {} {} ' "".format(col, _("Col. "), col) ) value_dct[label] = dct[k] return value_dct def list_to_html(self, lst): if not lst: return _("* empty *") return ( "
  • " + "
  • ".join( [self.get_value(item) for item in lst] ) + "
" ) def get_value(self, item): if hasattr(item, "SHOW_URL"): return "{}{}".format(str(item), simple_link_to_window(item)) if hasattr(item, "explicit_label"): return item.explicit_label if item in (None, [], [None]): return _("* empty *") if isinstance(item, list): return self.list_to_html(item) return str(item) class ImportListTableView(ImportListView): template_name = "ishtar/import_table.html" current_url = "current_imports_table" def get_context_data(self, **kwargs): dct = super(ImportListTableView, self).get_context_data(**kwargs) dct["AJAX"] = True dct["MESSAGES"] = [] request = self.request if "messages" in request.session and request.session["messages"]: for message, message_type in request.session["messages"]: dct["MESSAGES"].append((message, message_type)) request.session["messages"] = [] if ( "current_import_id" in request.session and request.session["current_import_id"] ): dct["refreshed_pks"] = request.session.pop("current_import_id") return dct class ImportOldListView(ImportListView): page_name = _("Old imports") current_url = "old_imports" pagination = True def _queryset_filter(self, query): return query.filter(state="AC") def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) data["ARCHIVE_PAGE"] = True return data def import_get_status(request, current_right=None): response = {"group": [], "import": []} import_ids = request.POST.getlist("items[]", []) for import_id in import_ids: model = models.Import key = "import" if import_id.startswith("group-"): model = models.ImportGroup key = "group" idx = import_id.split("-")[-1] q = model.objects.filter(pk=idx) if not q.count(): continue item = q.all()[0] item_dct = { "id": item.id, "full_id": ("group-" if key == "group" else "") + str(item.id), "state": item.state, "status": str(item.status), "has_error": item.has_error } if key == "import": if item.current_line and not item.number_of_line: item.get_number_of_lines() item_dct.update({ "current_line": item.current_line, 
"number_of_line": item.number_of_line, "progress_percent": item.progress_percent, }) can_edit_all, can_delete_all, can_edit_own, can_delete_own = \ models.Import.get_permissions_for_actions(request.user) can_edit, can_delete = get_permissions_for_actions( request.user, item, {}, can_edit_all, can_delete_all, can_edit_own, can_delete_own ) item_dct["actions"] = [ (key, str(lbl)) for key, lbl in item.get_actions(can_edit=can_edit, can_delete=can_delete) ] response[key].append(item_dct) data = json.dumps(response) return HttpResponse(data, content_type="application/json") class ImportMatchView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, ModelFormSetView): template_name = "ishtar/formset_import_match.html" model = models.TargetKey page_name = _("Link unmatched items") factory_kwargs = { "extra": 0, } form_class = forms.TargetKeyForm formset_class = forms.TargetKeyFormset max_fields = 250 permission_full = "change_import" permission_own = "change_own_import" def get_formset_kwargs(self): kwargs = super().get_formset_kwargs() kwargs["user"] = self.request.user return kwargs def full_queryset(self): return self.model.objects.filter( is_set=False, associated_import=self.kwargs["pk"] ) def get_queryset(self): return self.full_queryset()[: self.max_fields] def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) total = self.full_queryset().count() if total > self.max_fields: data["MAX_FIELDS_REACHED"] = self.max_fields data["TOTAL"] = total return data def get_success_url(self): return reverse("import_link_unmatched", args=[self.kwargs["pk"]]) class ImportDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.Import page_name = _("Delete import") permission_full = "delete_import" permission_own = "delete_own_import" def get_success_url(self): return reverse("current_imports") class ImportGroupDeleteView(ImportPermissionMixin, IshtarMixin, 
LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.ImportGroup page_name = _("Delete import") permission_full = "delete_import" permission_own = "delete_own_import" def dispatch(self, request, *args, **kwargs): self.kwargs['group'] = True return super().dispatch(request, *args, **kwargs) def get_success_url(self): return reverse("current_imports") class ImportCSVView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/blocks/view_import_csv.html" ATTRIBUTES = { "source": "get_imported_values", "error": "error_file", "result": "result_file", "match": "match_file" } TITLES = { "source": ("fa fa-file-text-o", _("Source")), "error": ("text-danger fa fa-exclamation-triangle", _("Error")), "result": ("fa fa-th", _("Result")), "match": ("fa fa-arrows-h", _("Match")), } permission_full = "ishtar_common.view_import" permission_own = "ishtar_common.view_own_import" def get(self, request, *args, **kwargs): user = self.request.user if not user.pk: raise Http404() self.is_group = kwargs.get("group", None) model = models.ImportGroup if self.is_group else models.Import q = model.query_can_access( self.request.user, perm=self.permission_full ).filter(pk=kwargs.get("pk", -1)) if not q.count(): raise Http404() self.import_item = q.all()[0] attribute = self.ATTRIBUTES[kwargs["target"]] self.csv_file = getattr(self.import_item, attribute) if callable(self.csv_file): self.csv_file = self.csv_file() if not self.csv_file: raise Http404() return super().get(request, *args, **kwargs) def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) encoding = "utf-8" if self.kwargs["target"] == "source" and self.import_item.encoding: encoding = self.import_item.encoding data["icon"], data["target"] = self.TITLES[self.kwargs["target"]] data["title"] = str(self.import_item) line_errors = [] has_line_errors = False if self.kwargs["target"] == "error": has_line_errors = True q = 
def line_error(request, line_id, current_right=None):
    """
    Set or unset ignored state of a csv error file

    :param request: current HTTP request
    :param line_id: primary key of the ``ImportLineError`` to toggle
    :param current_right: not used by this function
    :return: empty plain text ``HttpResponse`` once the state is toggled
    :raises Http404: for an anonymous user, an unknown line or an import
        the user cannot change
    """
    user = request.user
    if not user.pk:
        raise Http404()
    line = models.ImportLineError.objects.filter(pk=line_id).first()
    if line is None:
        # a view must not return None: answer with a regular "not found"
        raise Http404()
    q = models.Import.query_can_access(
        user, perm="ishtar_common.change_import"
    ).filter(pk=line.import_item_id)
    if not q.exists():
        raise Http404()
    line.ignored = not line.ignored
    line.save()
    return HttpResponse(content_type="text/plain")
class ManualMergeItemsMixin:
    """
    Mixin for form views merging the items whose primary keys are given,
    joined by "_", in the "pks" URL argument.
    """
    # type key of the merged items, passed to the "display-item" URL
    item_type = None

    def get_form_kwargs(self):
        """Give the form the primary keys read from the URL."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["items"] = self.kwargs["pks"].split("_")
        return form_kwargs

    def form_valid(self, form):
        """Merge the items, keep the result, then follow the usual flow."""
        # the GDPR logger is only handed over when an item type is set
        callback = get_person_gdpr_log if self.item_type else None
        self.item = form.merge(callback=callback, request=self.request)
        return super().form_valid(form)

    def get_success_url(self):
        """Return the URL of the page displaying the resulting item."""
        return reverse("display-item", args=[self.item_type, self.item.pk])
class OrganizationCreate(LoginRequiredMixin, CreateView):
    """Creation form for an organization (authenticated users only)."""
    model = models.Organization
    form_class = forms.BaseOrganizationForm
    template_name = "ishtar/organization_form.html"
    form_prefix = "orga"

    def get_form_kwargs(self):
        """Add the prefix declared on the form class, when there is one."""
        form_kwargs = super().get_form_kwargs()
        if hasattr(self.form_class, "form_prefix"):
            form_kwargs["prefix"] = self.form_class.form_prefix
        return form_kwargs

    def get_success_url(self):
        """Redirect to the edit page of the organization just created."""
        organization_pk = self.object.pk
        return reverse("organization_edit", args=[organization_pk])
class DocumentFormMixin(IshtarMixin, LoginRequiredMixin):
    """Settings shared by the document creation and modification views."""
    form_class = forms.DocumentForm
    template_name = "ishtar/forms/base_related_items.html"
    model = models.Document

    def get_context_data(self, **kwargs):
        """Expose the extra modals declared by the form class."""
        context = super().get_context_data(**kwargs)
        context["extra_form_modals"] = self.form_class.extra_form_modals
        return context

    def get_success_url(self):
        """Go back to the document edit page, asking to open the item."""
        base_url = reverse("edit-document")
        query = "?open_item={}".format(self.object.pk)
        return base_url + query
class DocumentSelectView(IshtarMixin, LoginRequiredMixin, FormView):
    """Select a document, then redirect to its modification form."""
    form_class = forms.DocumentFormSelection
    template_name = "ishtar/form.html"
    redir_url = "edit-document"

    def form_valid(self, form):
        """Keep the selected primary key for the redirection."""
        self.pk = form.cleaned_data["pk"]
        return super().form_valid(form)

    def get_form_kwargs(self):
        """Give the current user to the form."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.request.user
        return form_kwargs

    def get_context_data(self, **kwargs):
        """Add the URL of the sheet to open when "open_item" is in GET."""
        context = super().get_context_data(**kwargs)
        params = self.request.GET
        if "open_item" not in params:
            return context
        show_url = reverse("show-document", args=[params["open_item"]])
        context["open_url"] = show_url + "/"
        return context

    def get_success_url(self):
        """Return the URL named by ``redir_url`` for the selected item."""
        return reverse(self.redir_url, args=[self.pk])
# Steps of the document deletion wizard, as (step key, form class) pairs:
# a selection step followed by a final step.
document_deletion_steps = [
    ("selec-document_deletion", forms.DocumentFormMultiSelection),
    ("final-document_deletion", FinalDeleteForm),
]

# View built from the steps above, registered with the "document_deletion"
# URL name.
document_deletion_wizard = wizards.DocumentDeletionWizard.as_view(
    document_deletion_steps,
    label=_("Document deletion"),
    url_name="document_deletion",
)
def gen_generate_doc(model):
    """
    Build a view publishing an item of ``model`` through a document template.

    The returned view takes ``(request, pk, template_pk=None)``. It answers
    with the generated file as an attachment, with an error page when the
    template has a syntax error, and with an empty plain text response when
    the user has no profile, lacks the "view" permission on the model, the
    item does not exist or nothing was generated.

    :param model: model class providing ``publish`` and ``get_filename``
    :return: the view function
    """
    # content types by file extension; anything else is sent as "text/csv"
    mimes = {
        "odt": "application/vnd.oasis.opendocument.text",
        "ods": "application/vnd.oasis.opendocument.spreadsheet",
    }

    def func(request, pk, template_pk=None):
        ishtaruser = getattr(request.user, "ishtaruser", None)
        if not ishtaruser:
            return HttpResponse(content_type="text/plain")
        meta = model._meta
        perm = f"{meta.app_label}.view_{meta.model_name}"
        if not ishtaruser.has_permission(perm):
            return HttpResponse(content_type="text/plain")
        try:
            item = model.objects.get(pk=pk)
            doc = item.publish(template_pk)
        except model.DoesNotExist:
            doc = None
        except TemplateSyntaxError as e:
            dct = {
                "error_title": _("Error on your template"),
                "error": str(e),
                "back_url": reverse("display-item", args=[item.SLUG, pk]),
            }
            template = loader.get_template("error.html")
            return HttpResponse(template.render(dct, request))
        if not doc:
            return HttpResponse(content_type="text/plain")
        ext = doc.split(".")[-1]
        doc_name = item.get_filename() + "." + ext
        mimetype = mimes.get(ext, "text/csv")
        with open(doc, "rb") as d:
            # the file content is read while the response is built
            response = HttpResponse(d, content_type=mimetype)
        # quoted-string form: an unquoted name is cut at the first space or
        # separator by browsers
        quoted_name = doc_name.replace("\\", "\\\\").replace('"', '\\"')
        response["Content-Disposition"] = 'attachment; filename="%s"' % (
            quoted_name
        )
        return response

    return func
class SearchQueryDelete(LoginRequiredMixin, DeleteView):
    """Deletion of a bookmark owned by the current profile of the user."""
    model = models.SearchQuery
    template_name = "ishtar/forms/bookmark_delete.html"
    page_name = _("Delete bookmark")

    def dispatch(self, request, *args, **kwargs):
        """
        Load the current profile and the bookmark before the usual dispatch.

        A 404 is raised for an anonymous user, when there is no current
        profile or when the bookmark is not attached to this profile.
        """
        if not request.user.pk:
            raise Http404()
        try:
            self.profile = models.UserProfile.objects.get(
                current=True, person__ishtaruser__user_ptr=request.user
            )
            self.search_query = models.SearchQuery.objects.get(
                profile=self.profile, pk=kwargs["pk"]
            )
        except (models.UserProfile.DoesNotExist,
                models.SearchQuery.DoesNotExist):
            # no current profile or bookmark not owned by it
            raise Http404()
        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Fill in what the confirmation modal displays."""
        context = super().get_context_data(**kwargs)
        bookmark = self.search_query
        context.update({
            "modal_size": "small",
            "page_name": _("Bookmark - Delete"),
            "action_name": _("Delete"),
            "item": bookmark.label,
            "url": reverse("bookmark-delete", args=[bookmark.pk]),
        })
        return context

    def get_success_url(self):
        """Return the "success" URL with the "bookmark" argument."""
        return reverse("success", args=["bookmark"])
def pre_dispatch(self, request, *args, **kwargs):
    """
    Load the items targeted by the quick action and check it is available.

    The "pks" keyword argument is either an integer or a string of primary
    keys joined by "-". On success ``self.items`` and ``self.url`` are set
    and nothing is returned.

    :raises NotImplementedError: when no model is set on the class nor
        given as the "model" keyword argument
    :raises Http404: when "pks" is missing or malformed, when no item
        matches, when there is no quick action or when the action is
        neither available for the user nor for each of the items
    """
    if not self.model:
        if "model" not in kwargs:
            raise NotImplementedError("No attribute model defined.")
        self.model = kwargs["model"]
    pks = kwargs.get("pks")
    if isinstance(pks, int):
        pks = [pks]
    else:
        try:
            pks = [int(pk) for pk in pks.split("-")]
        except (AttributeError, ValueError):
            # missing value or non numeric part (e.g. "1--2"): a bad URL
            # is a "not found", not a server error
            raise Http404()
    self.items = list(self.model.objects.filter(pk__in=pks))
    if not self.items:
        raise Http404()
    # check availability
    quick_action = self.get_quick_action()
    if not quick_action:
        raise Http404()
    if not quick_action.is_available(user=request.user):
        # not available as a whole: it must be for every single item
        for item in self.items:
            if not quick_action.is_available(user=request.user, obj=item):
                raise Http404()
    self.url = request.get_full_path()
class QABaseLockView(QAItemForm):
    """Quick action locking or unlocking the selected items."""
    form_class = forms.QALockForm
    page_name = _("Lock/unlock")
    icon = "fa fa-lock"

    def pre_dispatch(self, request, *args, **kwargs):
        """
        Run the parent checks, then redirect to the "not available" page
        when at least one item is locked by someone else.
        """
        super().pre_dispatch(request, *args, **kwargs)
        locked_by_others = any(
            item.lock_user and item.lock_user != request.user
            for item in self.items
        )
        if not locked_by_others:
            return None
        return HttpResponseRedirect(
            reverse("qa-not-available", args=["locked-by-others"])
        )

    def form_valid(self, form):
        """Apply the form to the items and go to the success page."""
        form.save(self.items, self.request.user)
        success_url = reverse("success")
        return HttpResponseRedirect(success_url)
class QADocumentDuplicateFormView(QAItemForm):
    """Quick action duplicating a document."""
    template_name = "ishtar/forms/qa_document_duplicate.html"
    model = models.Document
    page_name = _("Duplicate")
    form_class = forms.QADocumentDuplicateForm
    base_url = "document-qa-duplicate"

    def get_form_kwargs(self):
        """Give the current user to the form."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.request.user
        return form_kwargs

    def form_valid(self, form):
        """
        Save the copy, then go to the success page. When "open_edit" is
        set, the edit URL of the copy is passed along with "/" replaced
        by "|".
        """
        duplicate = form.save()
        if not form.cleaned_data.get("open_edit", False):
            return HttpResponseRedirect(reverse("success"))
        edit_url = reverse("edit-document", args=[duplicate.pk])
        redir = edit_url.replace("/", "|")
        return HttpResponseRedirect(reverse("success", args=["redirect", redir]))

    def get_context_data(self, **kwargs):
        """Label the submit action."""
        context = super().get_context_data(**kwargs)
        context["action_name"] = _("Duplicate")
        return context
class GeoPreCreateView(IshtarMixin, LoginRequiredMixin, FormView):
    """
    Form asking for the geometry type before redirecting to the
    "create-geo" view.
    """
    page_name = _("Geo item creation")
    form_class = forms.PreGISForm
    template_name = "ishtar/forms/base_form.html"

    def get(self, request, *args, **kwargs):
        """Keep the "back_url" GET parameter for later use."""
        self.back_url = request.GET.get("back_url")
        return super().get(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Keep the "back_url" POST parameter for later use."""
        self.back_url = request.POST.get("back_url")
        return super().post(request, *args, **kwargs)

    def get_form_kwargs(self):
        """Give the back URL to the form."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["back_url"] = self.back_url
        return form_kwargs

    def form_valid(self, form):
        """
        Redirect to the creation form with the chosen geometry type.

        For a "find" source the target becomes its first base find and the
        find key is forwarded as "find_id". A 404 is raised when the find
        does not exist or has no base find.
        """
        url_kwargs = self.kwargs
        model_source = url_kwargs.get("model_source")
        source_pk = url_kwargs.get("source_pk")
        find_id = None
        if model_source == "find":
            try:
                find = Find.objects.get(pk=source_pk)
            except Find.DoesNotExist:
                raise Http404()
            base_find = find.get_first_base_find()
            if not base_find:
                raise Http404()
            find_id, source_pk = source_pk, base_find.pk
            model_source = "basefind"
        success_url = reverse(
            "create-geo",
            kwargs={
                "app_source": url_kwargs.get("app_source"),
                "model_source": model_source,
                "source_pk": source_pk,
                "geom_type": form.cleaned_data.get("geom_type"),
            }
        )
        # optional query string: back URL first, then the find key
        query = []
        if self.back_url:
            query.append("back_url=" + urllib.parse.quote(self.back_url))
        if find_id:
            query.append(f"find_id={find_id}")
        if query:
            success_url += "?" + "&".join(query)
        return HttpResponseRedirect(success_url)
rel = getattr(self.object, rel_model_key) if rel.count(): rel_item = rel.all()[0] if not hasattr(rel_item, "SLUG"): continue return reverse("display-item", kwargs={"item_type": rel_item.SLUG, "pk": rel_item.pk}) return reverse("edit-geo", kwargs={"pk": self.object.pk}) back_url = self.back_url if self.find_id: back_url = back_url.split("=")[0] + "=" + str(self.find_id) return back_url def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["back_url"] = self.back_url kwargs["find_id"] = self.find_id return kwargs class GeoCreateView(GeoFormMixin, CreateView): page_name = _("Geo item creation") def get_form_kwargs(self): if not hasattr(self.request.user, "ishtaruser"): raise Http404() ishtaruser = self.request.user.ishtaruser kwargs = super(GeoCreateView, self).get_form_kwargs() try: content_type = ContentType.objects.get( app_label=self.kwargs.get("app_source"), model=self.kwargs.get("model_source") ) except ContentType.DoesNotExist: raise Http404() model = content_type.model_class() try: obj = model.objects.get(pk=self.kwargs.get("source_pk")) except model.DoesNotExist: raise Http404() # check permission to add and view attached item attached_meta = model._meta perm_attached = f"{attached_meta.app_label}.view_{attached_meta.model_name}" perm_own_attached = f"{attached_meta.app_label}.view_own_{attached_meta.model_name}" if not ishtaruser.has_permission( "ishtar_common.add_geovectordata") or ( not ishtaruser.has_permission(perm_attached) and not ishtaruser.has_permission(perm_own_attached, obj=obj)): # check permission to view own attached item raise Http404() kwargs["main_items_fields"] = {} for k in models.GeoVectorData.RELATED_MODELS: kwargs["main_items_fields"][k] = [] if k.endswith(obj.SLUG): kwargs["initial"][k] = [obj.pk] kwargs["initial"]["name"] = obj.short_label \ if hasattr(obj, "short_label") else str(obj) key = f"{k}_{obj.pk}_main_item" kwargs["main_items_fields"][k].append( (key, "{} - {}".format(_("Main geo item for"), obj)) ) 
kwargs["source_content_type"] = content_type.pk kwargs["source_id"] = obj.pk kwargs["geom_type"] = self.kwargs.get("geom_type") main_geodata = obj.main_geodata if main_geodata and main_geodata.cached_x and main_geodata.cached_y: kwargs["default_center"] = (main_geodata.cached_x, main_geodata.cached_y) kwargs["user"] = self.request.user return kwargs class GeoEditView(GeoFormMixin, UpdateView): page_name = _("Geo item modification") def get_form_kwargs(self): kwargs = super(GeoEditView, self).get_form_kwargs() try: geo = models.GeoVectorData.objects.get(pk=self.kwargs.get("pk")) except models.GeoVectorData.DoesNotExist: raise Http404() if not check_permission(self.request, "ishtar_common.change_geovectordata", geo): raise Http404() initial = {} for k in ( list(self.form_class.base_fields.keys()) + models.GeoVectorData.RELATED_MODELS ): value = getattr(geo, k) if hasattr(value, "all"): value = ",".join([str(v.pk) for v in value.all().order_by("pk")]) if hasattr(value, "pk"): value = value.pk initial[k] = value kwargs["main_items_fields"] = {} kwargs["too_many"] = {} LIMIT = 10 for k in models.GeoVectorData.RELATED_MODELS: kwargs["main_items_fields"][k] = [] values = [] for idx, related_item in enumerate(getattr(geo, k).all()): if idx >= LIMIT: if k not in kwargs["too_many"]: kwargs["too_many"][k] = [] kwargs["too_many"][k].append(related_item.pk) continue pk = str(related_item.pk) values.append(pk) key = "{}_{}_main_item".format(k, pk) kwargs["main_items_fields"][k].append( (key, "{} - {}".format(_("Main geo item for"), related_item)) ) if related_item.main_geodata == geo: initial[key] = True initial[k] = ",".join(values) kwargs["initial"] = initial kwargs["user"] = self.request.user return kwargs class GeoDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/forms/geo_delete_form.html" model = models.GeoVectorData page_name = _("Delete geographic item") def get(self, request, *args, **kwargs): self.back_url = request.GET.get("back_url", 
None) return super().get(request, *args, **kwargs) def get_context_data(self, **kwargs): context_data = super().get_context_data(**kwargs) if self.back_url: context_data["back_url"] = self.back_url return context_data def get_success_url(self): if self.request.POST.get("back_url", None): return self.request.POST.get("back_url", None) return reverse("start")