#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import datetime import importlib from jinja2 import TemplateSyntaxError import json import logging import os import unicodedata from django.apps import apps from django.conf import settings from django.contrib.auth import logout from django.contrib.auth.decorators import login_required from django.contrib.auth.views import redirect_to_login from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ObjectDoesNotExist from django.core.urlresolvers import reverse, NoReverseMatch from django.db.models import Q from django.template import loader from django.forms.models import modelformset_factory from django.http import ( HttpResponse, Http404, HttpResponseRedirect, HttpResponseBadRequest, JsonResponse, ) from django.shortcuts import redirect, render, get_object_or_404 from django.utils.decorators import method_decorator from django.utils.translation import ugettext, ugettext_lazy as _ from django.views.generic import ListView, TemplateView, View from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView from extra_views import ModelFormSetView from markdown import markdown from . import models from archaeological_context_records.models import ContextRecord from archaeological_files.forms import DashboardForm as DashboardFormFile from archaeological_files.models import File from archaeological_finds.forms import ( DashboardTreatmentForm, DashboardTreatmentFileForm, ) from archaeological_finds.models import Find, Treatment, TreatmentFile from archaeological_operations.forms import DashboardForm as DashboardFormOpe from archaeological_operations.models import Operation, ArchaeologicalSite from archaeological_warehouse.models import Warehouse from ishtar_common import forms_common as forms from ishtar_common import wizards from ishtar_common.forms import FinalForm, FinalDeleteForm from ishtar_common.models import get_current_profile from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils import ( clean_session_cache, CSV_OPTIONS, get_field_labels_from_path, get_random_item_image_link, shortify, dict_to_tuple, put_session_message, get_model_by_slug, ) from ishtar_common.widgets import JQueryAutoComplete from ishtar_common import tasks from .views_item import ( CURRENT_ITEM_KEYS, CURRENT_ITEM_KEYS_DICT, check_permission, display_item, get_item, show_item, new_qa_item, modify_qa_item, get_short_html_detail, ) logger = logging.getLogger(__name__) def status(request): return HttpResponse("OK") def raise_error(request): return 1 / 0 def raise_task_error(request): if settings.USE_BACKGROUND_TASK: tasks.trigger_error.delay() return HttpResponse("OK") def wizard_is_available(wizard, request, model, pk): try: wizard(request) except IndexError: # no step available put_session_message( request.session.session_key, _("You don't have sufficient permissions to do this action."), "warning", ) return q = model.objects.filter(pk=pk) if not q.count(): raise Http404() obj = q.all()[0] if hasattr(obj, "is_locked") and obj.is_locked(request.user): raise Http404() return obj def tiny_redirect(request, url_id): db_id = models.TinyUrl.decode_id(url_id) link_db = get_object_or_404(models.TinyUrl, id=db_id) return redirect(link_db.link) def index(request): """ Main page """ dct = {"warnings": [], "extra_form_modals": wizards.EXTRA_FORM_MODALS} if settings.PROJECT_SLUG == "default": dct["warnings"].append( _( 'PROJECT_SLUG is set to "default". Change it in your ' "local_settings (or ask your admin to do it)." ) ) profile = get_current_profile() if profile.slug == "default": dct["warnings"].append( _( 'The slug of your current profile is set to "default". Change it ' "on the administration page (or ask your admin to do it)." ) ) image = get_random_item_image_link(request) if hasattr(profile, "homepage") and profile.homepage: dct["homepage"] = markdown(profile.homepage) if "{random_image}" in dct["homepage"]: dct["homepage"] = dct["homepage"].replace("{random_image}", image) else: dct["random_image"] = image try: return render(request, "index.html", dct) except NoReverseMatch: # probably rights exception (rights revoked) logout(request) return render(request, "index.html", dct) person_search_wizard = wizards.PersonSearch.as_view( [("general-person_search", forms.PersonFormSelection)], label=_("Person search"), url_name="person_search", ) person_creation_wizard = wizards.PersonWizard.as_view( [ ("identity-person_creation", forms.SimplePersonForm), ("person_type-person_creation", forms.PersonTypeForm), ("final-person_creation", FinalForm), ], label=_("New person"), url_name="person_creation", ) person_modification_wizard = wizards.PersonModifWizard.as_view( [ ("selec-person_modification", forms.PersonFormSelection), ("identity-person_modification", forms.SimplePersonForm), ("person_type-person_creation", forms.PersonTypeForm), ("final-person_modification", FinalForm), ], label=_("Person modification"), url_name="person_modification", ) def person_modify(request, pk): if not wizard_is_available(person_modification_wizard, request, models.Person, pk): return HttpResponseRedirect("/") wizards.PersonModifWizard.session_set_value( request, "selec-person_modification", "pk", pk, reset=True ) return redirect( reverse("person_modification", kwargs={"step": "identity-person_modification"}) ) person_deletion_wizard = wizards.PersonDeletionWizard.as_view( [ ("selec-person_deletion", forms.PersonFormMultiSelection), ("final-person_deletion", FinalDeleteForm), ], label=_("Person deletion"), url_name="person_deletion", ) def person_delete(request, pk): if not wizard_is_available(person_deletion_wizard, request, models.Person, pk): return HttpResponseRedirect("/") wizards.PersonDeletionWizard.session_set_value( request, "selec-person_deletion", "pks", pk, reset=True ) return redirect( reverse("person_deletion", kwargs={"step": "final-person_deletion"}) ) organization_search_wizard = wizards.OrganizationSearch.as_view( [("general-organization_search", forms.OrganizationFormSelection)], label=_("Organization search"), url_name="organization_search", ) organization_creation_wizard = wizards.OrganizationWizard.as_view( [ ("identity-organization_creation", forms.OrganizationForm), ("final-organization_creation", FinalForm), ], label=_("New organization"), url_name="organization_creation", ) organization_modification_wizard = wizards.OrganizationModifWizard.as_view( [ ("selec-organization_modification", forms.OrganizationFormSelection), ("identity-organization_modification", forms.OrganizationForm), ("final-organization_modification", FinalForm), ], label=_("Organization modification"), url_name="organization_modification", ) def organization_modify(request, pk): if not wizard_is_available( organization_modification_wizard, request, models.Organization, pk ): return HttpResponseRedirect("/") wizards.OrganizationModifWizard.session_set_value( request, "selec-organization_modification", "pk", pk, reset=True ) return redirect( reverse( "organization_modification", kwargs={"step": "identity-organization_modification"}, ) ) organization_deletion_wizard = wizards.OrganizationDeletionWizard.as_view( [ ("selec-organization_deletion", forms.OrganizationFormMultiSelection), ("final-organization_deletion", FinalDeleteForm), ], label=_("Organization deletion"), url_name="organization_deletion", ) def organization_delete(request, pk): if not wizard_is_available( organization_deletion_wizard, request, models.Organization, pk ): return HttpResponseRedirect("/") wizard_url = "organization_deletion" wizards.OrganizationDeletionWizard.session_set_value( request, "selec-" + wizard_url, "pks", pk, reset=True ) return redirect(reverse(wizard_url, kwargs={"step": "final-" + wizard_url})) account_wizard_steps = [ ("selec-account_management", forms.PersonUserFormSelection), ("account-account_management", forms.AccountForm), ("profile-account_management", forms.ProfileFormset), ("final-account_management", forms.FinalAccountForm), ] account_management_wizard = wizards.AccountWizard.as_view( account_wizard_steps, label=_("Account management"), url_name="account_management", ) account_deletion_wizard = wizards.IshtarUserDeletionWizard.as_view( [ ("selec-account_deletion", forms.AccountFormSelection), ("final-account_deletion", FinalDeleteForm), ], label=_("Account deletion"), url_name="account_deletion", ) def get_autocomplete_generic(model, extra=None): if not extra: extra = {"available": True} def func(request): q = request.GET.get("term") query = Q(**extra) if not q: q = "" for q in q.split(" "): if not q: continue query = query & Q(label__icontains=q) limit = 20 objects = model.objects.filter(query).distinct()[:limit] get_label = lambda x: x.full_label() if hasattr(x, "full_label") else str(x) data = json.dumps([{"id": obj.pk, "value": get_label(obj)} for obj in objects]) return HttpResponse(data, content_type="text/plain") return func def hide_shortcut_menu(request): request.session["SHORTCUT_SHOW"] = "off" return HttpResponse("OK", content_type="text/plain") def show_shortcut_menu(request): request.session["SHORTCUT_SHOW"] = "on" return HttpResponse("OK", content_type="text/plain") def activate_all_search(request): request.session["SHORTCUT_SEARCH"] = "all" return HttpResponse("OK", content_type="text/plain") def activate_own_search(request): request.session["SHORTCUT_SEARCH"] = "own" return HttpResponse("OK", content_type="text/plain") def activate_advanced_shortcut_menu(request): if not hasattr(request.user, "ishtaruser"): return HttpResponse("KO", content_type="text/plain") request.user.ishtaruser.advanced_shortcut_menu = True request.user.ishtaruser.save() return HttpResponse("OK", content_type="text/plain") def activate_simple_shortcut_menu(request): if not hasattr(request.user, "ishtaruser"): return HttpResponse("KO", content_type="text/plain") request.user.ishtaruser.advanced_shortcut_menu = False request.user.ishtaruser.save() return HttpResponse("OK", content_type="text/plain") def shortcut_menu(request): profile = get_current_profile() CURRENT_ITEMS = [] if profile.files: CURRENT_ITEMS.append((_("Archaeological file"), File)) CURRENT_ITEMS.append((_("Operation"), Operation)) if profile.archaeological_site: CURRENT_ITEMS.append((profile.get_site_label(), ArchaeologicalSite)) if profile.context_record: CURRENT_ITEMS.append((_("Context record"), ContextRecord)) if profile.warehouse: CURRENT_ITEMS.append((_("Warehouse"), Warehouse)) if profile.find: CURRENT_ITEMS.append((_("Find"), Find)) if profile.warehouse: CURRENT_ITEMS.append((_("Treatment request"), TreatmentFile)) CURRENT_ITEMS.append((_("Treatment"), Treatment)) if ( hasattr(request.user, "ishtaruser") and request.user.ishtaruser.advanced_shortcut_menu ): dct = { "current_menu": [], "menu": [], "SHORTCUT_SEARCH": request.session["SHORTCUT_SEARCH"] if "SHORTCUT_SEARCH" in request.session else "own", "SHORTCUT_SHOW": request.session["SHORTCUT_SHOW"] if "SHORTCUT_SHOW" in request.session else "on", } current_selected_labels = [] for lbl, model in CURRENT_ITEMS: model_name = model.SLUG current = model_name in request.session and request.session[model_name] if current: try: item = model.objects.get(pk=int(current)) item_label = shortify(str(item), 60) current_selected_labels.append(item_label) except model.DoesNotExist: pass dct["menu"].append( ( lbl, model_name, current or 0, JQueryAutoComplete( reverse("get-" + model.SLUG + "-shortcut"), model ).render( model.SLUG + "-shortcut", value=current, attrs={"id": "current_" + model.SLUG}, ), ) ) dct["current_selected_labels"] = current_selected_labels return render(request, "ishtar/blocks/advanced_shortcut_menu.html", dct) dct = { "current_menu": [], "SHORTCUT_SHOW": request.session["SHORTCUT_SHOW"] if "SHORTCUT_SHOW" in request.session else "off", } current_selected_labels = [] current_selected_item = {} labels = {} for lbl, model in CURRENT_ITEMS: new_selected_item = None model_name = model.SLUG cls = "" current = model_name in request.session and request.session[model_name] items = [] current_items = [] labels[model_name] = {} lbl_key = "cached_label" if model.SLUG == "warehouse": lbl_key = "name" values = ["id", lbl_key] owns = ( model.get_owns( request.user, menu_filtr=current_selected_item, limit=100, values=values, get_short_menu_class=True, ) or [] ) for item, shortmenu_class in owns: pk = str(item["id"]) if shortmenu_class == "basket": pk = "basket-" + pk # prevent duplicates if pk in current_items: continue current_items.append(pk) selected = pk == current item_label = shortify(item[lbl_key], 60) if selected: cls = shortmenu_class new_selected_item = pk labels[model_name][str(pk)] = item_label items.append((pk, item_label, selected, shortmenu_class)) # selected is not in owns - add it to the list if not new_selected_item and current: try: item = model.objects.get(pk=int(current)) new_selected_item = item.pk item_label = shortify(str(item), 60) labels[model_name][str(item.pk)] = item_label items.append( (item.pk, item_label, True, item.get_short_menu_class(item.pk)) ) except (model.DoesNotExist, ValueError): pass if items: dct["current_menu"].append((lbl, model_name, cls, items)) if new_selected_item: current_selected_item[model_name] = new_selected_item if str(new_selected_item) in labels[model_name]: current_selected_labels.append( labels[model_name][str(new_selected_item)] ) dct["current_selected_labels"] = current_selected_labels return render(request, "ishtar/blocks/shortcut_menu.html", dct) def get_current_items(request): currents = {} for key, model in CURRENT_ITEM_KEYS: currents[key] = None if key in request.session and request.session[key]: try: currents[key] = model.objects.get(pk=int(request.session[key])) except (ValueError, model.DoesNotExist): continue return currents def unpin(request, item_type, cascade=False): if item_type not in CURRENT_ITEM_KEYS_DICT.keys(): logger.warning("unpin unknow type: {}".format(item_type)) return HttpResponse("nok") if "administrativeact" in item_type: request.session[item_type] = "" return HttpResponse("ok") request.session["treatment"] = "" if item_type == "treatment" and not cascade: return HttpResponse("ok") request.session["treatmentfile"] = "" if item_type == "treatmentfile" and not cascade: return HttpResponse("ok") request.session["find"] = "" if item_type == "find" and not cascade: return HttpResponse("ok") request.session["warehouse"] = "" if item_type == "warehouse" and not cascade: return HttpResponse("ok") request.session["contextrecord"] = "" if item_type == "contextrecord" and not cascade: return HttpResponse("ok") request.session["site"] = "" if item_type == "site" and not cascade: return HttpResponse("ok") request.session["operation"] = "" if item_type == "operation" and not cascade: return HttpResponse("ok") request.session["file"] = "" if item_type == "file" and not cascade: return HttpResponse("ok") def update_current_item(request, item_type=None, pk=None): if not item_type or not pk: if not request.is_ajax() and not request.method == "POST": raise Http404 item_type = request.POST["item"] if "value" in request.POST and "item" in request.POST: request.session[item_type] = request.POST["value"] else: request.session[item_type] = str(pk) request.session["SHORTCUT_SEARCH"] = "all" currents = get_current_items(request) # re-init when descending item are not relevant if ( item_type == "file" and currents["file"] and currents["operation"] and currents["operation"].associated_file != currents["file"] ): request.session["operation"] = "" currents["operation"] = None if ( item_type in ("operation", "file") and currents["contextrecord"] and ( not request.session.get("operation", None) or currents["contextrecord"].operation != currents["operation"] ) ): request.session["contextrecord"] = "" currents["contextrecord"] = None from archaeological_finds.models import Find if ( item_type in ("contextrecord", "operation", "file") and currents["find"] and ( not request.session.get("contextrecord", None) or not Find.objects.filter( downstream_treatment__isnull=True, base_finds__context_record__pk=request.session["contextrecord"], pk=currents["find"].pk, ).count() ) ): request.session["find"] = "" currents["find"] = None # re-init ascending item with relevant values if item_type == "find" and currents["find"]: from archaeological_context_records.models import ContextRecord q = ContextRecord.objects.filter(base_finds__find__pk=currents["find"].pk) if q.count(): currents["contextrecord"] = q.all()[0] request.session["contextrecord"] = str(currents["contextrecord"].pk) if item_type in ("find", "contextrecord") and currents["contextrecord"]: currents["operation"] = currents["contextrecord"].operation request.session["operation"] = str(currents["operation"].pk) if item_type in ("find", "contextrecord", "operation") and currents["operation"]: currents["file"] = currents["operation"].associated_file request.session["file"] = str(currents["file"].pk) if currents["file"] else None return HttpResponse("ok") def pin_search(request, item_type): key = "pin-search-" + item_type if not item_type or not ( request.is_ajax() and request.method == "POST" and "value" in request.POST ): raise Http404 request.session[key] = request.POST["value"] if not request.POST["value"]: # empty all unpin(request, item_type, cascade=True) else: unpin(request, item_type) return HttpResponse("ok") def get_by_importer( request, slug, data_type="json", full=False, force_own=False, **dct ): q = models.ImporterType.objects.filter(slug=slug) if not q.count(): res = "" if data_type == "json": res = "{}" return HttpResponse(res, content_type="text/plain") imp = q.all()[0].get_importer_class() cols, col_names = [], [] for formater in imp.LINE_EXPORT_FORMAT: if not formater: cols.append("") col_names.append("") continue cols.append(formater.export_field_name) col_names.append(formater.label) obj_name = imp.OBJECT_CLS.__name__.lower() return get_item(imp.OBJECT_CLS, "get_" + obj_name, obj_name, own_table_cols=cols)( request, data_type, full, force_own, col_names=col_names, **dct ) def autocomplete_person_permissive( request, person_types=None, attached_to=None, is_ishtar_user=None ): return autocomplete_person( request, person_types=person_types, attached_to=attached_to, is_ishtar_user=is_ishtar_user, permissive=True, ) def autocomplete_user(request): if not request.user.has_perm("ishtar_common.view_person", models.Person): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() query = Q() for q in q.split(" "): qu = ( Q(ishtaruser__person__name__icontains=q) | Q(ishtaruser__person__surname__icontains=q) | Q(first_name__icontains=q) | Q(last_name__icontains=q) ) query = query & qu users = models.User.objects.filter(query).distinct()[:limit] values = [] for user in users: try: if user and user.ishtaruser: values.append({"id": user.pk, "value": str(user.ishtaruser)}) except models.User.ishtaruser.RelatedObjectDoesNotExist: pass data = json.dumps(values) return HttpResponse(data, content_type="text/plain") def autocomplete_ishtaruser(request): if not request.user.has_perm("ishtar_common.view_person", models.Person): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term", "") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() query = Q() for q in q.split(" "): qu = ( Q(person__name__unaccent__icontains=q) | Q(person__surname__unaccent__icontains=q) | Q(person__raw_name__unaccent__icontains=q) ) query = query & qu users = models.IshtarUser.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": user.pk, "value": str(user)} for user in users]) return HttpResponse(data, content_type="text/plain") def autocomplete_person( request, person_types=None, attached_to=None, is_ishtar_user=None, permissive=False ): all_items = request.user.has_perm("ishtar_common.view_person", models.Person) own_items = False if not all_items: own_items = request.user.has_perm( "ishtar_common.view_own_person", models.Person ) if not all_items and not own_items or not request.GET.get("term"): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") limit = request.GET.get("limit", 20) try: limit = int(limit) except ValueError: return HttpResponseBadRequest() query = Q() for q in q.split(" "): qu = ( Q(name__unaccent__icontains=q) | Q(surname__unaccent__icontains=q) | Q(email__unaccent__icontains=q) | Q(attached_to__name__unaccent__icontains=q) ) if permissive: qu = qu | Q(raw_name__unaccent__icontains=q) query = query & qu if attached_to: query = query & Q(attached_to__pk__in=attached_to.split("_")) if person_types and str(person_types) != "0": try: typs = [int(tp) for tp in person_types.split("_") if tp] typ = models.PersonType.objects.filter(pk__in=typs).all() query = query & Q(person_types__in=typ) except (ValueError, ObjectDoesNotExist): pass if is_ishtar_user: query = query & Q(ishtaruser__isnull=False) if own_items: if not hasattr(request.user, "ishtaruser"): return HttpResponse(json.dumps([]), content_type="text/plain") query &= models.Person.get_query_owns(request.user.ishtaruser) persons = models.Person.objects.filter(query).distinct()[:limit] data = json.dumps( [{"id": person.pk, "value": str(person)} for person in persons if person] ) return HttpResponse(data, content_type="text/plain") def autocomplete_department(request): if not request.GET.get("term"): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(label__icontains=q) | Q(number__istartswith=q) query = query & extra limit = 20 departments = models.Department.objects.filter(query).distinct()[:limit] data = json.dumps( [{"id": department.pk, "value": str(department)} for department in departments] ) return HttpResponse(data, content_type="text/plain") def autocomplete_town(request): if not request.GET.get("term"): return HttpResponse(content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(name__icontains=q) if settings.COUNTRY == "fr": extra |= Q(numero_insee__istartswith=q) query &= extra limit = 20 towns = models.Town.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": town.pk, "value": str(town)} for town in towns]) return HttpResponse(data, content_type="text/plain") def autocomplete_advanced_town(request, department_id=None, state_id=None): if not request.GET.get("term"): return HttpResponse(content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() query = Q() for q in q.split(" "): extra = Q(name__icontains=q) if settings.COUNTRY == "fr": extra = extra | Q(numero_insee__istartswith=q) if not department_id: extra = extra | Q(departement__label__istartswith=q) query = query & extra if department_id: query = query & Q(departement__number__iexact=department_id) if state_id: query = query & Q(departement__state__number__iexact=state_id) limit = 20 towns = models.Town.objects.filter(query).distinct()[:limit] result = [] for town in towns: val = town.name if hasattr(town, "numero_insee"): val += " (%s)" % town.numero_insee result.append({"id": town.pk, "value": val}) data = json.dumps(result) return HttpResponse(data, content_type="text/plain") def autocomplete_document(request): if not request.GET.get("term"): return HttpResponse(content_type="text/plain") q = request.GET.get("term") q = unicodedata.normalize("NFKD", q).encode("ascii", "ignore").decode() fields = [ "title__icontains", "reference__icontains", "internal_reference__icontains", "isbn__icontains", "authors__person__cached_label__icontains", "authors_raw__icontains", ] query = None for q in q.split(" "): qu = Q(**{fields[0]: q}) for field in fields[1:]: qu |= Q(**{field: q}) query = qu if not query else query & qu limit = 20 items = models.Document.objects.filter(query).exclude(title="").distinct()[:limit] data = json.dumps([{"id": item.pk, "value": str(item)} for item in items]) return HttpResponse(data, content_type="text/plain") def department_by_state(request, state_id=""): if not state_id: data = [] else: departments = models.Department.objects.filter(state__number=state_id) data = json.dumps( [ { "id": department.pk, "number": department.number, "value": str(department), } for department in departments ] ) return HttpResponse(data, content_type="text/plain") def autocomplete_organization(request, orga_type=None): if ( not request.user.has_perm( "ishtar_common.view_organization", models.Organization ) and not request.user.has_perm( "ishtar_common.view_own_organization", models.Organization ) and not request.user.ishtaruser.has_right( "person_search", session=request.session ) ): return HttpResponse("[]", content_type="text/plain") if not request.GET.get("term"): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") query = Q() for q in q.split(" "): extra = Q(cached_label__unaccent__icontains=q) query = query & extra if orga_type: try: typs = [int(tp) for tp in orga_type.split("_") if tp] typ = models.OrganizationType.objects.filter(pk__in=typs).all() query = query & Q(organization_type__in=typ) except (ValueError, ObjectDoesNotExist): pass limit = 15 organizations = models.Organization.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": org.pk, "value": str(org)} for org in organizations]) return HttpResponse(data, content_type="text/plain") def autocomplete_author(request): if not request.user.has_perm( "ishtar_common.view_author", models.Author ) and not request.user.has_perm("ishtar_common.view_own_author", models.Author): return HttpResponse("[]", content_type="text/plain") if not request.GET.get("term"): return HttpResponse("[]", content_type="text/plain") q = request.GET.get("term") query = Q() for q in q.split(" "): extra = ( Q(person__name__icontains=q) | Q(person__surname__icontains=q) | Q(person__email__icontains=q) | Q(author_type__label__icontains=q) ) query = query & extra limit = 15 authors = models.Author.objects.filter(query).distinct()[:limit] data = json.dumps([{"id": author.pk, "value": str(author)} for author in authors]) return HttpResponse(data, content_type="text/plain") new_person = new_qa_item(models.Person, forms.PersonForm, page_name=_("New person")) modify_person = modify_qa_item(models.Person, forms.PersonForm) detail_person = get_short_html_detail(models.Person) new_person_noorga = new_qa_item( models.Person, forms.NoOrgaPersonForm, page_name=_("New person") ) new_organization = new_qa_item( models.Organization, forms.OrganizationForm, page_name=_("New organization") ) show_organization = show_item(models.Organization, "organization") get_organization = get_item(models.Organization, "get_organization", "organization") modify_organization = modify_qa_item(models.Organization, forms.OrganizationForm) detail_organization = get_short_html_detail(models.Organization) new_author = new_qa_item(models.Author, forms.AuthorForm, page_name=_("New author")) show_person = show_item(models.Person, "person") get_person = get_item(models.Person, "get_person", "person") get_person_for_account = get_item( models.Person, "get_person", "person", own_table_cols=models.Person.TABLE_COLS_ACCOUNT, ) get_ishtaruser = get_item(models.IshtarUser, "get_ishtaruser", "ishtaruser") def action(request, action_slug, obj_id=None, *args, **kwargs): """ Action management """ if not check_permission(request, action_slug, obj_id): not_permitted_msg = ugettext("Operation not permitted.") return HttpResponse(not_permitted_msg) request.session["CURRENT_ACTION"] = action_slug dct = {} globals_dct = globals() if action_slug in globals_dct: return globals_dct[action_slug](request, dct, obj_id, *args, **kwargs) return render(request, "index.html", dct) def dashboard_main(request, dct, obj_id=None, *args, **kwargs): """ Main dashboard """ app_list = [] profile = models.get_current_profile() if profile.files: app_list.append((_("Archaeological files"), "files")) app_list.append((_("Operations"), "operations")) if profile.context_record: app_list.append((_("Context records"), "contextrecords")) if profile.find: app_list.append((_("Finds"), "finds")) if profile.warehouse: app_list.append((_("Treatment requests"), "treatmentfiles")) app_list.append((_("Treatments"), "treatments")) dct = {"app_list": app_list} return render(request, "ishtar/dashboards/dashboard_main.html", dct) DASHBOARD_FORMS = { "files": DashboardFormFile, "operations": DashboardFormOpe, "treatments": DashboardTreatmentForm, "treatmentfiles": DashboardTreatmentFileForm, } def dashboard_main_detail(request, item_name): """ Specific tab of the main dashboard """ if item_name == "users": dct = {"ishtar_users": models.UserDashboard()} return render( request, "ishtar/dashboards/dashboard_main_detail_users.html", dct ) form = None slicing, date_source, fltr, show_detail = "year", None, {}, False profile = models.get_current_profile() has_form = ( (item_name == "files" and profile.files) or item_name == "operations" or (item_name in ("treatmentfiles", "treatments") and profile.warehouse) ) if has_form: slicing = "month" if item_name in DASHBOARD_FORMS: if request.method == "POST": form = DASHBOARD_FORMS[item_name](request.POST) if form.is_valid(): slicing = form.cleaned_data["slicing"] fltr = form.get_filter() if hasattr(form, "get_date_source"): date_source = form.get_date_source() if hasattr(form, "get_show_detail"): show_detail = form.get_show_detail() else: form = DASHBOARD_FORMS[item_name]() lbl, dashboard = None, None dashboard_kwargs = {} if has_form: dashboard_kwargs = {"slice": slicing, "fltr": fltr, "show_detail": show_detail} # date_source is only relevant when the form has set one if date_source: dashboard_kwargs["date_source"] = date_source if item_name == "files" and profile.files: lbl, dashboard = ( _("Archaeological files"), models.Dashboard(File, **dashboard_kwargs), ) elif item_name == "operations": from archaeological_operations.models import Operation lbl, dashboard = ( _("Operations"), models.Dashboard(Operation, **dashboard_kwargs), ) elif item_name == "contextrecords" and profile.context_record: lbl, dashboard = ( _("Context records"), models.Dashboard(ContextRecord, slice=slicing, fltr=fltr), ) elif item_name == "finds" and profile.find: lbl, dashboard = (_("Finds"), models.Dashboard(Find, slice=slicing, fltr=fltr)) elif item_name == "treatmentfiles" and profile.warehouse: lbl, dashboard = ( _("Treatment requests"), models.Dashboard(TreatmentFile, **dashboard_kwargs), ) elif item_name == "treatments" and profile.warehouse: if "date_source" not in dashboard_kwargs: dashboard_kwargs["date_source"] = "start" lbl, dashboard = ( _("Treatments"), models.Dashboard(Treatment, **dashboard_kwargs), ) if not lbl: raise Http404 dct = { "lbl": lbl, "dashboard": dashboard, "item_name": item_name.replace("-", "_"), "VALUE_QUOTE": "" if slicing == "year" else "'", "form": form, "slicing": slicing, } n = datetime.datetime.now() dct["unique_id"] = ( dct["item_name"] + "_" + "%d_%d_%d" % (n.minute, n.second, n.microsecond) ) return render(request, "ishtar/dashboards/dashboard_main_detail.html", dct) def reset_wizards(request): # dynamically execute each reset_wizards of each ishtar app for app in settings.INSTALLED_APPS: if app == "ishtar_common": # no need for infinite recursion continue try: module = __import__(app) except ImportError: continue if hasattr(module, "views") and hasattr(module.views, "reset_wizards"): module.views.reset_wizards(request) return redirect(reverse("start")) ITEM_PER_PAGE = 20 def merge_action(model, form, key, name_key="name"): def merge(request, page=1): current_url = key + "_merge" if not page: page = 1 page = int(page) FormSet = modelformset_factory( model.merge_candidate.through, form=form, formset=forms.MergeFormSet, extra=0, ) q = model.merge_candidate.through.objects count = q.count() max_page = count // ITEM_PER_PAGE if count % ITEM_PER_PAGE != 0: max_page += 1 context = { "current_url": current_url, "current_page": page, "max_page": max_page, } if page < max_page: context["next_page"] = page + 1 if page > 1: context["previous_page"] = page - 1 item_nb = (page - 1) * ITEM_PER_PAGE item_nb_1 = item_nb + ITEM_PER_PAGE from_key = "from_" + key to_key = "to_" + key queryset = q.all().order_by(from_key + "__" + name_key)[item_nb:item_nb_1] FormSet.from_key = from_key FormSet.to_key = to_key if request.method == "POST": context["formset"] = FormSet(request.POST, queryset=queryset) if context["formset"].is_valid(): context["formset"].merge() return redirect(reverse(current_url, kwargs={"page": page})) else: context["formset"] = FormSet(queryset=queryset) return render(request, "ishtar/merge_" + key + ".html", context) return merge def regenerate_external_id(request): if not request.user.is_superuser: raise Http404() model = None for key in request.GET: model = get_model_by_slug(key) if model: break if not model: raise Http404() try: item = model.objects.get(pk=request.GET[model.SLUG]) except model.DoesNotExist: raise Http404() item.regenerate_external_id() return HttpResponseRedirect(reverse("success")) person_merge = merge_action(models.Person, forms.MergePersonForm, "person") organization_merge = merge_action( models.Organization, forms.MergeOrganizationForm, "organization" ) class IshtarMixin(object): page_name = "" def get_context_data(self, **kwargs): context = super(IshtarMixin, self).get_context_data(**kwargs) context["page_name"] = self.page_name return context class JSONResponseMixin: """ Render a JSON response. """ def render_to_response(self, context, **response_kwargs): return JsonResponse(self.get_data(context), **response_kwargs) def get_data(self, context): return context class LoginRequiredMixin(object): @method_decorator(login_required) def dispatch(self, request, *args, **kwargs): return super(LoginRequiredMixin, self).dispatch(request, *args, **kwargs) class AdminLoginRequiredMixin(LoginRequiredMixin): def dispatch(self, request, *args, **kwargs): if not request.user.is_staff: return redirect(reverse("start")) return super(AdminLoginRequiredMixin, self).dispatch(request, *args, **kwargs) class ProfileEdit(LoginRequiredMixin, FormView): template_name = "ishtar/forms/profile.html" form_class = forms.ProfilePersonForm def dispatch(self, request, *args, **kwargs): if kwargs.get("pk"): try: profile = models.UserProfile.objects.get( pk=kwargs["pk"], person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # cannot edit a profile that is not yours... return redirect(reverse("index")) current_changed = False # the profile edited became the current profile if not profile.current: current_changed = True profile.current = True # force post-save in case of many current profile profile.save() if current_changed: clean_session_cache(request.session) return super(ProfileEdit, self).dispatch(request, *args, **kwargs) def get_success_url(self): return reverse("profile") def get_form_kwargs(self): kwargs = super(ProfileEdit, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def get_context_data(self, **kwargs): data = super(ProfileEdit, self).get_context_data(**kwargs) data["page_name"] = _("Current profile") return data def form_valid(self, form): form.save(self.request.session) return HttpResponseRedirect(self.get_success_url()) class DynamicModelView: def get_model(self, kwargs): app = kwargs.get("app").replace("-", "_") model_name = "".join( [part.capitalize() for part in kwargs.get("model_name").split("-")] ) try: return apps.get_model(app, model_name) except LookupError: raise Http404() class QRCodeView(DynamicModelView, IshtarMixin, LoginRequiredMixin, View): def get(self, request, *args, **kwargs): model = self.get_model(kwargs) try: item = model.objects.get(pk=kwargs.get("pk")) assert hasattr(item, "qrcode") except (model.DoesNotExist, AssertionError): raise Http404() if not item.qrcode or not item.qrcode.name: item.generate_qrcode(request=self.request) if not item.qrcode or not item.qrcode.name: # generation has failed raise Http404() with open(settings.MEDIA_ROOT + os.sep + item.qrcode.name, "rb") as f: return HttpResponse(f.read(), content_type="image/png") class GenerateView(IshtarMixin, LoginRequiredMixin, View): def get_template(self, template_slug): try: return models.DocumentTemplate.objects.get( slug=template_slug, available=True, for_labels=False ) except models.DocumentTemplate.DoesNotExist: raise Http404() def publish(self, tpl, objects): return tpl.publish(objects[0]) def get_items(self, request, model): item_pk = self.kwargs.get("item_pk") try: object = model.objects.get(pk=item_pk) if not object.can_view(request): raise Http404() except model.DoesNotExist: raise Http404() return [object] def get(self, request, *args, **kwargs): template_slug = kwargs.get("template_slug") tpl = self.get_template(template_slug) app, __, model_name = tpl.associated_model.klass.split(".") model = apps.get_model(app, model_name) objects = self.get_items(request, model) if not objects: return HttpResponse(content_type="text/plain") document = self.publish(tpl, objects) if not document: return HttpResponse(content_type="text/plain") with open(document, "rb") as f: response = HttpResponse( f.read(), content_type="application/vnd.oasis.opendocument.text" ) response["Content-Disposition"] = "attachment; filename={}".format( document.split(os.sep)[-1] ) return response class GenerateLabelView(GenerateView): def get_template(self, template_slug): try: return models.DocumentTemplate.objects.get( slug=template_slug, available=True, for_labels=True ) except models.DocumentTemplate.DoesNotExist: raise Http404() def publish(self, tpl, objects): return tpl.publish_labels(objects) def get_items(self, request, model): # rights are managed by get_item get_list = get_item(model, None, model.SLUG, own_table_cols=["id"])( request, no_link=True, no_limit=True ) item_list = json.loads(get_list.content.decode("utf-8"))["rows"] try: objects = [model.objects.get(pk=int(dct["id"])) for dct in item_list] except model.DoesNotExist: raise Http404() return objects class GlobalVarEdit(IshtarMixin, AdminLoginRequiredMixin, ModelFormSetView): template_name = "ishtar/formset.html" model = models.GlobalVar factory_kwargs = {"extra": 1, "can_delete": True} page_name = _("Global variables") fields = ["slug", "value", "description"] class NewImportView(IshtarMixin, LoginRequiredMixin, CreateView): template_name = "ishtar/form.html" model = models.Import form_class = forms.NewImportForm page_name = _("New import") def get_success_url(self): return reverse("current_imports") def get_form_kwargs(self): kwargs = super(NewImportView, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def form_valid(self, form): user = models.IshtarUser.objects.get(pk=self.request.user.pk) self.object = form.save(user=user) return HttpResponseRedirect(self.get_success_url()) class ImportListView(IshtarMixin, LoginRequiredMixin, ListView): template_name = "ishtar/import_list.html" model = models.Import page_name = _("Current imports") current_url = "current_imports" def get_queryset(self): q = self.model.objects.exclude(state="AC") if self.request.user.is_superuser: return q.order_by("-pk") user = models.IshtarUser.objects.get(pk=self.request.user.pk) return q.filter(user=user).order_by("-pk") def post(self, request, *args, **kwargs): for field in request.POST: if not field.startswith("import-action-") or not request.POST[field]: continue # prevent forged forms try: imprt = models.Import.objects.get(pk=int(field.split("-")[-1])) except (models.Import.DoesNotExist, ValueError): continue if not self.request.user.is_superuser: # user can only edit his own imports user = models.IshtarUser.objects.get(pk=self.request.user.pk) if imprt.user != user: continue action = request.POST[field] if action == "D": return HttpResponseRedirect( reverse("import_delete", kwargs={"pk": imprt.pk}) ) elif action == "A": imprt.initialize( user=self.request.user.ishtaruser, session_key=request.session.session_key, ) elif action == "I": if settings.USE_BACKGROUND_TASK: imprt.delayed_importation(request, request.session.session_key) else: imprt.importation() elif action == "CH": if settings.USE_BACKGROUND_TASK: imprt.delayed_check_modified(request.session.session_key) else: imprt.check_modified() elif action == "IS": if imprt.current_line is None: imprt.current_line = imprt.skip_lines imprt.save() return HttpResponseRedirect( reverse( "import_step_by_step", args=[imprt.pk, imprt.current_line + 1] ) ) elif action == "AC": imprt.archive() elif action in ("F", "FE"): imprt.unarchive(action) return HttpResponseRedirect(reverse(self.current_url)) def get_context_data(self, **kwargs): dct = super(ImportListView, self).get_context_data(**kwargs) dct["autorefresh_available"] = settings.USE_BACKGROUND_TASK return dct class ImportStepByStepView(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/import_step_by_step.html" page_name = _("Import step by step") current_url = "import_step_by_step" def get_import(self): try: self.imprt_obj = models.Import.objects.get(pk=int(self.kwargs["pk"])) except (models.Import.DoesNotExist, ValueError): raise Http404 if not self.request.user.is_superuser: # user can only edit his own imports user = models.IshtarUser.objects.get(pk=self.request.user.pk) if self.imprt_obj.user != user: raise Http404 if not hasattr(self, "current_line_number"): self.current_line_number = int(self.kwargs["line_number"]) - 1 def update_csv(self, request): prefix = "col-" submited_line = [ (int(k[len(prefix) :]), request.POST[k]) for k in request.POST if k.startswith(prefix) ] updated_line = [value for line, value in sorted(submited_line)] filename = self.imprt_obj.imported_file.path with open(filename, "r", encoding=self.imprt_obj.encoding) as f: reader = csv.reader(f) lines = [] for idx, line in enumerate(reader): if idx == self.current_line_number: line = updated_line line = [ v.encode(self.imprt_obj.encoding, errors="replace") for v in line ] lines.append(line) with open(filename, "w") as f: writer = csv.writer(f, **CSV_OPTIONS) writer.writerows(lines) def import_line(self, request, *args, **kwargs): try: self.imprt, data = self.imprt_obj.importation( line_to_process=self.current_line_number, return_importer_and_data=True ) except IOError as e: self.errors = [str(e)] return super(ImportStepByStepView, self).get(request, *args, **kwargs) if self.imprt_obj.get_number_of_lines() >= self.current_line_number: self.current_line_number += 1 else: self.current_line_number = None self.imprt_obj.current_line = self.current_line_number self.imprt_obj.save() return self.current_line_number def post(self, request, *args, **kwargs): if not request.POST or request.POST.get("valid", None) not in ( "change-csv", "import", "change-page", ): return self.get(request, *args, **kwargs) self.get_import() if request.POST.get("valid") == "change-page": return HttpResponseRedirect( reverse( "import_step_by_step", args=[self.imprt_obj.pk, request.POST.get("line-to-go", None)], ) ) if request.POST.get("valid") == "change-csv": self.update_csv(request) return self.get(request, *args, **kwargs) if not self.import_line(request, *args, **kwargs): return HttpResponseRedirect(reverse("current_imports")) else: return HttpResponseRedirect( reverse( "import_step_by_step", args=[self.imprt_obj.pk, self.current_line_number + 1], ) ) def get(self, request, *args, **kwargs): self.get_import() self.imprt = None self.errors, self.new_data = None, None try: self.imprt, data = self.imprt_obj.importation( simulate=True, line_to_process=self.current_line_number, return_importer_and_data=True, ) except IOError as e: self.errors = [None, None, str(e)] return super(ImportStepByStepView, self).get(request, *args, **kwargs) if not data or not data[0]: self.errors = self.imprt.errors if not self.errors: self.errors = [("", "", _("No data provided"))] else: self.new_data = data[:] return super(ImportStepByStepView, self).get(request, *args, **kwargs) def get_pagination(self, dct): pagination_step = 5 only_modified = not self.kwargs.get("all_pages", False) dct["all"] = not only_modified dct["import_url"] = ( "import_step_by_step" if only_modified else "import_step_by_step_all" ) line_nb = self.imprt_obj.get_number_of_lines() total_line_nb = self.imprt_obj.skip_lines + line_nb delta = 0 already_imported = [] if self.imprt_obj.imported_line_numbers: already_imported = self.imprt_obj.imported_line_numbers.split(",") changes = [] if self.imprt_obj.changed_line_numbers: changes = self.imprt_obj.changed_line_numbers.split(",") dct["page_is_last"] = self.current_line_number == line_nb # label, number, enabled, is_imported, has_changes dct["page_numbers"] = [] # first pass for the delta current = 0 for idx in range(self.imprt_obj.skip_lines, total_line_nb): imported = str(idx) in already_imported changed = str(idx) in changes if only_modified and (imported or not changed): continue current += 1 if idx == self.current_line_number - 1: delta = int(current / pagination_step) current, has_next, previous = 0, False, None for idx in range(self.imprt_obj.skip_lines, total_line_nb): if current >= ((delta + 1) * pagination_step): has_next = idx break imported = str(idx) in already_imported changed = str(idx) in changes if only_modified and (imported or not changed): continue current += 1 if current <= (delta * pagination_step): previous = idx continue nb = idx + 1 dct["page_numbers"].append((nb, nb, True, imported, changed)) if previous: dct["page_numbers"].insert( 0, (_("Previous"), previous + 1, True, False, True) ) else: dct["page_numbers"].insert( 0, (_("Previous"), self.imprt_obj.skip_lines, False, False, True) ) if has_next: dct["page_numbers"].append((_("Next"), has_next + 1, True, False, True)) else: dct["page_numbers"].append((_("Next"), total_line_nb, False, False, True)) def get_context_data(self, **kwargs): dct = super(ImportStepByStepView, self).get_context_data(**kwargs) dct["import"] = self.imprt_obj dct["line_number_displayed"] = self.current_line_number + 1 dct["line_is_imported"] = self.imprt_obj.line_is_imported( self.current_line_number ) self.get_pagination(dct) dct["errors"] = self.errors if self.errors: if self.imprt.current_csv_line: headers = [ f.label if f else _("Not imported") for f in self.imprt.get_formaters() ] dct["values"] = zip( range(1, len(headers) + 1), headers, self.imprt.current_csv_line ) return dct headers, self.path_to_column, interpreted_values = [], {}, [] for idx, formater in enumerate(self.imprt.get_formaters()): if not formater: headers.append(_("Not imported")) interpreted_values.append("–") continue lbl = formater.label if formater.comment: lbl += ' ' ' {} {} ' "".format(col, _("Col. "), col) ) value_dct[label] = dct[k] return value_dct def list_to_html(self, lst): if not lst: return _("* empty *") return ( "
  • " + "
  • ".join( [self.get_value(item) for item in lst] ) + "
" ) def get_value(self, item): if hasattr(item, "SHOW_URL"): return "{}{}".format(str(item), simple_link_to_window(item)) if hasattr(item, "explicit_label"): return item.explicit_label if item in (None, [], [None]): return _("* empty *") if isinstance(item, list): return self.list_to_html(item) return str(item) class ImportListTableView(ImportListView): template_name = "ishtar/import_table.html" current_url = "current_imports_table" def get_context_data(self, **kwargs): dct = super(ImportListTableView, self).get_context_data(**kwargs) dct["AJAX"] = True dct["MESSAGES"] = [] request = self.request if "messages" in request.session and request.session["messages"]: for message, message_type in request.session["messages"]: dct["MESSAGES"].append((message, message_type)) request.session["messages"] = [] if ( "current_import_id" in request.session and request.session["current_import_id"] ): dct["refreshed_pks"] = request.session.pop("current_import_id") return dct class ImportOldListView(ImportListView): page_name = _("Old imports") current_url = "old_imports" def get_queryset(self): q = self.model.objects.filter(state="AC") if self.request.user.is_superuser: return q.order_by("-creation_date") user = models.IshtarUser.objects.get(pk=self.request.user.pk) return q.filter(user=user).order_by("-creation_date") class ImportLinkView(IshtarMixin, LoginRequiredMixin, ModelFormSetView): template_name = "ishtar/formset_import_match.html" model = models.TargetKey page_name = _("Link unmatched items") factory_kwargs = { "extra": 0, } form_class = forms.TargetKeyForm formset_class = forms.TargetKeyFormset max_fields = 250 def get_formset_kwargs(self): kwargs = super(ImportLinkView, self).get_formset_kwargs() kwargs["user"] = self.request.user return kwargs def full_queryset(self): return self.model.objects.filter( is_set=False, associated_import=self.kwargs["pk"] ) def get_queryset(self): return self.full_queryset()[: self.max_fields] def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) total = self.full_queryset().count() if total > self.max_fields: data["MAX_FIELDS_REACHED"] = self.max_fields data["TOTAL"] = total return data def get_success_url(self): return reverse("import_link_unmatched", args=[self.kwargs["pk"]]) class ImportDeleteView(IshtarMixin, LoginRequiredMixin, DeleteView): template_name = "ishtar/import_delete.html" model = models.Import page_name = _("Delete import") def get_success_url(self): return reverse("current_imports") class PersonCreate(LoginRequiredMixin, CreateView): model = models.Person form_class = forms.BasePersonForm template_name = "ishtar/person_form.html" def get_success_url(self): return reverse("person_edit", args=[self.object.pk]) class PersonEdit(LoginRequiredMixin, UpdateView): model = models.Person form_class = forms.BasePersonForm template_name = "ishtar/person_form.html" def get_success_url(self): return reverse("person_edit", args=[self.object.pk]) class ManualMergeMixin(object): def form_valid(self, form): self.items = form.get_items() return super(ManualMergeMixin, self).form_valid(form) def get_success_url(self): return reverse( self.redir_url, args=["_".join([str(item.pk) for item in self.items])] ) class PersonManualMerge(ManualMergeMixin, IshtarMixin, LoginRequiredMixin, FormView): form_class = forms.PersonMergeFormSelection template_name = "ishtar/form.html" page_name = _("Merge persons") current_url = "person-manual-merge" redir_url = "person_manual_merge_items" class ManualMergeItemsMixin(object): def get_form_kwargs(self): kwargs = super(ManualMergeItemsMixin, self).get_form_kwargs() kwargs["items"] = self.kwargs["pks"].split("_") return kwargs def form_valid(self, form): self.item = form.merge() return super(ManualMergeItemsMixin, self).form_valid(form) def get_success_url(self): return reverse("display-item", args=[self.item_type, self.item.pk]) class PersonManualMergeItems( ManualMergeItemsMixin, IshtarMixin, LoginRequiredMixin, FormView ): form_class = forms.PersonMergeIntoForm template_name = "ishtar/form.html" page_name = _("Select the main person") current_url = "person-manual-merge-items" item_type = "person" class OrgaManualMerge(ManualMergeMixin, IshtarMixin, LoginRequiredMixin, FormView): form_class = forms.OrgaMergeFormSelection template_name = "ishtar/form.html" page_name = _("Merge organization") current_url = "orga-manual-merge" redir_url = "orga_manual_merge_items" class OrgaManualMergeItems( ManualMergeItemsMixin, IshtarMixin, LoginRequiredMixin, FormView ): form_class = forms.OrgaMergeIntoForm template_name = "ishtar/form.html" page_name = _("Select the main organization") current_url = "orga-manual-merge-items" item_type = "organization" class OrganizationCreate(LoginRequiredMixin, CreateView): model = models.Organization form_class = forms.BaseOrganizationForm template_name = "ishtar/organization_form.html" form_prefix = "orga" def get_form_kwargs(self): kwargs = super(OrganizationCreate, self).get_form_kwargs() if hasattr(self.form_class, "form_prefix"): kwargs.update({"prefix": self.form_class.form_prefix}) return kwargs def get_success_url(self): return reverse("organization_edit", args=[self.object.pk]) class OrganizationEdit(LoginRequiredMixin, UpdateView): model = models.Organization form_class = forms.BaseOrganizationForm template_name = "ishtar/organization_form.html" def get_form_kwargs(self): kwargs = super(OrganizationEdit, self).get_form_kwargs() if hasattr(self.form_class, "form_prefix"): kwargs.update({"prefix": self.form_class.form_prefix}) return kwargs def get_success_url(self): return reverse("organization_edit", args=[self.object.pk]) class OrganizationPersonCreate(LoginRequiredMixin, CreateView): model = models.Person form_class = forms.BaseOrganizationPersonForm template_name = "ishtar/organization_person_form.html" relative_label = _("Corporation manager") def get_context_data(self, *args, **kwargs): data = super(OrganizationPersonCreate, self).get_context_data(*args, **kwargs) data["relative_label"] = self.relative_label return data def get_success_url(self): return reverse("organization_person_edit", args=[self.object.pk]) class OrganizationPersonEdit(LoginRequiredMixin, UpdateView): model = models.Person form_class = forms.BaseOrganizationPersonForm template_name = "ishtar/organization_person_form.html" relative_label = _("Corporation manager") def get_context_data(self, *args, **kwargs): data = super(OrganizationPersonEdit, self).get_context_data(*args, **kwargs) data["relative_label"] = self.relative_label return data def get_success_url(self): return reverse("organization_person_edit", args=[self.object.pk]) # documents new_document_tag = new_qa_item( models.DocumentTag, forms.AddDocumentTagForm, page_name=_("New tag") ) autocomplete_documenttag = get_autocomplete_generic(models.DocumentTag) show_document = show_item(models.Document, "document") get_document = get_item( models.Document, "get_document", "document", search_form=forms.DocumentSelect ) display_document = display_item(models.Document) document_search_wizard = wizards.DocumentSearch.as_view( [("selec-document_search", forms.DocumentFormSelection)], label=_("Document: search"), url_name="search-document", ) class DocumentFormMixin(IshtarMixin, LoginRequiredMixin): form_class = forms.DocumentForm template_name = "ishtar/forms/document.html" model = models.Document def get_context_data(self, **kwargs): data = super(DocumentFormMixin, self).get_context_data(**kwargs) data["extra_form_modals"] = self.form_class.extra_form_modals return data def get_success_url(self): return reverse("edit-document") + "?open_item={}".format(self.object.pk) class DocumentCreateView(DocumentFormMixin, CreateView): page_name = _("Document creation") def get_form_kwargs(self): kwargs = super(DocumentCreateView, self).get_form_kwargs() initial = kwargs.get("initial", {}) for related_key in models.Document.RELATED_MODELS_ALT: model = models.Document._meta.get_field(related_key).related_model if model.SLUG in self.request.GET: try: item = model.objects.get(pk=self.request.GET[model.SLUG]) except model.DoesNotExist: continue initial[related_key] = str(item.pk) if initial: kwargs["initial"] = initial kwargs["user"] = self.request.user return kwargs class DocumentSelectView(IshtarMixin, LoginRequiredMixin, FormView): form_class = forms.DocumentFormSelection template_name = "ishtar/form.html" redir_url = "edit-document" def form_valid(self, form): self.pk = form.cleaned_data["pk"] return super(DocumentSelectView, self).form_valid(form) def get_form_kwargs(self): kwargs = super(DocumentSelectView, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def get_context_data(self, **kwargs): data = super(DocumentSelectView, self).get_context_data(**kwargs) if self.request.GET and "open_item" in self.request.GET: data["open_url"] = ( reverse("show-document", args=[self.request.GET["open_item"]]) + "/" ) return data def get_success_url(self): return reverse(self.redir_url, args=[self.pk]) class DocumentEditView(DocumentFormMixin, UpdateView): page_name = _("Document modification") def get_form_kwargs(self): kwargs = super(DocumentEditView, self).get_form_kwargs() try: document = models.Document.objects.get(pk=self.kwargs.get("pk")) assert check_permission(self.request, "document/edit", document.pk) except (AssertionError, models.Document.DoesNotExist): raise Http404() initial = {} for k in ( list(self.form_class.base_fields.keys()) + models.Document.RELATED_MODELS ): value = getattr(document, k) if hasattr(value, "all"): value = ",".join([str(v.pk) for v in value.all()]) if hasattr(value, "pk"): value = value.pk initial[k] = value # main image initialisation if document.image: kwargs["main_items_fields"] = {} for k in models.Document.RELATED_MODELS: kwargs["main_items_fields"][k] = [] for related_item in getattr(document, k).all(): key = "{}_{}_main_image".format(k, related_item.pk) kwargs["main_items_fields"][k].append( (key, "{} - {}".format(_("Main image for"), related_item)) ) if related_item.main_image == document: initial[key] = True kwargs["initial"] = initial kwargs["user"] = self.request.user self.document = document return kwargs def get_context_data(self, **kwargs): kwargs = super(DocumentEditView, self).get_context_data(**kwargs) rel = self.document.cache_related_label if len(rel) == 1000: # truncated rel += " (...)" kwargs["item_related_label"] = rel return kwargs document_deletion_steps = [ ("selec-document_deletion", forms.DocumentFormMultiSelection), ("final-document_deletion", FinalDeleteForm), ] document_deletion_wizard = wizards.DocumentDeletionWizard.as_view( document_deletion_steps, label=_("Document deletion"), url_name="document_deletion", ) def document_delete(request, pk): if not wizard_is_available(document_deletion_wizard, request, models.Document, pk): return HttpResponseRedirect("/") wizard_url = "document_deletion" wizards.DocumentDeletionWizard.session_set_value( request, "selec-" + wizard_url, "pks", pk, reset=True ) return redirect(reverse(wizard_url, kwargs={"step": "final-" + wizard_url})) def get_bookmark(request, pk): try: sq = models.SearchQuery.objects.get( pk=pk, profile__person__ishtaruser__user_ptr=request.user ) except models.SearchQuery.DoesNotExist: raise Http404() slug = sq.content_type.model_class().SLUG return redirect(reverse(slug + "_search") + "?bookmark={}".format(sq.pk)) def gen_generate_doc(model): def func(request, pk, template_pk=None): if not request.user.has_perm("view_" + model.SLUG, model): return HttpResponse(content_type="text/plain") try: item = model.objects.get(pk=pk) doc = item.publish(template_pk) except model.DoesNotExist: doc = None except TemplateSyntaxError as e: dct = { "error_title": _("Error on your template"), "error": str(e), "back_url": reverse("display-item", args=[item.SLUG, pk]), } template = loader.get_template("error.html") return HttpResponse(template.render(dct, request)) if doc: MIMES = { "odt": "application/vnd.oasis.opendocument.text", "ods": "application/vnd.oasis.opendocument.spreadsheet", } ext = doc.split(".")[-1] doc_name = item.get_filename() + "." + ext mimetype = "text/csv" if ext in MIMES: mimetype = MIMES[ext] with open(doc, "rb") as d: response = HttpResponse(d, content_type=mimetype) response["Content-Disposition"] = "attachment; filename=%s" % doc_name return response return HttpResponse(content_type="text/plain") return func class SearchQueryMixin(object): """ Manage content type and profile init """ def dispatch(self, request, *args, **kwargs): if not request.user.pk: raise Http404() try: self.profile = models.UserProfile.objects.get( current=True, person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # no current profile raise Http404() self.app_label = kwargs.get("app_label") self.model = kwargs.get("model") model = self.model if model == "site": model = "archaeologicalsite" try: self.content_type = ContentType.objects.get( app_label=self.app_label.replace("-", "_"), model=model.replace("-", "_"), ) except ContentType.DoesNotExist: raise Http404() return super(SearchQueryMixin, self).dispatch(request, *args, **kwargs) class SearchQueryEdit(SearchQueryMixin, LoginRequiredMixin, FormView): template_name = "ishtar/forms/search_query.html" form_class = forms.SearchQueryForm def get_form_kwargs(self): kwargs = super(SearchQueryEdit, self).get_form_kwargs() kwargs["profile"] = self.profile kwargs["content_type"] = self.content_type return kwargs def get_context_data(self, **kwargs): data = super(SearchQueryEdit, self).get_context_data(**kwargs) data["app_label"] = self.app_label data["model"] = self.model return data def form_valid(self, form): form.save() return HttpResponseRedirect(self.get_success_url()) def get_success_url(self): return reverse("success", args=["bookmark"]) class BookmarkList( SearchQueryMixin, JSONResponseMixin, LoginRequiredMixin, TemplateView ): def get_data(self, context): q = models.SearchQuery.objects.filter( content_type=self.content_type, profile=self.profile ) return { "bookmarks": [ {"label": sq.label, "query": sq.query, "id": sq.id} for sq in q.all() ] } class QRCodeForSearchView(LoginRequiredMixin, FormView): template_name = "ishtar/forms/qrcode_for_search.html" form_class = forms.QRSearchForm def form_valid(self, form): context_data = self.get_context_data() context_data["qr_code"] = form.save() return self.render_to_response(context_data) class SearchQueryDelete(LoginRequiredMixin, DeleteView): model = models.SearchQuery template_name = "ishtar/forms/bookmark_delete.html" page_name = _("Delete bookmark") def dispatch(self, request, *args, **kwargs): if not request.user.pk: raise Http404() try: self.profile = models.UserProfile.objects.get( current=True, person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # no current profile raise Http404() try: self.search_query = models.SearchQuery.objects.get( profile=self.profile, pk=kwargs["pk"] ) except models.SearchQuery.DoesNotExist: raise Http404() return super(SearchQueryDelete, self).dispatch(request, *args, **kwargs) def get_context_data(self, **kwargs): data = super(SearchQueryDelete, self).get_context_data(**kwargs) data["modal_size"] = "small" data["page_name"] = _("Bookmark - Delete") data["action_name"] = _("Delete") data["item"] = self.search_query.label data["url"] = reverse("bookmark-delete", args=[self.search_query.pk]) return data def get_success_url(self): return reverse("success", args=["bookmark"]) class AlertList(JSONResponseMixin, LoginRequiredMixin, TemplateView): def dispatch(self, request, *args, **kwargs): if not request.user.pk: raise Http404() try: self.profile = models.UserProfile.objects.get( current=True, person__ishtaruser__user_ptr=request.user ) except models.UserProfile.DoesNotExist: # no current profile raise Http404() return super(AlertList, self).dispatch(request, *args, **kwargs) def get_data(self, context): q = models.SearchQuery.objects.filter(profile=self.profile, is_alert=True) alerts = [] for sq in q.all(): model = sq.content_type.model_class() module = model.__module__.split(".")[0] views = importlib.import_module(module + ".views") try: get_view = getattr(views, "get_" + model.SLUG) except AttributeError: continue nb = get_view(self.request, query={"search_vector": sq.query}, count=True) alerts.append({"label": sq.label, "query_id": sq.pk, "number": nb}) return {"alerts": alerts} class QANotAvailable(IshtarMixin, LoginRequiredMixin, TemplateView): template_name = "ishtar/forms/qa_message.html" modal_size = "small" contexts = { "locked-by-others": _("Some items have been locked by other " "users."), "locked": _("Some items are locked."), } def get_context_data(self, **kwargs): data = super(QANotAvailable, self).get_context_data(**kwargs) data["page_name"] = _("Not available") data["message"] = _("Action not available for these items.") if self.kwargs.get("context"): context = self.kwargs.get("context") if context in self.contexts: data["message"] += " {}".format(self.contexts[context]) return data class QAItemForm(IshtarMixin, LoginRequiredMixin, FormView): template_name = "ishtar/forms/qa_form.html" model = None base_url = None form_class = None page_name = "" success_url = "/success/" modal_size = None #  large, small or None (medium) def get_quick_action(self): # if not listed in QUICK_ACTIONS overload this method return self.model.get_quick_action_by_url(self.base_url) def pre_dispatch(self, request, *args, **kwargs): assert self.model pks = [int(pk) for pk in kwargs.get("pks").split("-")] self.items = list(self.model.objects.filter(pk__in=pks)) if not self.items: raise Http404() # check availability quick_action = self.get_quick_action() if not quick_action: raise Http404() if not quick_action.is_available(user=request.user, session=request.session): for item in self.items: if not quick_action.is_available( user=request.user, session=request.session, obj=item ): raise Http404() self.url = request.get_full_path() def dispatch(self, request, *args, **kwargs): redirected = self.pre_dispatch(request, *args, **kwargs) if redirected: return redirected return super(QAItemForm, self).dispatch(request, *args, **kwargs) def get_form_kwargs(self): kwargs = super(QAItemForm, self).get_form_kwargs() kwargs["items"] = self.items return kwargs def get_context_data(self, **kwargs): data = super(QAItemForm, self).get_context_data(**kwargs) data["url"] = self.url data["items"] = self.items data["modal_size"] = self.modal_size data["page_name"] = "{} – {}".format( self.model._meta.verbose_name, self.page_name ) return data class QAItemEditForm(QAItemForm): form_class_multi = None modal_size = "large" def get_quick_action(self): return self.model.QA_EDIT def pre_dispatch(self, request, *args, **kwargs): self.confirm = kwargs.get("confirm", False) and True redirected = super(QAItemEditForm, self).pre_dispatch(request, *args, **kwargs) if redirected: return redirected if hasattr(self.model, "is_locked"): for item in self.items: if item.is_locked(request.user): redirected = HttpResponseRedirect( reverse("qa-not-available", args=["locked"]) ) return redirected def get_form_class(self): if len(self.items) > 1 and self.form_class_multi: return self.form_class_multi return self.form_class def get_form_kwargs(self): kwargs = super(QAItemEditForm, self).get_form_kwargs() kwargs["confirm"] = self.confirm return kwargs def get_context_data(self, **kwargs): data = super(QAItemEditForm, self).get_context_data(**kwargs) data["page_name"] = "{} – {}".format( self.model._meta.verbose_name, self.model.QA_EDIT.text ) if self.confirm: if "confirm" not in self.url: data["url"] = self.url.split("?")[0] + "confirm/" data["confirm"] = True data["action_name"] = _("Confirm") return data def form_valid(self, form): if not self.confirm: self.confirm = True return self.render_to_response(self.get_context_data(form=self.get_form())) return self.form_save(form) def form_save(self, form): form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class QABaseLockView(QAItemForm): form_class = forms.QALockForm page_name = _("lock/unlock") def pre_dispatch(self, request, *args, **kwargs): super(QABaseLockView, self).pre_dispatch(request, *args, **kwargs) if [ True for item in self.items if item.lock_user and item.lock_user != request.user ]: url = reverse("qa-not-available", args=["locked-by-others"]) return HttpResponseRedirect(url) def form_valid(self, form): form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class QAOrganizationForm(QAItemEditForm): model = models.Organization form_class = forms.QAOrganizationFormMulti class QAPersonForm(QAItemEditForm): model = models.Person form_class = forms.QAPersonFormMulti class QADocumentForm(QAItemEditForm): model = models.Document form_class = forms.QADocumentFormMulti class QADocumentDuplicateFormView(QAItemForm): template_name = "ishtar/forms/qa_document_duplicate.html" model = models.Document page_name = _("Duplicate") form_class = forms.QADocumentDuplicateForm base_url = "document-qa-duplicate" def get_form_kwargs(self): kwargs = super(QADocumentDuplicateFormView, self).get_form_kwargs() kwargs["user"] = self.request.user return kwargs def form_valid(self, form): form.save() return HttpResponseRedirect(reverse("success")) def get_context_data(self, **kwargs): data = super(QADocumentDuplicateFormView, self).get_context_data(**kwargs) data["action_name"] = _("Duplicate") return data class QADocumentPackagingFormView(QAItemForm): template_name = "ishtar/forms/qa_document_packaging.html" model = models.Document form_class = forms.QADocumentPackagingForm page_name = _("Packaging") base_url = "document-qa-packaging" def dispatch(self, request, *args, **kwargs): returned = super(QADocumentPackagingFormView, self).dispatch( request, *args, **kwargs ) """ for item in self.items: if item.is_locked(request.user): return HttpResponseRedirect(reverse("qa-not-available")) """ return returned def get_form_kwargs(self): kwargs = super(QADocumentPackagingFormView, self).get_form_kwargs() kwargs["user"] = self.request.user kwargs["prefix"] = "qa-packaging" return kwargs def form_valid(self, form): form.save(self.items, self.request.user) return HttpResponseRedirect(reverse("success")) class DisplayItemView(IshtarMixin, TemplateView): template_name = "ishtar/display_item.html" SHOW_VIEWS = {"document": show_document} ASSOCIATED_MODEL = {"document": models.Document} def dispatch(self, request, *args, **kwargs): if not self.request.user.is_authenticated: return redirect_to_login(reverse("display-item", kwargs=kwargs)) return super(DisplayItemView, self).dispatch(request, *args, **kwargs) def get_context_data(self, *args, **kwargs): data = super(DisplayItemView, self).get_context_data(*args, **kwargs) pk = kwargs.get("pk") item_type = kwargs.get("item_type") if item_type in self.SHOW_VIEWS: data["view_content"] = self.SHOW_VIEWS[item_type](self.request, pk).content if item_type in self.ASSOCIATED_MODEL and hasattr( self.ASSOCIATED_MODEL[item_type], "extra_meta" ): model = self.ASSOCIATED_MODEL[item_type] try: data["extra_meta"] = model.objects.get(pk=pk).extra_meta except model.DoesNotExist: pass else: data["show_url"] = "/show-{}/{}/".format(item_type, pk) return data