#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2010-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import datetime import importlib from jinja2 import TemplateSyntaxError import json import logging import os import re import tempfile import unicodedata import urllib.parse try: from registration import views as registration_views # debian except ModuleNotFoundError: from django_registration import views as registration_views # pip from django.apps import apps from django.conf import settings from django.contrib import messages from django.contrib.auth import logout from django.contrib.auth.decorators import login_required from django.contrib.auth import views as auth_view from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ObjectDoesNotExist, PermissionDenied from django.core.cache import cache from django.db.models import Q from django.template import loader from django.forms.models import modelformset_factory from django.http import ( HttpResponse, Http404, HttpResponseRedirect, HttpResponseBadRequest, JsonResponse, ) from django.shortcuts import redirect, render, get_object_or_404 from django.urls import reverse, NoReverseMatch from django.utils import translation from django.utils.decorators import method_decorator from django.utils.translation import ugettext, ugettext_lazy as _ from django.views.generic import ListView, TemplateView, View from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView from extra_views import ModelFormSetView from markdown import markdown from . import models from archaeological_context_records.models import ContextRecord from archaeological_files.models import File from archaeological_finds.models import Find, Treatment, TreatmentFile from archaeological_operations.models import Operation, ArchaeologicalSite from archaeological_warehouse.models import Warehouse from ishtar_common import forms_common as forms from ishtar_common import wizards from ishtar_common.data_importer import ImporterError from ishtar_common.forms import FinalForm, FinalDeleteForm, reverse_lazy from ishtar_common.models import get_current_profile from ishtar_common.models_common import QuickAction from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils_migrations import HOMEPAGE_TITLE from ishtar_common.utils import ( BSMessage, clean_session_cache, CSV_OPTIONS, get_current_item_keys, get_current_item_keys_dict, get_field_labels_from_path, get_ishtaruser_gdpr_log, get_person_gdpr_log, get_random_item_image_link, get_news_feed, shortify, dict_to_tuple, put_session_message, get_model_by_slug, human_date, ) from ishtar_common.widgets import JQueryAutoComplete from ishtar_common import tasks from .views_item import ( check_permission, display_item, get_autocomplete_query, get_item, get_short_html_detail, modify_qa_item, new_qa_item, show_item, ) convert_document = None if settings.USE_LIBREOFFICE: from ishtar_common.libreoffice import convert_document logger = logging.getLogger(__name__) def status(request): return HttpResponse("OK") def raise_error(request): return 1 / 0 def raise_task_error(request): if settings.USE_BACKGROUND_TASK: tasks.trigger_error.delay() return HttpResponse("OK") def wizard_is_available(wizard, request, model, pk): try: wizard(request) except IndexError: # no step available put_session_message( request.session.session_key, _("You don't have sufficient permissions to do this action."), "warning", ) return q = model.objects.filter(pk=pk) if not q.count(): raise Http404() obj = q.all()[0] if hasattr(obj, "is_locked") and obj.is_locked(request.user): raise Http404() return obj def tiny_redirect(request, url_id): db_id = models.TinyUrl.decode_id(url_id) link_db = get_object_or_404(models.TinyUrl, id=db_id) return redirect(link_db.link) def _count_stats(model): cache_key = f"{settings.PROJECT_SLUG}-stats-{model.__name__}" value = cache.get(cache_key) if value: return value value = model.objects.count() cache.set(cache_key, value, settings.CACHE_TIMEOUT) return value def _get_statistics(profile): stats = [ (_("Operations"), _count_stats(apps.get_model("archaeological_operations", "Operation"))) ] if profile.archaeological_site: stats.append((_("Archaeological sites"), _count_stats(apps.get_model("archaeological_operations", "ArchaeologicalSite")))) if profile.context_record: stats.append((_("Context records"), _count_stats(apps.get_model("archaeological_context_records", "ContextRecord")))) if profile.find: stats.append((_("Finds"), _count_stats(apps.get_model("archaeological_finds", "Find")))) if profile.warehouse: stats.append((_("Warehouses"), _count_stats(apps.get_model("archaeological_warehouse", "Warehouse")))) stats.append((_("Containers"), _count_stats(apps.get_model("archaeological_warehouse", "Container")))) if profile.files: stats.append((_("Archaeological files"), _count_stats(apps.get_model("archaeological_files", "File")))) stats.append((_("Administrative acts"), _count_stats(apps.get_model("archaeological_operations", "AdministrativeAct")))) return stats def index(request): """ Main page """ profile = get_current_profile() welcome_title = profile.homepage_title or str(HOMEPAGE_TITLE) dct = {"warnings": [], "extra_form_modals": wizards.EXTRA_FORM_MODALS, "welcome_title": welcome_title} if settings.PROJECT_SLUG == "default": dct["warnings"].append( _( 'PROJECT_SLUG is set to "default". Change it in your ' "local_settings (or ask your admin to do it)." ) ) if profile.slug == "default": dct["warnings"].append( _( 'The slug of your current profile is set to "default". Change it ' "on the administration page (or ask your admin to do it)." ) ) authenticated = request.user.is_authenticated dct["news_feed"] = get_news_feed() display_random_image = authenticated and profile.homepage_random_image_available if display_random_image: dct["display_random_image"] = True dct["random_image"] = get_random_item_image_link(request) if profile.homepage: dct["homepage"] = markdown(profile.homepage) # remove old hardcoded "{random_image}" from custom templates if "{random_image}" in dct["homepage"]: dct["homepage"] = dct["homepage"].replace("{random_image}", "") else: old_language = translation.get_language() translation.activate(settings.LANGUAGE_CODE) dct["homepage"] = loader.render_to_string("ishtar/homepage_default.html") translation.activate(old_language) dct["homepage"] = dct["homepage"].replace("ISHTAR_DOCUMENT_VERSION", settings.ISHTAR_DOCUMENT_VERSION) display_statistics = profile.homepage_statistics_available and ( authenticated or profile.homepage_statistics_available_offline ) if display_statistics: dct["display_statistics"] = True dct["statistics"] = _get_statistics(profile) try: return render(request, "index.html", dct) except NoReverseMatch: # probably rights exception (rights revoked) logout(request) return render(request, "index.html", dct) def display_news_feed(request): """ Display news feed """ news_feed = "" if hasattr(request.user, "ishtaruser") and \ request.user.ishtaruser and request.user.ishtaruser.display_forum_entries: news_feed = get_news_feed() return render(request, "ishtar/blocks/news_feed.html", {"news_feed": news_feed}) class LoginView(auth_view.LoginView): form_class = forms.AuthenticationForm def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["registration_open"] = settings.REGISTRATION_OPEN return context class LogoutView(auth_view.LogoutView): def get(self, request, *args, **kwargs): # clear cache keys = [] if request.user and hasattr(request.user, "pk") and request.user.pk: keys.append(f"{settings.PROJECT_SLUG}-password_expired-{request.user.pk}") for key in keys: cache.delete(key) return super().get(request, *args, **kwargs) def update_password_last_update(user): try: ishtar_user = models.IshtarUser.objects.get(pk=user.pk) except models.IshtarUser.DoesNotExist: return ishtar_user.password_last_update = datetime.date.today() ishtar_user.save() key = f"{settings.PROJECT_SLUG}-password_expired-{user.pk}" cache.delete(key) class PasswordChangeView(auth_view.PasswordChangeView): form_class = forms.PasswordChangeForm success_url = reverse_lazy('start') template_name = 'registration/form.html' def form_valid(self, form): returned = super().form_valid(form) update_password_last_update(form.user) messages.add_message(self.request, messages.INFO, _("Password changed")) return returned def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["page_title"] = _("Change password") return context class PasswordResetConfirmView(auth_view.PasswordResetConfirmView): form_class = forms.SetPasswordForm success_url = reverse_lazy('login') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["page_title"] = _("Password reset") return context def form_valid(self, form): returned = super().form_valid(form) update_password_last_update(form.user) messages.add_message(self.request, messages.INFO, _("Password changed")) return returned class RegistrationView(registration_views.RegistrationView): template_name = 'registration/form.html' def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["page_title"] = _("Register") return context def register(self, form): # TODO raise NotImplementedError class PasswordResetView(auth_view.PasswordResetView): template_name = 'registration/form.html' success_url = reverse_lazy('start') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["page_title"] = _("Reset password") return context def form_valid(self, form): returned = super().form_valid(form) messages.add_message( self.request, messages.INFO, _("Email with password reset instructions has been sent.") ) return returned person_search_wizard = wizards.PersonSearch.as_view( [("general-person_search", forms.PersonFormSelection)], label=_("Person search"), url_name="person_search", ) person_creation_wizard_steps = [ ("identity-person_creation", forms.PersonForm), ("final-person_creation", FinalForm), ] person_creation_wizard = wizards.PersonWizard.as_view( person_creation_wizard_steps, label=_("New person"), url_name="person_creation", ) person_modification_wizard = wizards.PersonModifWizard.as_view( [ ("selec-person_modification", forms.PersonFormSelection), ("identity-person_modification", forms.PersonForm), ("final-person_modification", FinalForm), ], label=_("Person modification"), url_name="person_modification", ) def person_modify(request, pk): if not wizard_is_available(person_modification_wizard, request, models.Person, pk): return HttpResponseRedirect("/") wizards.PersonModifWizard.session_set_value( request, "selec-person_modification", "pk", pk, reset=True ) return redirect( reverse("person_modification", kwargs={"step": "identity-person_modification"}) ) person_deletion_wizard = wizards.PersonDeletionWizard.as_view( [ ("selec-person_deletion", forms.PersonFormMultiSelection), ("final-person_deletion", FinalDeleteForm), ], label=_("Person deletion"), url_name="person_deletion", ) def person_delete(request, pk): if not wizard_is_available(person_deletion_wizard, request, models.Person, pk): return HttpResponseRedirect("/") wizards.PersonDeletionWizard.session_set_value( request, "selec-person_deletion", "pks", pk, reset=True ) return redirect( reverse("person_deletion", kwargs={"step": "final-person_deletion"}) ) organization_search_wizard = wizards.OrganizationSearch.as_view( [("general-organization_search", forms.OrganizationFormSelection)], label=_("Organization search"), url_name="organization_search", ) organization_creation_wizard = wizards.OrganizationWizard.as_view( [ ("identity-organization_creation", forms.OrganizationForm), ("final-organization_creation", FinalForm), ], label=_("New organization"), url_name="organization_creation", ) organization_modification_wizard = wizards.OrganizationModifWizard.as_view( [ ("selec-organization_modification", forms.OrganizationFormSelection), ("identity-organization_modification", forms.OrganizationForm), ("final-organization_modification", FinalForm), ], label=_("Organization modification"), url_name="organization_modification", ) def organization_modify(request, pk): if not wizard_is_available( organization_modification_wizard, request, models.Organization, pk ): return HttpResponseRedirect("/") wizards.OrganizationModifWizard.session_set_value( request, "selec-organization_modification", "pk", pk, reset=True ) return redirect( reverse( "organization_modification", kwargs={"step": "identity-organization_modification"}, ) ) organization_deletion_wizard = wizards.OrganizationDeletionWizard.as_view( [ ("selec-organization_deletion", forms.OrganizationFormMultiSelection), ("final-organization_deletion", FinalDeleteForm), ], label=_("Organization deletion"), url_name="organization_deletion", ) def organization_delete(request, pk): if not wizard_is_available( organization_deletion_wizard, request, models.Organization, pk ): return HttpResponseRedirect("/") wizard_url = "organization_deletion" wizards.OrganizationDeletionWizard.session_set_value( request, "selec-" + wizard_url, "pks", pk, reset=True ) return redirect(reverse(wizard_url, kwargs={"step": "final-" + wizard_url})) account_wizard_steps = [ ("selec-account_management", forms.PersonUserFormSelection), ("account-account_management", forms.AccountForm), ("profile-account_management", forms.ProfileFormset), ("final-account_management", forms.FinalAccountForm), ] account_management_wizard = wizards.AccountWizard.as_view( account_wizard_steps, label=_("Account management"), url_name="account_management", ) account_deletion_wizard = wizards.IshtarUserDeletionWizard.as_view( [ ("selec-account_deletion", forms.AccountFormMultiSelection), ("final-account_deletion", FinalDeleteForm), ], label=_("Account deletion"), url_name="account_deletion", ) def account_manage(request, pk): if not wizard_is_available(account_management_wizard, request, models.Person, pk): return HttpResponseRedirect("/") wizards.AccountWizard.session_set_value( request, "selec-account_management", "pk", pk, reset=True ) return redirect( reverse("account_management", kwargs={"step": "account-account_management"}) ) def get_autocomplete_generic(model, extra=None): if not extra: extra = {"available": True} def func(request): q = request.GET.get("term") query = Q(**extra) nb = 0 objects = [] has_parent = hasattr(model, "parent") if not q: q = "" else: query_exact = query & Q(label__iexact=q) query_exact = model.objects.filter(query_exact) nb = query_exact.count() objects = list(query_exact.all()) for q in q.split(" "): if not q: continue sub_q = Q(label__icontains=q) if has_parent: sub_q |= Q(parent__label__icontains=q) sub_q |= Q(parent__parent__label__icontains=q) sub_q |= Q(parent__parent__parent__label__icontains=q) query = query & (sub_q) limit = 20 - nb if limit > 0: sort = ["label"] if has_parent: sort = ["parent__label", "label"] objects += list( model.objects.filter(query).order_by(*sort).distinct()[:limit] ) get_label = lambda x: x.full_label() if hasattr(x, "full_label") else str(x) data = json.dumps([{"id": obj.pk, "value": get_label(obj)} for obj in objects]) return HttpResponse(data, content_type="text/plain") return func def hide_shortcut_menu(request): request.session["SHORTCUT_SHOW"] = "off" return HttpResponse("OK", content_type="text/plain") def show_shortcut_menu(request): request.session["SHORTCUT_SHOW"] = "on" return HttpResponse("OK", content_type="text/plain") def activate_all_search(request): request.session["SHORTCUT_SEARCH"] = "all" return HttpResponse("OK", content_type="text/plain") def activate_own_search(request): request.session["SHORTCUT_SEARCH"] = "own" return HttpResponse("OK", content_type="text/plain") def activate_advanced_shortcut_menu(request): if not hasattr(request.user, "ishtaruser"): return HttpResponse("KO", content_type="text/plain") request.user.ishtaruser.advanced_shortcut_menu = True request.user.ishtaruser.save() return HttpResponse("OK", content_type="text/plain") def activate_simple_shortcut_menu(request): if not hasattr(request.user, "ishtaruser"): return HttpResponse("KO", content_type="text/plain") request.user.ishtaruser.advanced_shortcut_menu = False request.user.ishtaruser.save() return HttpResponse("OK", content_type="text/plain") def settings_js(request): request.is_js = True return render(request, "ishtar/settings.js", content_type="text/javascript") def shortcut_menu(request): profile = get_current_profile() CURRENT_ITEMS = [] if profile.files: CURRENT_ITEMS.append((_("Archaeological file"), File)) CURRENT_ITEMS.append((_("Operation"), Operation)) if profile.archaeological_site: CURRENT_ITEMS.append((profile.get_site_label(), ArchaeologicalSite)) if profile.context_record: CURRENT_ITEMS.append((_("Context record"), ContextRecord)) if profile.warehouse: CURRENT_ITEMS.append((_("Warehouse"), Warehouse)) if profile.find: CURRENT_ITEMS.append((_("Find"), Find)) if profile.warehouse: CURRENT_ITEMS.append((_("Treatment request"), TreatmentFile)) CURRENT_ITEMS.append((_("Treatment"), Treatment)) if ( hasattr(request.user, "ishtaruser") and request.user.ishtaruser.advanced_shortcut_menu ): dct = { "current_menu": [], "menu": [], "SHORTCUT_SEARCH": request.session["SHORTCUT_SEARCH"] if "SHORTCUT_SEARCH" in request.session else "own", "SHORTCUT_SHOW": request.session["SHORTCUT_SHOW"] if "SHORTCUT_SHOW" in request.session else "on", } current_selected_labels = [] for lbl, model in CURRENT_ITEMS: model_name = model.SLUG current = model_name in request.session and request.session[model_name] if current: try: item = model.objects.get(pk=int(current)) item_label = shortify(str(item), 60) current_selected_labels.append(item_label) except model.DoesNotExist: pass dct["menu"].append( ( lbl, model_name, current or 0, JQueryAutoComplete( reverse("get-" + model.SLUG + "-shortcut"), model ).render( model.SLUG + "-shortcut", value=current, attrs={"id": "current_" + model.SLUG}, ), ) ) dct["current_selected_labels"] = current_selected_labels return render(request, "ishtar/blocks/advanced_shortcut_menu.html", dct) dct = { "current_menu": [], "SHORTCUT_SHOW": request.session["SHORTCUT_SHOW"] if "SHORTCUT_SHOW" in request.session else "off", } current_selected_labels = [] current_selected_item = {} labels = {} for lbl, model in CURRENT_ITEMS: new_selected_item = None model_name = model.SLUG cls = "" current = model_name in request.session and request.session[model_name] items = [] current_items = [] labels[model_name] = {} lbl_key = "cached_label" if model.SLUG == "warehouse": lbl_key = "name" values = ["id", lbl_key] owns = ( model.get_owns( request.user, menu_filtr=current_selected_item, limit=100, values=values, get_short_menu_class=True, ) or [] ) for item, shortmenu_class in owns: pk = str(item["id"]) if shortmenu_class == "basket": pk = "basket-" + pk # prevent duplicates if pk in current_items: continue current_items.append(pk) selected = pk == current item_label = shortify(item[lbl_key], 60) if selected: cls = shortmenu_class new_selected_item = pk labels[model_name][str(pk)] = item_label items.append((pk, item_label, selected, shortmenu_class)) # selected is not in owns - add it to the list if not new_selected_item and current: try: item = model.objects.get(pk=int(current)) new_selected_item = item.pk item_label = shortify(str(item), 60) labels[model_name][str(item.pk)] = item_label items.append( (item.pk, item_label, True, item.get_short_menu_class(item.pk)) ) except (model.DoesNotExist, ValueError): pass if items: dct["current_menu"].append((lbl, model_name, cls, items)) if new_selected_item: current_selected_item[model_name] = new_selected_item if str(new_selected_item) in labels[model_name]: current_selected_labels.append( labels[model_name][str(new_selected_item)] ) dct["current_selected_labels"] = current_selected_labels return render(request, "ishtar/blocks/shortcut_menu.html", dct) def get_current_items(request): currents = {} current_item_keys = get_current_item_keys() for key, model in current_item_keys: currents[key] = None if key in request.session and request.session[key]: try: currents[key] = model.objects.get(pk=int(request.session[key])) except (ValueError, model.DoesNotExist): continue return currents def unpin(request, item_type, cascade=False): current_item_keys_dict = get_current_item_keys_dict() if item_type not in current_item_keys_dict.keys(): logger.warning("unpin unknow type: {}".format(item_type)) return HttpResponse("nok") if "administrativeact" in item_type: request.session[item_type] = "" return HttpResponse("ok") request.session["treatment"] = "" if item_type == "treatment" and not cascade: return HttpResponse("ok") request.session["treatmentfile"] = "" if item_type == "treatmentfile" and not cascade: return HttpResponse("ok") request.session["find"] = "" if item_type == "find" and not cascade: return HttpResponse("ok") request.session["warehouse"] = "" if item_type == "warehouse" and not cascade: return HttpResponse("ok") request.session["contextrecord"] = "" if item_type == "contextrecord" and not cascade: return HttpResponse("ok") request.session["site"] = "" if item_type == "site" and not cascade: return HttpResponse("ok") request.session["operation"] = "" if item_type == "operation" and not cascade: return HttpResponse("ok") request.session["file"] = "" if item_type == "file" and not cascade: return HttpResponse("ok") def update_current_item(request, item_type=None, pk=None): if not item_type or not pk: if not request.is_ajax() and not request.method == "POST": raise Http404 item_type = request.POST["item"] if "value" in request.POST and "item" in request.POST: request.session[item_type] = request.POST["value"] else: request.session[item_type] = str(pk) request.session["SHORTCUT_SEARCH"] = "all" currents = get_current_items(request) # re-init when descending item are not relevant if ( item_type == "file" and currents["file"] and currents["operation"] and currents["operation"].associated_file != currents["file"] ): request.session["operation"] = "" currents["operation"] = None if ( item_type in ("operation", "file") and currents["contextrecord"] and ( not request.session.get("operation", None) or currents["contextrecord"].operation != currents["operation"] ) ): request.session["contextrecord"] = "" currents["contextrecord"] = None from archaeological_finds.models import Find if ( item_type in ("contextrecord", "operation", "file") and currents["find"] and ( not request.session.get("contextrecord", None) or not Find.objects.filter( downstream_treatment__isnull=True, base_finds__context_record__pk=request.session["contextrecord"], pk=currents["find"].pk, ).count() ) ): request.session["find"] = "" currents["find"] = None # re-init ascending item with relevant values if item_type == "find" and currents["find"]: from archaeological_context_records.models import ContextRecord q = ContextRecord.objects.filter(base_finds__find__pk=currents["find"].pk) if q.count(): currents["contextrecord"] = q.all()[0] request.session["contextrecord"] = str(currents["contextrecord"].pk) if item_type in ("find", "contextrecord") and currents["contextrecord"]: currents["operation"] = currents["contextrecord"].operation request.session["operation"] = str(currents["operation"].pk) if item_type in ("find", "contextrecord", "operation") and currents["operation"]: currents["file"] = currents["operation"].associated_file request.session["file"] = str(currents["file"].pk) if currents["file"] else None return HttpResponse("ok") def pin_search(request, item_type): if item_type.startswith("administrativeact"): item_type = "administrativeact" key = "pin-search-" + item_type if not item_type or not ( request.is_ajax() and request.method == "POST" and "value" in request.POST ): raise Http404 request.session[key] = request.POST["value"] if not request.POST["value"]: # empty all unpin(request, item_type, cascade=True) else: unpin(request, item_type) return HttpResponse("ok") def get_by_importer( request, slug, data_type="json", full=False, force_own=False, **dct ): q = models.ImporterType.objects.filter(slug=slug) if not q.count(): res = "" if data_type == "json": res = "{}" return HttpResponse(res, content_type="text/plain") importer = q.all()[0] importer_class = importer.get_importer_class() cols, col_names = importer.get_columns(importer_class=importer_class) if data_type == "csv" or dct.get("type", "") == "csv": obj_name = importer.name else: obj_name = importer_class.OBJECT_CLS.__name__.lower() return get_item(importer_class.OBJECT_CLS, "get_" + obj_name, obj_name, own_table_cols=cols)( request, data_type, full, force_own, col_names=col_names, **dct ) def autocomplete_person_permissive( request, person_types=None, attached_to=None, is_ishtar_user=None ): return autocomplete_person( request, person_types=person_types, attached_to=attached_to, is_ishtar_user=is_ishtar_user, permissive=True, ) def autocomplete_user(request): query = get_autocomplete_query(request, "ishtar_common", "person") if query: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() for q in q.split(" "): qu = ( Q(ishtaruser__person__name__icontains=q) | Q(ishtaruser__person__surname__icontains=q) | Q(first_name__icontains=q) | Q(last_name__icontains=q) ) query = query & qu users = models.User.objects.filter(query).distinct()[:limit] values = [] for user in users: try: if user and user.ishtaruser: values.append({"id": user.pk, "value": str(user.ishtaruser)}) except models.User.ishtaruser.RelatedObjectDoesNotExist: pass data = json.dumps(values) return HttpResponse(data, content_type="text/plain") def autocomplete_ishtaruser(request): query = get_autocomplete_query(request, "ishtar_common", "person") if query is None: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term", "") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() for q in q.split(" "): qu = ( Q(person__name__unaccent__icontains=q) | Q(person__surname__unaccent__icontains=q) | Q(person__raw_name__unaccent__icontains=q) ) query = query & qu users = models.IshtarUser.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": user.pk, "value": str(user)} for user in users]) return HttpResponse(data, content_type="text/plain") def autocomplete_person( request, person_types=None, attached_to=None, is_ishtar_user=None, permissive=False ): query = get_autocomplete_query(request, "ishtar_common", "person") if query is None: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() for q in q.split(" "): qu = ( Q(name__unaccent__icontains=q) | Q(surname__unaccent__icontains=q) | Q(email__unaccent__icontains=q) | Q(attached_to__name__unaccent__icontains=q) ) if permissive: qu = qu | Q(raw_name__unaccent__icontains=q) query = query & qu if attached_to: query = query & Q(attached_to__pk__in=attached_to.split("_")) if person_types and str(person_types) != "0": try: typs = [int(tp) for tp in person_types.split("_") if tp] typ = models.PersonType.objects.filter(pk__in=typs).all() query = query & Q(person_types__in=typ) except (ValueError, ObjectDoesNotExist): pass if is_ishtar_user: query = query & Q(ishtaruser__isnull=False) persons = models.Person.objects.filter(query).distinct()[:limit] data = json.dumps( [{"id": person.pk, "value": str(person)} for person in persons if person] ) return HttpResponse(data, content_type="text/plain") def autocomplete_import(request): query = get_autocomplete_query(request, "ishtar_common", "import") if query is None: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() for q in q.split(" "): query = query & (Q(name__unaccent__icontains=q) | Q(group__name__unaccent__icontains=q)) items = models.Import.objects.filter(query).distinct()[:limit] data = [{"id": item.pk, "value": item.name} for item in items if item] return HttpResponse(json.dumps(data), content_type="text/plain") def autocomplete_area(request): if not request.GET.get("term"): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(label__icontains=q) query = query & extra limit = 20 areas = models.Area.objects.filter(query).distinct()[:limit] data = json.dumps( [{"id": area.pk, "value": str(area)} for area in areas] ) return HttpResponse(data, content_type="text/plain") def autocomplete_department(request): if not request.GET.get("term"): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(label__icontains=q) | Q(number__istartswith=q) query = query & extra limit = 20 departments = models.Department.objects.filter(query).distinct()[:limit] data = json.dumps( [{"id": department.pk, "value": str(department)} for department in departments] ) return HttpResponse(data, content_type="text/plain") def autocomplete_town(request): if not request.GET.get("term"): return HttpResponse(content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(name__unaccent__icontains=q) if settings.COUNTRY == "fr": extra |= Q(numero_insee__istartswith=q) query &= extra limit = 20 towns = models.Town.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": town.pk, "value": str(town)} for town in towns]) return HttpResponse(data, content_type="text/plain") def autocomplete_advanced_town(request, department_id=None, state_id=None): if not request.GET.get("term"): return HttpResponse(content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(name__icontains=q) if settings.COUNTRY == "fr": extra = extra | Q(numero_insee__istartswith=q) if not department_id: extra = extra | Q(departement__label__istartswith=q) query = query & extra if department_id: query = query & Q(departement__number__iexact=department_id) if state_id: query = query & Q(departement__state__number__iexact=state_id) limit = 20 towns = models.Town.objects.filter(query).distinct()[:limit] result = [] for town in towns: val = town.name if hasattr(town, "numero_insee"): val += " (%s)" % town.numero_insee result.append({"id": town.pk, "value": val}) data = json.dumps(result) return HttpResponse(data, content_type="text/plain") def autocomplete_document(request): query = get_autocomplete_query(request, "ishtar_common", "document") if query is None: return HttpResponse(content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() fields = [ "title__icontains", "reference__icontains", "internal_reference__icontains", "isbn__icontains", "authors__person__cached_label__icontains", "authors_raw__icontains", ] for q in q.split(" "): qu = Q(**{fields[0]: q}) for field in fields[1:]: qu |= Q(**{field: q}) query = query & qu limit = 20 items = models.Document.objects.filter(query).exclude(title="").distinct()[:limit] data = json.dumps([{"id": item.pk, "value": str(item)} for item in items]) return HttpResponse(data, content_type="text/plain") def department_by_state(request, state_id=""): if not state_id: data = [] else: departments = models.Department.objects.filter(state__number=state_id) data = json.dumps( [ { "id": department.pk, "number": department.number, "value": str(department), } for department in departments ] ) return HttpResponse(data, content_type="text/plain") def autocomplete_organization(request, orga_type=None): query = get_autocomplete_query(request, "ishtar_common", "organization") if query is None: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") for q in q.split(" "): extra = Q(cached_label__unaccent__icontains=q) query = query & extra if orga_type: try: typs = [int(tp) for tp in orga_type.split("_") if tp] typ = models.OrganizationType.objects.filter(pk__in=typs).all() query = query & Q(organization_type__in=typ) except (ValueError, ObjectDoesNotExist): pass limit = 15 organizations = models.Organization.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": org.pk, "value": str(org)} for org in organizations]) return HttpResponse(data, content_type="text/plain") def autocomplete_author(request): query = get_autocomplete_query(request, "ishtar_common", "author") if query is None: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") for q in q.split(" "): extra = ( Q(person__name__icontains=q) | Q(person__surname__icontains=q) | Q(person__email__icontains=q) | Q(author_type__label__icontains=q) ) query = query & extra limit = 15 authors = models.Author.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": author.pk, "value": str(author)} for author in authors]) return HttpResponse(data, content_type="text/plain") def autocomplete_biographical_note(request): query = get_autocomplete_query(request, "ishtar_common", "person") if query is None: return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term", "") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() for q in q.split(" "): qu = ( Q(last_name__unaccent__icontains=q) | Q(first_name__unaccent__icontains=q) | Q(denomination__unaccent__icontains=q) ) query = query & qu users = models.BiographicalNote.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": user.pk, "value": str(user)} for user in users]) return HttpResponse(data, content_type="text/plain") new_person = new_qa_item(models.Person, forms.PersonForm, page_name=_("New person"), callback=get_person_gdpr_log) modify_person = modify_qa_item(models.Person, forms.PersonForm, callback=get_person_gdpr_log) detail_person = get_short_html_detail(models.Person) new_person_noorga = new_qa_item( models.Person, forms.NoOrgaPersonForm, page_name=_("New person"), callback=get_person_gdpr_log ) new_organization = new_qa_item( models.Organization, forms.OrganizationForm, page_name=_("New organization") ) show_organization = show_item(models.Organization, "organization") get_organization = get_item(models.Organization, "get_organization", "organization") modify_organization = modify_qa_item(models.Organization, forms.OrganizationForm) detail_organization = get_short_html_detail(models.Organization) new_author = new_qa_item(models.Author, forms.AuthorForm, page_name=_("New author")) show_person = show_item(models.Person, "person", callback=get_person_gdpr_log) get_person = get_item(models.Person, "get_person", "person", callback=get_person_gdpr_log) show_ishtaruser = show_item(models.IshtarUser, "ishtaruser", callback=get_ishtaruser_gdpr_log) show_biographical_note = show_item(models.BiographicalNote, "biographicalnote") new_biographical_note = new_qa_item( models.BiographicalNote, forms.BiographicalNoteForm, page_name=_("New biographical note") ) get_person_for_account = get_item( models.Person, "get_person", "person", own_table_cols=models.Person.TABLE_COLS_ACCOUNT, ) get_ishtaruser = get_item(models.IshtarUser, "get_ishtaruser", "ishtaruser") show_town = show_item(models.Town, "town") show_area = show_item(models.Area, "area") show_import = show_item(models.Import, "import") show_import_group = show_item(models.ImportGroup, "importgroup") ACTION_MODEL_DICT = { 'import': models.Import, 'account': models.IshtarUser, 'document': models.Document, 'person': models.Person, 'orga': models.Organization, 'organization': models.Organization, 'operation': apps.get_model("archaeological_operations", "Operation"), 'administrativact': apps.get_model( "archaeological_operations", "AdministrativeAct"), 'file': apps.get_model("archaeological_files", "File"), 'site': apps.get_model("archaeological_operations", "ArchaeologicalSite"), 'contextrecord': apps.get_model("archaeological_context_records", "ContextRecord"), 'record': apps.get_model("archaeological_context_records", "ContextRecord"), 'find': apps.get_model("archaeological_finds", "Find"), 'treatment': apps.get_model("archaeological_finds", "Treatment"), 'treatmentfle': apps.get_model("archaeological_finds", "TreatmentFile"), 'treatmentfile': apps.get_model("archaeological_finds", "TreatmentFile"), 'exhibition': apps.get_model("archaeological_finds", "Exhibition"), 'container': apps.get_model("archaeological_warehouse", "Container"), 'warehouse': apps.get_model("archaeological_warehouse", "Warehouse"), } def action(request, action_slug, obj_id=None, *args, **kwargs): """ Action management """ if not check_permission(request, action_slug): not_permitted_msg = ugettext("Operation not permitted.") if obj_id: model_name = action.split('_')[0].split("-")[0].split("/")[0] if model_name not in ACTION_MODEL_DICT: print(f"ishtar_common/views - action: {model_name} not in ACTION_MODEL_DICT") return HttpResponse(not_permitted_msg) try: obj = ACTION_MODEL_DICT[model_name].objects.get(pk=obj_id) except ACTION_MODEL_DICT[model_name].DoesNotExist: return HttpResponse(not_permitted_msg) if not check_permission(request, action_slug, obj): return HttpResponse(not_permitted_msg) else: return HttpResponse(not_permitted_msg) request.session["CURRENT_ACTION"] = action_slug dct = {} globals_dct = globals() if action_slug in globals_dct: return globals_dct[action_slug](request, dct, obj_id, *args, **kwargs) return render(request, "index.html", dct) def reset_wizards(request): # dynamically execute each reset_wizards of each ishtar app for app in settings.INSTALLED_APPS: if app == "ishtar_common": # no need for infinite recursion continue try: module = __import__(app) except ImportError: continue if hasattr(module, "views") and hasattr(module.views, "reset_wizards"): module.views.reset_wizards(request) return redirect(reverse("start")) ITEM_PER_PAGE = 20 def merge_action(model, form, key, name_key="name", callback=None): def merge(request, page=1): current_url = key + "_merge" if not page: page = 1 page = int(page) FormSet = modelformset_factory( model.merge_candidate.through, form=form, formset=forms.MergeFormSet, extra=0, ) q = model.merge_candidate.through.objects count = q.count() max_page = count // ITEM_PER_PAGE if count % ITEM_PER_PAGE != 0: max_page += 1 context = { "current_url": current_url, "current_page": page, "max_page": max_page, } if page < max_page: context["next_page"] = page + 1 if page > 1: context["previous_page"] = page - 1 item_nb = (page - 1) * ITEM_PER_PAGE item_nb_1 = item_nb + ITEM_PER_PAGE from_key = "from_" + key to_key = "to_" + key queryset = q.all().order_by(from_key + "__" + name_key)[item_nb:item_nb_1] FormSet.from_key = from_key FormSet.to_key = to_key if request.method == "POST": context["formset"] = FormSet(request.POST, queryset=queryset) if context["formset"].is_valid(): context["formset"].merge(callback=callback, request=request) return redirect(reverse(current_url, kwargs={"page": page})) else: context["formset"] = FormSet(queryset=queryset) return render(request, "ishtar/merge_" + key + ".html", context) return merge def regenerate_external_id(request): if not request.user.ishtaruser.is_ishtaradmin: raise Http404() model = None for key in request.GET: model = get_model_by_slug(key) if model: break if not model: raise Http404() try: item = model.objects.get(pk=request.GET[model.SLUG]) except model.DoesNotExist: raise Http404() item.regenerate_external_id() return HttpResponseRedirect(reverse("success")) def regenerate_permissions(request, user_id): if not request.user.ishtaruser.is_ishtaradmin: raise Http404() try: ishtaruser = models.IshtarUser.objects.get(pk=user_id) except models.IshtarUser.DoesNotExist: raise Http404() ishtaruser.generate_permission() return HttpResponseRedirect(reverse("success")) person_merge = merge_action(models.Person, forms.MergePersonForm, "person", callback=get_person_gdpr_log) organization_merge = merge_action( models.Organization, forms.MergeOrganizationForm, "organization" ) class IshtarMixin: page_name = "" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["page_name"] = self.page_name return context class JSONResponseMixin: """ Render a JSON response. """ def render_to_response(self, context, **response_kwargs): return JsonResponse(self.get_data(context), **response_kwargs) def get_data(self, context): return context class LoginRequiredMixin(object): @method_decorator(login_required) def dispatch(self, request, *args, **kwargs): return super(LoginRequiredMixin, self).dispatch(request, *args, **kwargs) class AdminLoginRequiredMixin(LoginRequiredMixin): def dispatch(self, request, *args, **kwargs): if not request.user.is_staff: return redirect(reverse("start")) return super(AdminLoginRequiredMixin, self).dispatch(request, *args, **kwargs) class ChangelogView(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/changelog.html" page_name = _("Changelog") current_url = "changelog" def update_read_version(self): if not self.request.user or not hasattr(self.request.user, "ishtaruser") \ or not self.request.user.ishtaruser: return cache_key = f"{settings.PROJECT_SLUG}-news-version" current_version = cache.get(cache_key) if not current_version: return ishtar_user = models.IshtarUser.objects.get(pk=self.request.user.ishtaruser.pk) if not ishtar_user.latest_news_version \ or ishtar_user.latest_news_version != current_version: ishtar_user.latest_news_version = current_version ishtar_user.save() def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) changelog_dir = os.path.join( settings.SHARE_BASE_PATH, "changelog", settings.LANGUAGE_CODE.split('-')[0] ) if not os.path.exists(changelog_dir): raise Http404() page_number = int(kwargs.get("page", 0) or 0) or 1 if page_number == 1: self.update_read_version() list_dir = list([fle for fle in os.listdir(changelog_dir) if fle.startswith("changelog_") and fle.endswith(".md")]) current_file = "" for idx, fle in enumerate(reversed(sorted(list_dir))): if (idx + 1) == page_number: current_file = fle break if not current_file: raise Http404() changelog_file = os.path.join(changelog_dir, current_file) VERSION_RE = re.compile(r"

(.*) - (\d{4})-(\d{2})-(\d{2})

") with open(changelog_file, "r", encoding="utf-8") as changelog: changelog_base = markdown(changelog.read()) changelog_full = "" for line in changelog_base.split("\n"): line = line.strip() if not line: continue #

v4.0.42 - 2023-01-25

m = VERSION_RE.match(line) if not m: changelog_full += line continue version, year, month, day = m.groups() try: d = datetime.date(int(year), int(month), int(day)) except ValueError: changelog_full += line continue changelog_full += f"
" changelog_full += f"{human_date(d)}" changelog_full += f"{version}
" context["changelog"] = changelog_full if page_number > 1: context["next"] = page_number - 1 if len(list_dir) > page_number: context["previous"] = page_number + 1 return context class ProfileEdit(LoginRequiredMixin, FormView): template_name = "ishtar/forms/profile.html" form_class = forms.ProfilePersonForm def dispatch(self, request, *args, **kwargs): if kwargs.get("pk"): try: profile = models.UserProfile.objects.get( pk=kwargs["pk"], person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # cannot edit a profile that is not yours... return redirect(reverse("index")) current_changed = False # the profile edited became the current profile if not profile.current: current_changed = True profile.current = True # force post-save in case of many current profile profile.save() if current_changed: clean_session_cache(request.session) if "EXTERNAL_SOURCES" in request.session: del request.session["EXTERNAL_SOURCES"] return super(ProfileEdit, self).dispatch(request, *args, **kwargs) def get_success_url(self): return reverse("profile") def get_form_kwargs(self): kwargs = super(ProfileEdit, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def get_context_data(self, **kwargs): data = super(ProfileEdit, self).get_context_data(**kwargs) data["page_name"] = _("Current profile") return data def form_valid(self, form): form.save(self.request.session) return HttpResponseRedirect(self.get_success_url()) class DynamicModelView: def get_model(self, kwargs): app = kwargs.get("app").replace("-", "_") model_name = "".join( [part.capitalize() for part in kwargs.get("model_name").split("-")] ) try: return apps.get_model(app, model_name) except LookupError: raise Http404() class QRCodeView(DynamicModelView, IshtarMixin, LoginRequiredMixin, View): def get(self, request, *args, **kwargs): model = self.get_model(kwargs) try: item = model.objects.get(pk=kwargs.get("pk")) except model.DoesNotExist: raise Http404() if not hasattr(item, "qrcode"): raise Http404() if not item.qrcode or not item.qrcode.name or not os.path.exists( item.qrcode.path): item.generate_qrcode(request=self.request) if not item.qrcode or not item.qrcode.name: # generation has failed raise Http404() with open(settings.MEDIA_ROOT + os.sep + item.qrcode.name, "rb") as f: return HttpResponse(f.read(), content_type="image/png") class GenerateView(IshtarMixin, LoginRequiredMixin, View): def get_template(self, template_slug): try: return models.DocumentTemplate.objects.get( slug=template_slug, available=True, for_labels=False ) except models.DocumentTemplate.DoesNotExist: raise Http404() def publish(self, tpl, objects): return tpl.publish(objects[0]) def get_items(self, request, model): item_pk = self.kwargs.get("item_pk") try: object = model.objects.get(pk=item_pk) if not object.can_view(request): raise Http404() except model.DoesNotExist: raise Http404() return [object] def get(self, request, *args, **kwargs): template_slug = kwargs.get("template_slug") tpl = self.get_template(template_slug) app, __, model_name = tpl.associated_model.klass.split(".") model = apps.get_model(app, model_name) objects = self.get_items(request, model) if not objects: return HttpResponse(content_type="text/plain") document = self.publish(tpl, objects) if not document: return HttpResponse(content_type="text/plain") base_extension = tpl.template.name.split(".")[-1].lower() extension = tpl.export_format if not extension: extension = base_extension if not settings.USE_LIBREOFFICE and extension not in ("ods", "odt"): return HttpResponseBadRequest("Invalid format type - need LibreOffice daemon") # bad configuration content_type = models.EXPORT_FORMATS_CONTENT_TYPE.get( extension, "application/vnd.oasis.opendocument.text" ) if tpl.export_format and convert_document: document = convert_document(document, tpl.export_format) with open(document, "rb") as f: response = HttpResponse(f.read(), content_type=content_type) response["Content-Disposition"] = "attachment; filename={}".format( document.split(os.sep)[-1] ) return response # TODO def generate_label_view(): pass class GenerateLabelView(GenerateView): def get_template(self, template_slug): try: return models.DocumentTemplate.objects.get( slug=template_slug, available=True, for_labels=True ) except models.DocumentTemplate.DoesNotExist: raise Http404() def publish(self, tpl, objects): return tpl.publish_labels(objects) def get_items(self, request, model): # rights are managed by get_item get_list = get_item(model, None, model.SLUG, own_table_cols=["id"], search_form=model.get_default_search_form())( request, no_link=True, no_limit=True ) item_list = json.loads(get_list.content.decode("utf-8"))["rows"] try: objects = [model.objects.get(pk=int(dct["id"])) for dct in item_list] except model.DoesNotExist: raise Http404() return objects class ExportMediaView(IshtarMixin, LoginRequiredMixin, View): def get_exporter(self, slug, request): try: exporter = models.MediaExporter.objects.get( slug=slug, available=True ) except models.DocumentTemplate.DoesNotExist: raise Http404() if not exporter.is_available(request): raise PermissionDenied() return exporter def get_item(self, request, model): item_pk = self.kwargs.get("item_pk") try: obj = model.objects.get(pk=item_pk) if not obj.can_view(request): raise PermissionDenied() except model.DoesNotExist: raise Http404() return obj def get(self, request, *args, **kwargs): slug = kwargs.get("exporter") exporter = self.get_exporter(slug, request) app, __, model_name = exporter.associated_model.klass.split(".") model = apps.get_model(app, model_name) obj = self.get_item(request, model) if not obj: return HttpResponse(content_type="text/plain") with tempfile.TemporaryDirectory() as tmpdir: export = exporter.export(obj, request.user.ishtaruser, tmpdir=tmpdir) if not export: return HttpResponse(content_type="text/plain") with open(export, "rb") as f: content_type = "application/zip" response = HttpResponse(f.read(), content_type=content_type) response["Content-Disposition"] = "attachment; filename={}".format( export.split(os.sep)[-1] ) return response class BaseImportView(IshtarMixin, LoginRequiredMixin): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportForm def get_success_url(self): if self.object.has_pre_import_form: return reverse("import_pre_import_form", args=[self.object.pk]) return reverse("current_imports") def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["user"] = self.request.user return kwargs def form_valid(self, form): user = models.IshtarUser.objects.get(pk=self.request.user.pk) self.object = form.save(user=user) return HttpResponseRedirect(self.get_success_url()) class NewImportView(BaseImportView, CreateView): page_name = _("Import: create (table)") class ImportPermissionMixin: permission_full = "ishtar_common.change_import" permission_own = "ishtar_common.change_own_import" def dispatch(self, request, *args, **kwargs): import_pk = self.kwargs["pk"] user = request.user if not user or not user.ishtaruser: return redirect("/") ishtaruser = user.ishtaruser model = models.ImportGroup if self.kwargs.get("group", None) else models.Import q = model.query_can_access(user, perm=self.permission_full).filter(pk=import_pk) if not ishtaruser.has_permission("ishtaradmin") and \ not ishtaruser.has_permission(self.permission_full): if not ishtaruser.has_permission(self.permission_own): return redirect("/") q = q.filter(Q(importer_type__users__pk=user.ishtaruser.pk)) if not q.count(): return redirect("/") returned = super().dispatch(request, *args, **kwargs) return returned class EditImportView(ImportPermissionMixin, BaseImportView, UpdateView): page_name = _("Import: edit (table)") class NewImportGISView(NewImportView): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportGISForm page_name = _("Import: create (GIS)") class NewImportGroupView(NewImportView): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportGroupForm page_name = _("Import: create (group)") class ImportPreFormView(IshtarMixin, LoginRequiredMixin, FormView): template_name = "ishtar/form.html" form_class = forms.PreImportForm page_name = _("Import: pre-form") def get_success_url(self): return reverse("current_imports") def dispatch(self, request, *args, **kwargs): self.user = models.IshtarUser.objects.get(pk=self.request.user.pk) try: self.import_item = models.Import.objects.get(pk=self.kwargs["import_id"]) except models.Import.DoesNotExist: raise Http404() if not self.import_item.is_available(self.user): raise Http404() if not self.import_item.importer_type.columns.filter(col_number__lte=0).count(): # no pre-form fields raise Http404() return super().dispatch(request, *args, **kwargs) def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["import_item"] = self.import_item return kwargs def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) data["page_name"] = self.page_name return data def form_valid(self, form): self.object = form.save() return HttpResponseRedirect(self.get_success_url()) def get_permissions_for_actions(user, imprt, owns, permissions): can_view, can_edit, can_delete = False, False, False is_own = None if permissions["can_edit_own"] or permissions["can_delete_own"] \ or permissions["can_view_own"]: # need to check owner if imprt.importer_type_id not in owns: # "is_own" only query once by importer type owns[imprt.importer_type.pk] = imprt.importer_type.is_own(user.ishtaruser) is_own = owns[imprt.importer_type_id] if permissions["can_view_all"] or (permissions["can_view_own"] and is_own): can_view = True if permissions["can_edit_all"] or (permissions["can_edit_own"] and is_own): can_edit = True if permissions["can_delete_all"] or (permissions["can_delete_own"] and is_own): can_delete = True return can_view, can_edit, can_delete class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): template_name = "ishtar/import_list.html" model = models.Import page_name = _("Current imports") current_url = "current_imports" pagination = False page_step = 20 def _queryset_filter(self, query): return query.exclude(state="AC") def get_queryset(self): user = self.request.user if not user.pk or not user.ishtaruser: raise Http404() q1 = self._queryset_filter( self.model.query_can_access( user, ["ishtar_common.view_import", "ishtar_common.change_import"] ) ) q1 = q1.filter(group__isnull=True).order_by("-end_date", "-creation_date", "-pk") q2 = self._queryset_filter( models.ImportGroup.query_can_access( user, ["ishtar_common.view_import", "ishtar_common.change_import"] ) ) q2 = q2.order_by("-end_date", "-creation_date", "-pk") values = list(reversed( sorted( list(q1) + list(q2), key=lambda x: (x.end_date or x.creation_date) ) )) permissions = models.Import.get_permissions_for_actions(user) imports = [] owns = {} for imprt in values: can_view, can_edit, can_delete = get_permissions_for_actions( user, imprt, owns, permissions ) if not can_view: continue imprt.action_list = imprt.get_actions( can_edit=can_edit, can_delete=can_delete ) imports.append(imprt) self.imports_len = len(imports) self.current_page = 0 if self.imports_len > self.page_step and self.pagination: self.current_page = int(self.request.GET.get("page", 1)) self.page_number = int((self.imports_len - 1) / self.page_step) + 1 min_page = (self.current_page - 1) * self.page_step max_page = (self.current_page * self.page_step) imports = imports[min_page:max_page] return imports def post(self, request, *args, **kwargs): permissions = models.Import.get_permissions_for_actions(request.user) owns = {} for field in request.POST: if not field.startswith("import-action-") or not request.POST[field]: continue model = models.Import is_group = field.startswith("import-action-group-") if is_group: model = models.ImportGroup # prevent forged forms try: imprt = model.objects.get(pk=int(field.split("-")[-1])) except (models.Import.DoesNotExist, ValueError): continue can_view, can_edit, can_delete = get_permissions_for_actions( request.user, imprt, owns, permissions ) action = request.POST[field] if can_delete and action == "D": url = "import_group_delete" if is_group else "import_delete" return HttpResponseRedirect( reverse(url, kwargs={"pk": imprt.pk}) ) elif can_edit and action == "ED": url = "edit_import_group" if is_group else "edit_import" return HttpResponseRedirect( reverse(url, kwargs={"pk": imprt.pk}) ) elif can_edit and action == "A": imprt.initialize( user=self.request.user.ishtaruser, session_key=request.session.session_key, ) elif can_edit and action == "I": if settings.USE_BACKGROUND_TASK: imprt.delayed_importation(request, request.session.session_key) else: try: imprt.importation() except ImporterError as e: imprt.state = "FE" imprt.end_date = datetime.datetime.now() imprt.save() put_session_message( request.session.session_key, f"{imprt} - {e}", "warning", ) elif can_edit and action == "CH": if settings.USE_BACKGROUND_TASK: imprt.delayed_check_modified(request.session.session_key) else: imprt.check_modified() elif can_edit and action == "IS": if imprt.current_line is None: imprt.current_line = imprt.skip_lines imprt.save() return HttpResponseRedirect( reverse( "import_step_by_step", args=[imprt.pk, imprt.current_line + 1] ) ) elif can_edit and action == "AC": imprt.archive() elif can_edit and action in ("F", "FE"): imprt.unarchive(action) return HttpResponseRedirect(reverse(self.current_url)) def _get_page_range(self): page_range = [] # same algo as ishtar_common/static/js/ishtar.js render_gallery # TODO: harmonization... page_range.append(_("Previous")) page_total = (self.imports_len // self.page_step) + 1 page_range.append(1) if self.current_page < 5: for idx_page in range(2, 6): if idx_page > page_total: break page_range.append(idx_page) if page_total > 5: page_range.append("...") page_range.append(page_total) else: page_range.append("...") if page_total > (self.current_page - 2): page_range.append(self.current_page - 3) if page_total > (self.current_page - 3): page_range.append(self.current_page - 2) page_range.append(self.current_page - 1) page_range.append(self.current_page) if page_total > self.current_page: page_range.append(self.current_page + 1) if page_total < (self.current_page + 3): for idx_page in range(self.current_page + 2, page_total + 1): page_range.append(idx_page) else: page_range.append("...") page_range.append(page_total) page_range.append(_("Next")) return page_range def get_context_data(self, **kwargs): dct = super().get_context_data(**kwargs) if self.imports_len > self.page_step and self.pagination: dct["current_page"] = self.current_page dct["page_range"] = self._get_page_range() add_import_perm = self.request.user.ishtaruser.has_permission( "ishtar_common.add_import" ) import_type_table = models.ImporterType.objects.filter( available=True, is_import=True, type='tab' ) import_type_gis = models.ImporterType.objects.filter( available=True, is_import=True, type='gis' ) import_type_group = models.ImporterGroup.objects.filter(available=True) ishtaruser = self.request.user.ishtaruser if not add_import_perm and ishtaruser.has_permission( "ishtar_common.add_own_import"): import_type_table = import_type_table.filter( users__pk=self.request.user.ishtaruser.pk ) import_type_gis = import_type_gis.filter( users__pk=self.request.user.ishtaruser.pk ) import_type_group = import_type_group.filter( users__pk=self.request.user.ishtaruser.pk ) add_import_perm = True has_import_table, has_import_gis, has_import_group = False, False, False if add_import_perm: if import_type_table.count(): has_import_table = True if import_type_gis.count(): has_import_gis = True if import_type_group.count(): has_import_group = True dct.update({ "has_import_table": has_import_table, "has_import_gis": has_import_gis, "has_import_group": has_import_group, "can_create_import": has_import_table or has_import_gis or has_import_group, }) return dct class ImportStepByStepView(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/import_step_by_step.html" page_name = _("Import step by step") current_url = "import_step_by_step" def get_import(self): try: self.imprt_obj = models.Import.objects.get(pk=int(self.kwargs["pk"])) except (models.Import.DoesNotExist, ValueError): raise Http404 if not self.request.user.is_superuser: # user can only edit his own imports user = models.IshtarUser.objects.get(pk=self.request.user.pk) if self.imprt_obj.user != user: raise Http404 if not hasattr(self, "current_line_number"): self.current_line_number = int(self.kwargs["line_number"]) - 1 def update_csv(self, request): prefix = "col-" submited_line = [ (int(k[len(prefix) :]), request.POST[k]) for k in request.POST if k.startswith(prefix) ] updated_line = [value for line, value in sorted(submited_line)] filename = self.imprt_obj.imported_file.path with open(filename, "r", encoding=self.imprt_obj.encoding) as f: reader = csv.reader(f) lines = [] for idx, line in enumerate(reader): if idx == self.current_line_number: line = updated_line line = [ v.encode(self.imprt_obj.encoding, errors="replace") for v in line ] lines.append(line) with open(filename, "w") as f: writer = csv.writer(f, **CSV_OPTIONS) writer.writerows(lines) def import_line(self, request, *args, **kwargs): try: self.imprt, data = self.imprt_obj.importation( line_to_process=self.current_line_number, return_importer_and_data=True ) except IOError as e: self.errors = [str(e)] return super(ImportStepByStepView, self).get(request, *args, **kwargs) if self.imprt_obj.get_number_of_lines() >= self.current_line_number: self.current_line_number += 1 else: self.current_line_number = None self.imprt_obj.current_line = self.current_line_number self.imprt_obj.save() return self.current_line_number def post(self, request, *args, **kwargs): if not request.POST or request.POST.get("valid", None) not in ( "change-csv", "import", "change-page", ): return self.get(request, *args, **kwargs) self.get_import() if request.POST.get("valid") == "change-page": return HttpResponseRedirect( reverse( "import_step_by_step", args=[self.imprt_obj.pk, request.POST.get("line-to-go", None)], ) ) if request.POST.get("valid") == "change-csv": self.update_csv(request) return self.get(request, *args, **kwargs) if not self.import_line(request, *args, **kwargs): return HttpResponseRedirect(reverse("current_imports")) else: return HttpResponseRedirect( reverse( "import_step_by_step", args=[self.imprt_obj.pk, self.current_line_number + 1], ) ) def get(self, request, *args, **kwargs): self.get_import() self.imprt = None self.errors, self.new_data = None, None try: self.imprt, data = self.imprt_obj.importation( simulate=True, line_to_process=self.current_line_number, return_importer_and_data=True, ) except IOError as e: self.errors = [None, None, str(e)] return super(ImportStepByStepView, self).get(request, *args, **kwargs) if not data or not data[0]: self.errors = self.imprt.errors if not self.errors: self.errors = [("", "", _("No data provided"))] else: self.new_data = data[:] return super(ImportStepByStepView, self).get(request, *args, **kwargs) def get_pagination(self, dct): pagination_step = 5 only_modified = not self.kwargs.get("all_pages", False) dct["all"] = not only_modified dct["import_url"] = ( "import_step_by_step" if only_modified else "import_step_by_step_all" ) line_nb = self.imprt_obj.get_number_of_lines() total_line_nb = self.imprt_obj.skip_lines + line_nb delta = 0 already_imported = [] if self.imprt_obj.imported_line_numbers: already_imported = self.imprt_obj.imported_line_numbers.split(",") changes = [] if self.imprt_obj.changed_line_numbers: changes = self.imprt_obj.changed_line_numbers.split(",") dct["page_is_last"] = self.current_line_number == line_nb # label, number, enabled, is_imported, has_changes dct["page_numbers"] = [] # first pass for the delta current = 0 for idx in range(self.imprt_obj.skip_lines, total_line_nb): imported = str(idx) in already_imported changed = str(idx) in changes if only_modified and (imported or not changed): continue current += 1 if idx == self.current_line_number - 1: delta = int(current / pagination_step) current, has_next, previous = 0, False, None for idx in range(self.imprt_obj.skip_lines, total_line_nb): if current >= ((delta + 1) * pagination_step): has_next = idx break imported = str(idx) in already_imported changed = str(idx) in changes if only_modified and (imported or not changed): continue current += 1 if current <= (delta * pagination_step): previous = idx continue nb = idx + 1 dct["page_numbers"].append((nb, nb, True, imported, changed)) if previous: dct["page_numbers"].insert( 0, (_("Previous"), previous + 1, True, False, True) ) else: dct["page_numbers"].insert( 0, (_("Previous"), self.imprt_obj.skip_lines, False, False, True) ) if has_next: dct["page_numbers"].append((_("Next"), has_next + 1, True, False, True)) else: dct["page_numbers"].append((_("Next"), total_line_nb, False, False, True)) def get_context_data(self, **kwargs): dct = super(ImportStepByStepView, self).get_context_data(**kwargs) dct["import"] = self.imprt_obj dct["line_number_displayed"] = self.current_line_number + 1 dct["line_is_imported"] = self.imprt_obj.line_is_imported( self.current_line_number ) self.get_pagination(dct) dct["errors"] = self.errors if self.errors: if self.imprt.current_csv_line: headers = [ f.label if f else _("Not imported") for f in self.imprt.get_formaters() ] dct["values"] = zip( range(1, len(headers) + 1), headers, self.imprt.current_csv_line ) return dct headers, self.path_to_column, interpreted_values = [], {}, [] for idx, formater in enumerate(self.imprt.get_formaters()): if not formater: headers.append(_("Not imported")) interpreted_values.append("–") continue lbl = formater.label if formater.comment: lbl += ' ' ' {} {} ' "".format(col, _("Col. "), col) ) value_dct[label] = dct[k] return value_dct def list_to_html(self, lst): if not lst: return _("* empty *") return ( "
  • " + "
  • ".join( [self.get_value(item) for item in lst] ) + "
" ) def get_value(self, item): if hasattr(item, "SHOW_URL"): return "{}{}".format(str(item), simple_link_to_window(item)) if hasattr(item, "explicit_label"): return item.explicit_label if item in (None, [], [None]): return _("* empty *") if isinstance(item, list): return self.list_to_html(item) return str(item) class ImportListTableView(ImportListView): template_name = "ishtar/import_table.html" current_url = "current_imports_table" def get_context_data(self, **kwargs): dct = super(ImportListTableView, self).get_context_data(**kwargs) dct["AJAX"] = True dct["MESSAGES"] = [] request = self.request if "messages" in request.session and request.session["messages"]: for message, message_type in request.session["messages"]: dct["MESSAGES"].append((message, message_type)) request.session["messages"] = [] if ( "current_import_id" in request.session and request.session["current_import_id"] ): dct["refreshed_pks"] = request.session.pop("current_import_id") return dct class ImportOldListView(ImportListView): page_name = _("Old imports") current_url = "old_imports" pagination = True def _queryset_filter(self, query): return query.filter(state="AC") def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) data["ARCHIVE_PAGE"] = True return data def import_get_status(request, current_right=None): response = {"group": [], "import": []} import_ids = request.POST.getlist("items[]", []) for import_id in import_ids: model = models.Import key = "import" if import_id.startswith("group-"): model = models.ImportGroup key = "group" idx = import_id.split("-")[-1] q = model.objects.filter(pk=idx) if not q.count(): continue item = q.all()[0] item_dct = { "id": item.id, "full_id": ("group-" if key == "group" else "") + str(item.id), "state": item.state, "status": str(item.status), "has_error": item.has_error } if key == "import": if item.current_line and not item.number_of_line: item.get_number_of_lines() item_dct.update({ "current_line": item.current_line, "number_of_line": item.number_of_line, "progress_percent": item.progress_percent, }) permissions = models.Import.get_permissions_for_actions(request.user) can_view, can_edit, can_delete = get_permissions_for_actions( request.user, item, {}, permissions ) item_dct["actions"] = [ (key, str(lbl)) for key, lbl in item.get_actions(can_edit=can_edit, can_delete=can_delete) ] response[key].append(item_dct) data = json.dumps(response) return HttpResponse(data, content_type="application/json") class ImportMatchView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, ModelFormSetView): template_name = "ishtar/formset_import_match.html" model = models.TargetKey page_name = _("Link unmatched items") factory_kwargs = { "extra": 0, } form_class = forms.TargetKeyForm formset_class = forms.TargetKeyFormset max_fields = 250 permission_full = "change_import" permission_own = "change_own_import" def get_formset_kwargs(self): kwargs = super().get_formset_kwargs() kwargs["user"] = self.request.user return kwargs def full_queryset(self): return self.model.objects.filter( is_set=False, associated_import=self.kwargs["pk"] ) def get_queryset(self): return self.full_queryset()[: self.max_fields] def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) total = self.full_queryset().count() if total > self.max_fields: data["MAX_FIELDS_REACHED"] = self.max_fields data["TOTAL"] = total return data def get_success_url(self): return reverse("import_link_unmatched", args=[self.kwargs["pk"]]) class ImportDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.Import page_name = _("Delete import") permission_full = "delete_import" permission_own = "delete_own_import" def get_success_url(self): return reverse("current_imports") class ImportGroupDeleteView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.ImportGroup page_name = _("Delete import") permission_full = "delete_import" permission_own = "delete_own_import" def dispatch(self, request, *args, **kwargs): self.kwargs['group'] = True return super().dispatch(request, *args, **kwargs) def get_success_url(self): return reverse("current_imports") class ImportCSVView(ImportPermissionMixin, IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/blocks/view_import_csv.html" ATTRIBUTES = { "source": "get_imported_values", "error": "error_file", "result": "result_file", "match": "match_file" } TITLES = { "source": ("fa fa-file-text-o", _("Source")), "error": ("text-danger fa fa-exclamation-triangle", _("Error")), "result": ("fa fa-th", _("Result")), "match": ("fa fa-arrows-h", _("Match")), } permission_full = "ishtar_common.view_import" permission_own = "ishtar_common.view_own_import" def get(self, request, *args, **kwargs): user = self.request.user if not user.pk: raise Http404() self.is_group = kwargs.get("group", None) model = models.ImportGroup if self.is_group else models.Import q = model.query_can_access( self.request.user, perm=self.permission_full ).filter(pk=kwargs.get("pk", -1)) if not q.count(): raise Http404() self.import_item = q.all()[0] attribute = self.ATTRIBUTES[kwargs["target"]] self.csv_file = getattr(self.import_item, attribute) if callable(self.csv_file): self.csv_file = self.csv_file() if not self.csv_file: raise Http404() return super().get(request, *args, **kwargs) def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) encoding = "utf-8" if self.kwargs["target"] == "source" and self.import_item.encoding: encoding = self.import_item.encoding data["icon"], data["target"] = self.TITLES[self.kwargs["target"]] data["title"] = str(self.import_item) line_errors = [] has_line_errors = False if self.kwargs["target"] == "error": has_line_errors = True q = models.ImportLineError.objects.filter(import_item=self.import_item) if not q.count(): self.import_item.parse_error_file() line_errors = models.ImportLineError.objects.filter( import_item=self.import_item).values_list("pk", "ignored") data["has_line_errors"] = has_line_errors data["content"] = [] header = [] delimiter = "," if self.kwargs["target"] == "source": delimiter = self.import_item.csv_sep if not self.is_group: q = models.ImporterColumn.objects.filter(importer_type=self.import_item.importer_type).order_by("col_number") cols = list(q.values_list("col_number", "label")) col_dict = dict(cols) if cols: for idx in range(cols[-1][0]): if (idx + 1) in col_dict: header.append(col_dict[idx + 1]) else: header.append("-") data["header"] = header with open(self.csv_file.path, "r", encoding=encoding) as f: reader = csv.reader(f, delimiter=delimiter) for idx, line in enumerate(reader): if self.kwargs["target"] == "source": if idx < self.import_item.skip_lines: if not header: data["header"] = line continue elif not idx: data["header"] = line continue if line_errors and len(line_errors) >= idx: line = [line_errors[idx - 1]] + line data["content"].append(line) data["window_id"] = "csv-view-" + (self.kwargs.get("group", "") or "") + str(self.import_item.pk) data["import_id"] = self.import_item.pk return data def line_error(request, line_id, current_right=None): """ Set or unset ignored state of a csv error file """ user = request.user if not user.pk: raise Http404() q = models.ImportLineError.objects.filter(pk=line_id) if not q.count(): return line = q.all()[0] q = models.Import.query_can_access( request.user, perm="ishtar_common.change_import" ).filter(pk=line.import_item_id) if not q.count(): raise Http404() line.ignored = not line.ignored line.save() return HttpResponse(content_type="text/plain") class PersonCreate(LoginRequiredMixin, CreateView): model = models.Person form_class = forms.BasePersonForm template_name = "ishtar/person_form.html" def form_valid(self, form): returned = super().form_valid(form) get_person_gdpr_log("new_item", self.request, None, self.model.objects.filter(pk=self.object.pk)) return returned def get_success_url(self): return reverse("person_edit", args=[self.object.pk]) class PersonEdit(LoginRequiredMixin, UpdateView): model = models.Person form_class = forms.BasePersonForm template_name = "ishtar/person_form.html" def form_valid(self, form): returned = super().form_valid(form) get_person_gdpr_log("modify_item", self.request, None, self.model.objects.filter(pk=self.object.pk)) return returned def get_success_url(self): return reverse("person_edit", args=[self.object.pk]) class ManualMergeMixin(object): def form_valid(self, form): self.items = form.get_items() return super(ManualMergeMixin, self).form_valid(form) def get_success_url(self): return reverse( self.redir_url, args=["_".join([str(item.pk) for item in self.items])] ) class PersonManualMerge(ManualMergeMixin, IshtarMixin, LoginRequiredMixin, FormView): form_class = forms.PersonMergeFormSelection template_name = "ishtar/form.html" page_name = _("Merge persons") current_url = "person-manual-merge" redir_url = "person_manual_merge_items" class ManualMergeItemsMixin: item_type = None def get_form_kwargs(self): kwargs = super(ManualMergeItemsMixin, self).get_form_kwargs() kwargs["items"] = self.kwargs["pks"].split("_") return kwargs def form_valid(self, form): callback = None if self.item_type: callback = get_person_gdpr_log self.item = form.merge(callback=callback, request=self.request) return super(ManualMergeItemsMixin, self).form_valid(form) def get_success_url(self): return reverse("display-item", args=[self.item_type, self.item.pk]) class PersonManualMergeItems( ManualMergeItemsMixin, IshtarMixin, LoginRequiredMixin, FormView ): form_class = forms.PersonMergeIntoForm template_name = "ishtar/form.html" page_name = _("Select the main person") current_url = "person-manual-merge-items" item_type = "person" class OrgaManualMerge(ManualMergeMixin, IshtarMixin, LoginRequiredMixin, FormView): form_class = forms.OrgaMergeFormSelection template_name = "ishtar/form.html" page_name = _("Merge organization") current_url = "orga-manual-merge" redir_url = "orga_manual_merge_items" class OrgaManualMergeItems( ManualMergeItemsMixin, IshtarMixin, LoginRequiredMixin, FormView ): form_class = forms.OrgaMergeIntoForm template_name = "ishtar/form.html" page_name = _("Select the main organization") current_url = "orga-manual-merge-items" item_type = "organization" class OrganizationCreate(LoginRequiredMixin, CreateView): model = models.Organization form_class = forms.BaseOrganizationForm template_name = "ishtar/organization_form.html" form_prefix = "orga" def get_form_kwargs(self): kwargs = super(OrganizationCreate, self).get_form_kwargs() if hasattr(self.form_class, "form_prefix"): kwargs.update({"prefix": self.form_class.form_prefix}) return kwargs def get_success_url(self): return reverse("organization_edit", args=[self.object.pk]) class OrganizationEdit(LoginRequiredMixin, UpdateView): model = models.Organization form_class = forms.BaseOrganizationForm template_name = "ishtar/organization_form.html" def get_form_kwargs(self): kwargs = super(OrganizationEdit, self).get_form_kwargs() if hasattr(self.form_class, "form_prefix"): kwargs.update({"prefix": self.form_class.form_prefix}) return kwargs def get_success_url(self): return reverse("organization_edit", args=[self.object.pk]) class OrganizationPersonCreate(LoginRequiredMixin, CreateView): model = models.Person form_class = forms.BaseOrganizationPersonForm template_name = "ishtar/organization_person_form.html" relative_label = _("Corporation manager") def get_context_data(self, *args, **kwargs): data = super(OrganizationPersonCreate, self).get_context_data(*args, **kwargs) data["relative_label"] = self.relative_label return data def get_success_url(self): return reverse("organization_person_edit", args=[self.object.pk]) def form_valid(self, form): returned = super().form_valid(form) get_person_gdpr_log("new_item", self.request, None, self.model.objects.filter(pk=self.object.pk)) return returned class OrganizationPersonEdit(LoginRequiredMixin, UpdateView): model = models.Person form_class = forms.BaseOrganizationPersonForm template_name = "ishtar/organization_person_form.html" relative_label = _("Corporation manager") def get_context_data(self, *args, **kwargs): data = super(OrganizationPersonEdit, self).get_context_data(*args, **kwargs) data["relative_label"] = self.relative_label return data def get_success_url(self): return reverse("organization_person_edit", args=[self.object.pk]) def form_valid(self, form): returned = super().form_valid(form) get_person_gdpr_log("modify_item", self.request, None, self.model.objects.filter(pk=self.object.pk)) return returned # documents new_document_tag = new_qa_item( models.DocumentTag, forms.AddDocumentTagForm, page_name=_("New tag") ) autocomplete_documenttag = get_autocomplete_generic(models.DocumentTag) show_document = show_item(models.Document, "document") get_document = get_item( models.Document, "get_document", "document", search_form=forms.DocumentSelect ) display_document = display_item(models.Document) document_search_wizard = wizards.DocumentSearch.as_view( [("selec-document_search", forms.DocumentFormSelection)], label=_("Document: search"), url_name="search-document", ) class DocumentFormMixin(IshtarMixin, LoginRequiredMixin): form_class = forms.DocumentForm template_name = "ishtar/forms/base_related_items.html" model = models.Document def get_context_data(self, **kwargs): data = super(DocumentFormMixin, self).get_context_data(**kwargs) data["extra_form_modals"] = self.form_class.extra_form_modals return data def get_success_url(self): return reverse("edit-document") + "?open_item={}".format(self.object.pk) class DocumentCreateView(DocumentFormMixin, CreateView): page_name = _("Document creation") def get_form_kwargs(self): kwargs = super(DocumentCreateView, self).get_form_kwargs() initial = kwargs.get("initial", {}) source = None if self.request.GET.get("source_pk", None): try: source = models.Document.objects.get( pk=self.request.GET.get("source_pk") ) except models.Document.DoesNotExist: raise Http404() for related_key in models.Document.RELATED_MODELS_ALT: model = models.Document._meta.get_field(related_key).related_model if model.SLUG in self.request.GET: try: item = model.objects.get(pk=self.request.GET[model.SLUG]) except model.DoesNotExist: continue initial[related_key] = str(item.pk) if source: for k in models.Document.RELATED_MODELS: value = getattr(source, k) if hasattr(value, "all"): value = ",".join([str(v.pk) for v in value.all()]) if hasattr(value, "pk"): value = value.pk initial[k] = value initial["source"] = source.pk if initial: kwargs["initial"] = initial kwargs["user"] = self.request.user return kwargs def form_valid(self, form): returned = super().form_valid(form) ct = ContentType.objects.get_for_model(self.object) for profile in self.request.user.ishtaruser.person.profiles.all(): for permission_type in ("view", "change", "delete"): profile.generate_permission( ct, permission_type, obj_id=self.object.pk ) return returned class DocumentSelectView(IshtarMixin, LoginRequiredMixin, FormView): form_class = forms.DocumentFormSelection template_name = "ishtar/form.html" redir_url = "edit-document" def form_valid(self, form): self.pk = form.cleaned_data["pk"] return super(DocumentSelectView, self).form_valid(form) def get_form_kwargs(self): kwargs = super(DocumentSelectView, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def get_context_data(self, **kwargs): data = super(DocumentSelectView, self).get_context_data(**kwargs) if self.request.GET and "open_item" in self.request.GET: data["open_url"] = ( reverse("show-document", args=[self.request.GET["open_item"]]) + "/" ) return data def get_success_url(self): return reverse(self.redir_url, args=[self.pk]) class DocumentEditView(DocumentFormMixin, UpdateView): page_name = _("Document modification") def get_form_kwargs(self): kwargs = super(DocumentEditView, self).get_form_kwargs() try: document = models.Document.objects.get(pk=self.kwargs.get("pk")) except models.Document.DoesNotExist: raise Http404() if not check_permission(self.request, "ishtar_common.change_document", document): raise Http404() initial = {} for k in ( list(self.form_class.base_fields.keys()) + models.Document.RELATED_MODELS ): value = getattr(document, k) if hasattr(value, "all"): value = ",".join([str(v.pk) for v in value.all()]) if hasattr(value, "pk"): value = value.pk initial[k] = value # main image initialisation if document.image: kwargs["main_items_fields"] = {} for k in models.Document.RELATED_MODELS: kwargs["main_items_fields"][k] = [] for related_item in getattr(document, k).all(): key = "{}_{}_main_image".format(k, related_item.pk) kwargs["main_items_fields"][k].append( (key, "{} - {}".format(_("Main image for"), related_item)) ) if related_item.main_image == document: initial[key] = True kwargs["initial"] = initial kwargs["user"] = self.request.user self.document = document return kwargs def get_context_data(self, **kwargs): kwargs = super(DocumentEditView, self).get_context_data(**kwargs) rel = self.document.cache_related_label if len(rel) == 1000: # truncated rel += " (...)" kwargs["item_related_label"] = rel return kwargs document_deletion_steps = [ ("selec-document_deletion", forms.DocumentFormMultiSelection), ("final-document_deletion", FinalDeleteForm), ] document_deletion_wizard = wizards.DocumentDeletionWizard.as_view( document_deletion_steps, label=_("Document deletion"), url_name="document_deletion", ) def document_delete(request, pk): if not wizard_is_available(document_deletion_wizard, request, models.Document, pk): return HttpResponseRedirect("/") wizard_url = "document_deletion" wizards.DocumentDeletionWizard.session_set_value( request, "selec-" + wizard_url, "pks", pk, reset=True ) return redirect(reverse(wizard_url, kwargs={"step": "final-" + wizard_url})) def get_bookmark(request, pk): try: sq = models.SearchQuery.objects.get( pk=pk, profile__person__ishtaruser__user_ptr=request.user ) except models.SearchQuery.DoesNotExist: raise Http404() slug = sq.content_type.model_class().SLUG return redirect(reverse(slug + "_search") + "?bookmark={}".format(sq.pk)) def gen_generate_doc(model): def func(request, pk, template_pk=None): ishtaruser = getattr(request.user, "ishtaruser", None) if not ishtaruser: return HttpResponse(content_type="text/plain") meta = model._meta perm = f"{meta.app_label}.view_{meta.model_name}" if not ishtaruser.has_permission(perm): return HttpResponse(content_type="text/plain") try: item = model.objects.get(pk=pk) doc = item.publish(template_pk) except model.DoesNotExist: doc = None except TemplateSyntaxError as e: dct = { "error_title": _("Error on your template"), "error": str(e), "back_url": reverse("display-item", args=[item.SLUG, pk]), } template = loader.get_template("error.html") return HttpResponse(template.render(dct, request)) if doc: MIMES = { "odt": "application/vnd.oasis.opendocument.text", "ods": "application/vnd.oasis.opendocument.spreadsheet", } ext = doc.split(".")[-1] doc_name = item.get_filename() + "." + ext mimetype = "text/csv" if ext in MIMES: mimetype = MIMES[ext] with open(doc, "rb") as d: response = HttpResponse(d, content_type=mimetype) response["Content-Disposition"] = "attachment; filename=%s" % doc_name return response return HttpResponse(content_type="text/plain") return func class SearchQueryMixin(object): """ Manage content type and profile init """ def dispatch(self, request, *args, **kwargs): if not request.user.pk: raise Http404() try: self.profile = models.UserProfile.objects.get( current=True, person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # no current profile raise Http404() self.app_label = kwargs.get("app_label") self.model = kwargs.get("model") model = self.model if model == "site": model = "archaeologicalsite" try: self.content_type = ContentType.objects.get( app_label=self.app_label.replace("-", "_"), model=model.replace("-", "_"), ) except ContentType.DoesNotExist: raise Http404() return super(SearchQueryMixin, self).dispatch(request, *args, **kwargs) class SearchQueryEdit(SearchQueryMixin, LoginRequiredMixin, FormView): template_name = "ishtar/forms/search_query.html" form_class = forms.SearchQueryForm def get_form_kwargs(self): kwargs = super(SearchQueryEdit, self).get_form_kwargs() kwargs["profile"] = self.profile kwargs["content_type"] = self.content_type return kwargs def get_context_data(self, **kwargs): data = super(SearchQueryEdit, self).get_context_data(**kwargs) data["app_label"] = self.app_label data["model"] = self.model return data def form_valid(self, form): form.save() return HttpResponseRedirect(self.get_success_url()) def get_success_url(self): return reverse("success", args=["bookmark"]) class SuccessView(TemplateView): template_name = "ishtar/forms/success.html" def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) msg = self.request.GET.get("message") if msg: data["message"] = urllib.parse.unquote(msg) return data class BookmarkList( SearchQueryMixin, JSONResponseMixin, LoginRequiredMixin, TemplateView ): def get_data(self, context): q = models.SearchQuery.objects.filter( content_type=self.content_type, profile=self.profile ) return { "bookmarks": [ {"label": sq.label, "query": sq.query, "id": sq.id} for sq in q.all() ] } class QRCodeForSearchView(LoginRequiredMixin, FormView): template_name = "ishtar/forms/qrcode_for_search.html" form_class = forms.QRSearchForm def form_valid(self, form): context_data = self.get_context_data() context_data["qr_code"] = form.save() return self.render_to_response(context_data) class SearchQueryDelete(LoginRequiredMixin, DeleteView): model = models.SearchQuery template_name = "ishtar/forms/bookmark_delete.html" page_name = _("Delete bookmark") def dispatch(self, request, *args, **kwargs): if not request.user.pk: raise Http404() try: self.profile = models.UserProfile.objects.get( current=True, person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # no current profile raise Http404() try: self.search_query = models.SearchQuery.objects.get( profile=self.profile, pk=kwargs["pk"] ) except models.SearchQuery.DoesNotExist: raise Http404() return super(SearchQueryDelete, self).dispatch(request, *args, **kwargs) def get_context_data(self, **kwargs): data = super(SearchQueryDelete, self).get_context_data(**kwargs) data["modal_size"] = "small" data["page_name"] = _("Bookmark - Delete") data["action_name"] = _("Delete") data["item"] = self.search_query.label data["url"] = reverse("bookmark-delete", args=[self.search_query.pk]) return data def get_success_url(self): return reverse("success", args=["bookmark"]) class AlertList(JSONResponseMixin, LoginRequiredMixin, TemplateView): def dispatch(self, request, *args, **kwargs): if not request.user.pk: raise Http404() try: self.profile = models.UserProfile.objects.get( current=True, person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # no current profile raise Http404() return super(AlertList, self).dispatch(request, *args, **kwargs) def get_data(self, context): q = models.SearchQuery.objects.filter(profile=self.profile, is_alert=True) alerts = [] for sq in q.all(): model = sq.content_type.model_class() module = model.__module__.split(".")[0] views = importlib.import_module(module + ".views") try: get_view = getattr(views, "get_" + model.SLUG) except AttributeError: continue nb = get_view(self.request, query={"search_vector": sq.query}, count=True) alerts.append({"label": sq.label, "query_id": sq.pk, "number": nb}) return {"alerts": alerts} class QANotAvailable(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/forms/qa_message.html" modal_size = "small" contexts = { "locked-by-others": _("Some items have been locked by other " "users."), "locked": _("Some items are locked."), } def get_context_data(self, **kwargs): data = super(QANotAvailable, self).get_context_data(**kwargs) data["page_name"] = _("Not available") data["message"] = _("Action not available for these items.") if self.kwargs.get("context"): context = self.kwargs.get("context") if context in self.contexts: data["message"] += " {}".format(self.contexts[context]) return data class QAItemForm(IshtarMixin, LoginRequiredMixin, FormView): template_name = "ishtar/forms/qa_form.html" model = None base_url = None form_class = None page_name = "" success_url = "/success/" modal_size = None # large, small or None (medium) icon = "fa fa-pencil" action_name = None permissions = [] def get_permissions(self): return self.permissions def get_quick_action(self): quick_action = self.model.get_quick_action_by_url(self.base_url) if quick_action: return quick_action # if not listed in QUICK_ACTIONS return QuickAction( url=self.base_url, icon_class=self.icon, text=self.page_name, rights=self.get_permissions() ) def pre_dispatch(self, request, *args, **kwargs): if not self.model: if "model" in kwargs: self.model = kwargs["model"] else: raise NotImplementedError("No attribute model defined.") pks = kwargs.get("pks") if isinstance(pks, int): pks = [pks] else: pks = [int(pk) for pk in kwargs.get("pks").split("-")] self.items = list(self.model.objects.filter(pk__in=pks)) if not self.items: raise Http404() # check availability quick_action = self.get_quick_action() if not quick_action: raise Http404() if not quick_action.is_available(user=request.user): for item in self.items: if not quick_action.is_available( user=request.user, obj=item ): raise Http404() self.url = request.get_full_path() def dispatch(self, request, *args, **kwargs): redirected = self.pre_dispatch(request, *args, **kwargs) if redirected: return redirected return super(QAItemForm, self).dispatch(request, *args, **kwargs) def get_form_kwargs(self): kwargs = super(QAItemForm, self).get_form_kwargs() kwargs["items"] = self.items return kwargs def get_page_name(self): page_name = "" if self.icon: page_name = f'  ' return page_name + "{} – {}".format( self.model._meta.verbose_name, self.page_name ) def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) data["url"] = self.url data["items"] = self.items data["modal_size"] = self.modal_size data["page_name"] = self.get_page_name() if self.action_name: data["action_name"] = self.action_name return data class QAItemEditForm(QAItemForm): form_class_multi = None modal_size = "large" def get_quick_action(self): return self.model.QA_EDIT def pre_dispatch(self, request, *args, **kwargs): self.confirm = kwargs.get("confirm", False) and True redirected = super(QAItemEditForm, self).pre_dispatch(request, *args, **kwargs) if redirected: return redirected if hasattr(self.model, "is_locked"): for item in self.items: if item.is_locked(request.user): redirected = HttpResponseRedirect( reverse("qa-not-available", args=["locked"]) ) return redirected def get_form_class(self): if len(self.items) > 1 and self.form_class_multi: return self.form_class_multi return self.form_class def get_form_kwargs(self): kwargs = super(QAItemEditForm, self).get_form_kwargs() kwargs["confirm"] = self.confirm kwargs["user"] = self.request.user.ishtaruser return kwargs def get_page_name(self): page_name = "" if self.icon: page_name = f'  ' if not self.page_name: return page_name + "{} – {}".format( self.model._meta.verbose_name, self.model.QA_EDIT.text ) else: return page_name + "{} – {}".format( self.model._meta.verbose_name, self.page_name ) def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) if self.confirm: if "confirm" not in self.url: data["url"] = self.url.split("?")[0] + "confirm/" data["confirm"] = True data["action_name"] = _("Confirm") return data def form_valid(self, form): if not self.confirm: self.confirm = True return self.render_to_response(self.get_context_data(form=self.get_form())) return self.form_save(form) def form_save(self, form): message = form.save(self.items, self.request.user) extra_args = "" if message: extra_args = "?message=" + urllib.parse.quote(str(message)) return HttpResponseRedirect(reverse("success") + extra_args) class QABaseLockView(QAItemForm): form_class = forms.QALockForm page_name = _("Lock/unlock") icon = "fa fa-lock" def pre_dispatch(self, request, *args, **kwargs): super(QABaseLockView, self).pre_dispatch(request, *args, **kwargs) if [ True for item in self.items if item.lock_user and item.lock_user != request.user ]: url = reverse("qa-not-available", args=["locked-by-others"]) return HttpResponseRedirect(url) def form_valid(self, form): form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class QALinkView(QAItemForm): form_class = forms.QALinkForm page_name = _("Link items") icon = "fa fa-link" def pre_dispatch(self, request, *args, **kwargs): self.base_url = kwargs["url"] super().pre_dispatch(request, *args, **kwargs) if not request.user.ishtaruser.is_ishtaradmin: url = reverse("qa-not-available") return HttpResponseRedirect(url) def form_valid(self, form): form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class QAOrganizationForm(QAItemEditForm): model = models.Organization form_class = forms.QAOrganizationFormMulti class QAPersonForm(QAItemEditForm): model = models.Person form_class = forms.QAPersonFormMulti def form_save(self, form): get_person_gdpr_log("modify_item", self.request, None, self.model.objects.filter(pk__in=[item.pk for item in self.items])) form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class QABiographicalNoteForm(QAItemForm): model = models.BiographicalNote form_class = forms.BiographicalNoteEditForm page_name = _("Modify") def get_quick_action(self): return self.model.QA_EDIT def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["user"] = self.request.user return kwargs def form_valid(self, form): return self.form_save(form) def form_save(self, form): form.save(self.request.user, self.items[0]) return HttpResponseRedirect(reverse("success")) class QADocumentForm(QAItemEditForm): model = models.Document form_class = forms.QADocumentFormMulti class QADocumentDuplicateFormView(QAItemForm): template_name = "ishtar/forms/qa_document_duplicate.html" model = models.Document page_name = _("Duplicate") form_class = forms.QADocumentDuplicateForm base_url = "document-qa-duplicate" def get_form_kwargs(self): kwargs = super(QADocumentDuplicateFormView, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def form_valid(self, form): new = form.save() if form.cleaned_data.get("open_edit", False): redir = reverse("edit-document", args=[new.pk]).replace("/", "|") return HttpResponseRedirect(reverse("success", args=["redirect", redir])) return HttpResponseRedirect(reverse("success")) def get_context_data(self, **kwargs): data = super(QADocumentDuplicateFormView, self).get_context_data(**kwargs) data["action_name"] = _("Duplicate") return data class QADocumentUnlink(QAItemForm): model = models.Document page_name = _("Unlink") form_class = forms.QAForm base_url = "document-qa-unlink" icon = "fa fa-chain-broken" action_name = _("Unlink") def pre_dispatch(self, request, *args, **kwargs): related_model = kwargs.get("related_model") # model, related_id if related_model not in ACTION_MODEL_DICT: raise Http404() related_model = ACTION_MODEL_DICT[related_model] related_id = kwargs.get("related_id") try: self.related_item = related_model.objects.get(pk=related_id) except related_model.DoesNotExist: raise Http404() return super().pre_dispatch(request, *args, **kwargs) def get_permissions(self): rel_meta = self.related_item.__class__._meta return [ f"{rel_meta.app_label}.change_{rel_meta.model_name}", f"{rel_meta.app_label}.change_own_{rel_meta.model_name}", ] def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["confirm"] = True return kwargs def form_valid(self, form): self.related_item.documents.remove(self.items[0]) return HttpResponseRedirect(reverse("success")) def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) data["messages"] = [ BSMessage( str(_('Unlink document "{}" from this item.')).format(self.items[0]), "info", "", no_dismiss=True) ] return data class QADocumentPackagingFormView(QAItemForm): template_name = "ishtar/forms/qa_document_packaging.html" model = models.Document form_class = forms.QADocumentPackagingForm page_name = _("Packaging") base_url = "document-qa-packaging" def dispatch(self, request, *args, **kwargs): returned = super(QADocumentPackagingFormView, self).dispatch( request, *args, **kwargs ) """ for item in self.items: if item.is_locked(request.user): return HttpResponseRedirect(reverse("qa-not-available")) """ return returned def get_form_kwargs(self): kwargs = super(QADocumentPackagingFormView, self).get_form_kwargs() kwargs["user"] = self.request.user kwargs["prefix"] = "qa-packaging" return kwargs def form_valid(self, form): form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class DisplayItemView(IshtarMixin, TemplateView): template_name = "ishtar/display_item.html" SHOW_VIEWS = {"document": show_document} ASSOCIATED_MODEL = {"document": models.Document} def dispatch(self, request, *args, **kwargs): if not self.request.user.is_authenticated: return auth_view.redirect_to_login(reverse("display-item", kwargs=kwargs)) return super(DisplayItemView, self).dispatch(request, *args, **kwargs) def get_context_data(self, *args, **kwargs): data = super(DisplayItemView, self).get_context_data(*args, **kwargs) pk = kwargs.get("pk") item_type = kwargs.get("item_type") if item_type in self.SHOW_VIEWS: data["view_content"] = self.SHOW_VIEWS[item_type](self.request, pk).content.decode("utf-8") if item_type in self.ASSOCIATED_MODEL and hasattr( self.ASSOCIATED_MODEL[item_type], "extra_meta" ): model = self.ASSOCIATED_MODEL[item_type] try: data["extra_meta"] = model.objects.get(pk=pk).extra_meta except model.DoesNotExist: pass else: data["show_url"] = "/show-{}/{}/".format(item_type, pk) return data class GeoPreCreateView(IshtarMixin, LoginRequiredMixin, FormView): page_name = _("Geo item creation") form_class = forms.PreGISForm template_name = "ishtar/forms/base_form.html" def get(self, request, *args, **kwargs): self.back_url = request.GET.get("back_url") return super().get(request, *args, **kwargs) def post(self, request, *args, **kwargs): self.back_url = request.POST.get("back_url") return super().post(request, *args, **kwargs) def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["back_url"] = self.back_url return kwargs def form_valid(self, form): model_source = self.kwargs.get("model_source") source_pk = self.kwargs.get("source_pk") find_id = None if model_source == "find": try: find = Find.objects.get(pk=source_pk) except Find.DoesNotExist: raise Http404() bf = find.get_first_base_find() if not bf: raise Http404() model_source = "basefind" find_id = source_pk source_pk = bf.pk success_url = reverse( "create-geo", kwargs={ "app_source": self.kwargs.get("app_source"), "model_source": model_source, "source_pk": source_pk, "geom_type": form.cleaned_data.get("geom_type"), } ) if self.back_url: success_url += "?back_url=" + urllib.parse.quote(self.back_url) if find_id: success_url += "&" if self.back_url else "?" success_url += f"find_id={find_id}" return HttpResponseRedirect(success_url) class GeoFormMixin(IshtarMixin, LoginRequiredMixin): form_class = forms.GISForm template_name = "ishtar/forms/geo_form.html" model = models.GeoVectorData def get(self, request, *args, **kwargs): self.back_url = request.GET.get("back_url") self.find_id = request.GET.get("find_id") return super().get(request, *args, **kwargs) def post(self, request, *args, **kwargs): self.back_url = request.POST.get("back_url") self.find_id = request.POST.get("find_id") return super().post(request, *args, **kwargs) def get_context_data(self, **kwargs): data = super(GeoFormMixin, self).get_context_data(**kwargs) data["extra_form_modals"] = self.form_class.extra_form_modals return data def get_success_url(self): if not self.back_url: for rel_model_key in models.GeoVectorData.RELATED_MODELS: # should in a logic order from largest to close # town before operation, operation before context record... rel = getattr(self.object, rel_model_key) if rel.count(): rel_item = rel.all()[0] if not hasattr(rel_item, "SLUG"): continue return reverse("display-item", kwargs={"item_type": rel_item.SLUG, "pk": rel_item.pk}) return reverse("edit-geo", kwargs={"pk": self.object.pk}) back_url = self.back_url if self.find_id: back_url = back_url.split("=")[0] + "=" + str(self.find_id) return back_url def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs["back_url"] = self.back_url kwargs["find_id"] = self.find_id return kwargs class GeoCreateView(GeoFormMixin, CreateView): page_name = _("Geo item creation") def get_form_kwargs(self): if not hasattr(self.request.user, "ishtaruser"): raise Http404() ishtaruser = self.request.user.ishtaruser kwargs = super(GeoCreateView, self).get_form_kwargs() try: content_type = ContentType.objects.get( app_label=self.kwargs.get("app_source"), model=self.kwargs.get("model_source") ) except ContentType.DoesNotExist: raise Http404() model = content_type.model_class() try: obj = model.objects.get(pk=self.kwargs.get("source_pk")) except model.DoesNotExist: raise Http404() # check permission to add and view attached item attached_meta = model._meta perm_attached = f"{attached_meta.app_label}.view_{attached_meta.model_name}" perm_own_attached = f"{attached_meta.app_label}.view_own_{attached_meta.model_name}" if not ishtaruser.has_permission( "ishtar_common.add_geovectordata") or ( not ishtaruser.has_permission(perm_attached) and not ishtaruser.has_permission(perm_own_attached, obj=obj)): # check permission to view own attached item raise Http404() kwargs["main_items_fields"] = {} for k in models.GeoVectorData.RELATED_MODELS: kwargs["main_items_fields"][k] = [] if k.endswith(obj.SLUG): kwargs["initial"][k] = [obj.pk] kwargs["initial"]["name"] = obj.short_label \ if hasattr(obj, "short_label") else str(obj) key = f"{k}_{obj.pk}_main_item" kwargs["main_items_fields"][k].append( (key, "{} - {}".format(_("Main geo item for"), obj)) ) kwargs["source_content_type"] = content_type.pk kwargs["source_id"] = obj.pk kwargs["geom_type"] = self.kwargs.get("geom_type") main_geodata = obj.main_geodata if main_geodata and main_geodata.cached_x and main_geodata.cached_y: kwargs["default_center"] = (main_geodata.cached_x, main_geodata.cached_y) kwargs["user"] = self.request.user return kwargs class GeoEditView(GeoFormMixin, UpdateView): page_name = _("Geo item modification") def get_form_kwargs(self): kwargs = super(GeoEditView, self).get_form_kwargs() try: geo = models.GeoVectorData.objects.get(pk=self.kwargs.get("pk")) except models.GeoVectorData.DoesNotExist: raise Http404() if not check_permission(self.request, "ishtar_common.change_geovectordata", geo): raise Http404() initial = {} for k in ( list(self.form_class.base_fields.keys()) + models.GeoVectorData.RELATED_MODELS ): value = getattr(geo, k) if hasattr(value, "all"): value = ",".join([str(v.pk) for v in value.all().order_by("pk")]) if hasattr(value, "pk"): value = value.pk initial[k] = value kwargs["main_items_fields"] = {} kwargs["too_many"] = {} LIMIT = 10 for k in models.GeoVectorData.RELATED_MODELS: kwargs["main_items_fields"][k] = [] values = [] for idx, related_item in enumerate(getattr(geo, k).all()): if idx >= LIMIT: if k not in kwargs["too_many"]: kwargs["too_many"][k] = [] kwargs["too_many"][k].append(related_item.pk) continue pk = str(related_item.pk) values.append(pk) key = "{}_{}_main_item".format(k, pk) kwargs["main_items_fields"][k].append( (key, "{} - {}".format(_("Main geo item for"), related_item)) ) if related_item.main_geodata == geo: initial[key] = True initial[k] = ",".join(values) kwargs["initial"] = initial kwargs["user"] = self.request.user return kwargs class GeoDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/forms/geo_delete_form.html" model = models.GeoVectorData page_name = _("Delete geographic item") def get(self, request, *args, **kwargs): self.back_url = request.GET.get("back_url", None) return super().get(request, *args, **kwargs) def get_context_data(self, **kwargs): context_data = super().get_context_data(**kwargs) if self.back_url: context_data["back_url"] = self.back_url return context_data def get_success_url(self): if self.request.POST.get("back_url", None): return self.request.POST.get("back_url", None) return reverse("start")