#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2010-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv from collections import OrderedDict import datetime from io import StringIO import os import pyqrcode import re import requests import shutil import tempfile from urllib.parse import urlparse, quote import zipfile from bootstrap_datepicker.widgets import DateField from django import forms from django.conf import settings from django.contrib.auth import password_validation from django.contrib.auth.models import User from django.contrib.auth.forms import ( UsernameField, AuthenticationForm as AuthAuthenticationForm, PasswordChangeForm as AuthPasswordChangeForm, SetPasswordForm as AuthSetPasswordForm, ) from django.core import validators from django.core.exceptions import ObjectDoesNotExist from django.core.files import File from django.core.validators import MinValueValidator from django.forms.formsets import formset_factory from django.forms.models import BaseModelFormSet, BaseFormSet from django.shortcuts import reverse from django.utils.text import slugify from django.utils.safestring import mark_safe from django.utils.translation import ugettext_lazy as _, pgettext from . import models, models_rest from .models_imports import FORMATER_WIDGETS_DCT from . import widgets from ishtar_common.templatetags.link_to_window import simple_link_to_window from .forms import ( CustomForm, CustomFormSearch, FieldType, file_size_validator, FinalForm, FormHeader, FormPermissionForm, FormSetWithDeleteSwitches, FormSet, HistorySelect, LockForm, ManageOldType, MultiSearchForm, name_validator, IshtarForm, QAForm, reverse_lazy, TableSelect, ) from ishtar_common.data_importer import ImporterError from ishtar_common.utils import ( clean_session_cache, generate_dict_from_list, get_file_from_link, is_downloadable, max_size_help, max_value_current_year, reverse_coordinates, update_data, ) from archaeological_operations.models import Operation from archaeological_context_records.models import ContextRecord from archaeological_finds.models import Find, FindBasket from archaeological_warehouse.models import Container def get_town_field(label=_("Town"), required=True): help_text = _( "

Type name, department code of the " "town you would like to select. The search is insensitive to case." "

\n

Only the first twenty results are displayed but specifying " "the department code is generally sufficient to get the appropriate " "result.

\n

For instance type \"saint denis 93\"" " for getting the french town Saint-Denis in the Seine-Saint-Denis " "department.

" ) # !FIXME hard_link, reverse_lazy doen't seem to work with formsets return forms.IntegerField( widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-town", associated_model=models.Town ), validators=[models.valid_id(models.Town)], label=label, help_text=mark_safe(help_text), required=required, ) def get_advanced_town_field(label=_("Town"), required=True): # !FIXME hard_link, reverse_lazy doen't seem to work with formsets return forms.IntegerField( widget=widgets.JQueryTown( "/" + settings.URL_PATH + "autocomplete-advanced-town" ), validators=[models.valid_id(models.Town)], label=label, required=required, ) def get_person_field(label=_("Person"), required=True, person_types=None): if not person_types: person_types = [] # !FIXME hard_link, reverse_lazy doen't seem to work with formsets widget = None url = "/" + settings.URL_PATH + "autocomplete-person" if person_types: person_types = [ str(models.PersonType.objects.get(txt_idx=person_type).pk) for person_type in person_types ] url += "/" + "_".join(person_types) widget = widgets.JQueryAutoComplete(url, associated_model=models.Person) return forms.IntegerField( widget=widget, label=label, required=required, validators=[models.valid_id(models.Person)], ) class AuthenticationForm(AuthAuthenticationForm): username = UsernameField( widget=forms.TextInput(attrs={"autofocus": True, "class": "no-append"}) ) password = forms.CharField( label=_("Password"), strip=False, widget=forms.PasswordInput( attrs={"autocomplete": "off", "data-toggle": "password"} ), ) class SetPasswordForm(AuthSetPasswordForm): new_password1 = forms.CharField( label=_("New password"), strip=False, help_text=password_validation.password_validators_help_text_html(), widget=forms.PasswordInput( attrs={"autocomplete": "off", "data-toggle": "password"} ), ) new_password2 = forms.CharField( label=_("New password confirmation"), strip=False, widget=forms.PasswordInput( attrs={"autocomplete": "off", "data-toggle": "password"} ), ) class PasswordChangeForm(SetPasswordForm, AuthPasswordChangeForm): old_password = forms.CharField( label=_("Old password"), strip=False, widget=forms.PasswordInput( attrs={"autofocus": True, "autocomplete": "off", "data-toggle": "password"} ), ) class NewItemForm(forms.Form): def __init__(self, *args, **kwargs): self.limits = {} if "limits" in kwargs: limits = kwargs.pop("limits") if limits: for item in limits.split(";"): key, values = item.split("__") self.limits[key] = values.split("-") super(NewItemForm, self).__init__(*args, **kwargs) def limit_fields(self): for key in self.limits: if key in self.fields and hasattr(self.fields[key], "choices"): new_choices = [ (value, lbl) for value, lbl in self.fields[key].choices if str(value) in self.limits[key] ] self.fields[key].choices = new_choices if len(new_choices) == 1: self.fields[key].initial = [new_choices[0][0]] class BaseImportForm(IshtarForm, forms.ModelForm): error_css_class = "error" required_css_class = "required" importer_type = "tab" imported_media_link = forms.URLField( label=_("Associated media (web link to a zip file or a path)"), required=False ) class Meta: model = models.Import fields = ( "name", "importer_type", "imported_file", "associated_group", "skip_lines", "encoding", "csv_sep", "imported_images", "imported_media_link", ) widgets = { "imported_file": widgets.BSClearableFileInput, "imported_images": widgets.BSClearableFileInput, } HEADERS = { "name": FormHeader(_("Import (table)")), "encoding": FormHeader(_("CSV options")), } def __init__(self, *args, **kwargs): user = kwargs.pop("user") super().__init__(*args, **kwargs) self.fields["imported_file"].required = True self.fields["skip_lines"].required = False self.fields["skip_lines"].initial = None self._filter_group(user) self._filter_importer_type(user) if "imported_images" in self.fields: self.fields["imported_images"].validators = [file_size_validator] self.fields["imported_file"].validators = [file_size_validator] self._post_init() def _filter_importer_type_query(self, q, user): ishtaruser = getattr(user, "ishtaruser", None) if not ishtaruser: self.fields["importer_type"].choices = [("", "--")] return if user.is_superuser or ishtaruser.has_permission("ishtar_common.add_import"): return q if not user.ishtaruser.has_permission("ishtar_common.add_own_import"): self.fields["importer_type"].choices = [("", "--")] return q = q.filter(users__pk=user.ishtaruser.pk) return q def _filter_importer_type(self, user): q = models.ImporterType.objects.filter( available=True, is_import=True, type=self.importer_type ) q = self._filter_importer_type_query(q, user) if not q: return self.fields["importer_type"].choices = [("", "--")] + [ (imp.pk, imp.name) for imp in q.all() ] def _filter_group(self, user): groups = models.TargetKeyGroup.objects.filter(available=True) if not user.is_superuser: groups = groups.filter(all_user_can_use=True) if not groups.count(): self.fields.pop("associated_group") else: self.fields["associated_group"].choices = [(None, "--")] + [ (g.pk, str(g)) for g in groups.all() ] BAD_CHARS = ["é", "³", "ô", "Ã\xa0", "é"] def _clean_imported_file(self, types=None): imported_file = self.cleaned_data.get("imported_file", None) if not imported_file: return imported_file_name = imported_file.name.lower() if types: if not [1 for tpe in types if imported_file_name.endswith(tpe)]: if len(types) == 1: msg = str(_("Bad format. Extension of the file must be {}.")).format(types[0][1:]) else: msg = str(_("Bad format. Extension of the file must be: {} or {}.")).format( ", ".join([tpe[1:] for tpe in types[:-1]]), types[-1][1:] ) raise forms.ValidationError(msg) encoding = self.cleaned_data.get("encoding", None) if encoding and imported_file_name.endswith(".csv"): try: imported_file.seek(0) reader = csv.reader(StringIO(imported_file.read().decode(encoding))) idx = 0 for row in reader: for col in row: for char in self.BAD_CHARS: if char in col: raise ValueError() idx += 1 if idx >= 200: break imported_file.seek(0) except (UnicodeDecodeError, ValueError) as e: raise forms.ValidationError( _("This is not a valid CSV file. Check file format and encoding.") ) def clean(self): data = self.cleaned_data if ( data.get("conservative_import", None) and data.get("importer_type") and not data.get("importer_type").unicity_keys ): raise forms.ValidationError( _( "This import type have no unicity type defined. " "Conservative import is not possible." ) ) return data def save(self, user, commit=True): self.instance.user = user if not self.cleaned_data["skip_lines"]: if hasattr(self.cleaned_data["importer_type"], "default_header_len"): self.instance.skip_lines = self.cleaned_data["importer_type"].default_header_len else: self.instance.skip_lines = -1 return super().save(commit) class NewImportForm(BaseImportForm): imported_media_link = forms.URLField( label=_("Associated media (web link to a zip file or a path)"), required=False ) class Meta: model = models.Import fields = ( "name", "importer_type", "imported_file", "associated_group", "skip_lines", "encoding", "csv_sep", "imported_images", "imported_media_link", ) HEADERS = { "name": FormHeader(_("Import (table)")), "encoding": FormHeader(_("CSV options")), "imported_images": FormHeader(_("Documents/Images")), } def __init__(self, *args, **kwargs): self.media_link_is_zip = False super().__init__(*args, **kwargs) def _need_archive(self, data): tpe = data["importer_type"] return tpe.archive_required def clean(self): data = super().clean() if data.get("imported_media_link", None) and data.get("imported_images", None): raise forms.ValidationError( _( "You put either a file or a download link for media " "but not both." ) ) if data.get("imported_images"): try: images = data.get("imported_images") zf = zipfile.ZipFile(images) zf.testzip() except zipfile.BadZipFile: raise forms.ValidationError( _('"Associated images" field must be a valid zip file.') ) types = [".csv", ".ods", ".xls", ".xlsx", ".xlsm"] self._clean_imported_file(types=types) archive_required = self._need_archive(data) if archive_required and ( not data.get("imported_images", None) and not data.get("imported_media_link", None) ): raise forms.ValidationError(_("This importer need a document archive.")) return data def clean_imported_images_link(self): value = self.cleaned_data.get("imported_media_link", None) if value and value.lower().endswith(".zip"): try: if not is_downloadable(value): raise forms.ValidationError("") except (requests.exceptions.RequestException, forms.ValidationError): raise forms.ValidationError( _("Invalid link or no file is available for this link.") ) self.media_link_is_zip = True return value def save(self, user, commit=True): item = super().save(user, commit=commit) if not self.media_link_is_zip: return item try: file_name, temp_file = get_file_from_link(item.imported_media_link) except ValueError: raise forms.ValidationError( _("Bad link for the archive.") ) item.imported_images.save(file_name, File(temp_file)) # media is downloaded - clean the link item.imported_media_link = None item.save() return item class NewImportGISForm(BaseImportForm): error_css_class = "error" required_css_class = "required" importer_type = "gis" class Meta: model = models.Import fields = ( "name", "importer_type", "imported_file", "associated_group", "skip_lines", "encoding", "csv_sep", ) HEADERS = { "name": FormHeader(_("Import (GIS)")), "encoding": FormHeader(_("CSV options")), } def clean_imported_file(self): value = self.cleaned_data.get("imported_file", None) if value: try: ext = value.name.lower().split(".")[-1] if ext not in ["zip", "gpkg", "csv", "ods", "xls", "xlsx", "xlsm"]: raise forms.ValidationError("") if ext == "zip": zip_file = zipfile.ZipFile(value) if zip_file.testzip(): raise forms.ValidationError("") has_correct_file = False for name in zip_file.namelist(): in_ext = name.lower().split(".")[-1] if in_ext in ("shp", "gpkg"): has_correct_file = True break if not has_correct_file: raise forms.ValidationError("") except forms.ValidationError: raise forms.ValidationError( _( "GIS file must be a table or a zip containing a ShapeFile or GeoPackage file." ) ) return value def clean(self): data = super().clean() types = [".zip", ".gpkg", ".csv", ".ods", ".xls", ".xlsx", ".xlsm"] self._clean_imported_file(types=types) return data class NewImportGroupForm(NewImportForm): error_css_class = "error" required_css_class = "required" class Meta: model = models.ImportGroup fields = ( "name", "importer_type", "imported_file", "skip_lines", "encoding", "csv_sep", "imported_images", "imported_media_link", ) HEADERS = { "name": FormHeader(_("Import (group)")), "encoding": FormHeader(_("CSV options")), "imported_images": FormHeader(_("Documents/Images")), } def _filter_importer_type(self, user): q = models.ImporterGroup.objects.filter(available=True) q = self._filter_importer_type_query(q, user) if not q: return self.fields["importer_type"].choices = [("", "--")] + [ (imp.pk, imp.name) for imp in q.all() ] def _filter_group(self, user): pass def _need_archive(self, data): tpe = data["importer_type"] return any(sub.importer_type.archive_required for sub in tpe.importer_types.all()) class TargetKeyForm(forms.ModelForm): class Meta: model = models.TargetKey fields = ("target", "key", "value") widgets = { "key": forms.TextInput(attrs={"readonly": "readonly"}), } target = widgets.SelectReadonlyField(model=models.ImportTarget, label=_("Target")) value = widgets.Select2SimpleField(label=_("Value"), required=False) remember = forms.ChoiceField(label=_("Remember"), choices=[], required=False) NULL_VALUE = "" def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") super(TargetKeyForm, self).__init__(*args, **kwargs) instance = getattr(self, "instance", None) self.associated_import = None if instance and instance.pk: model = instance.target.associated_model ishtaruser = getattr(self.user, "ishtaruser", None) if ( model and ishtaruser.has_permission( f"{model._meta.app_label}.change_{model._meta.model_name}" ) and hasattr(model, "admin_url") ): self.admin_url = instance.target.associated_model.admin_url() self.associated_import = instance.associated_import self.fields["target"].choices = [ (instance.target.pk, instance.target.verbose_name) ] self.fields["key"].widget.attrs["readonly"] = True self.fields["key"].widget.attrs["title"] = str(instance) self.fields["value"].choices = list(instance.target.get_choices()) if not instance.target.column.required: self.fields["value"].choices.insert(1, (self.NULL_VALUE, _("Set to NULL"))) self.fields["key"].required = False self.fields["target"].required = False self.fields["value"].required = False choices = [ ("import", _("this import only")), ("me", _("me")), ] if ( self.associated_import and self.associated_import.associated_group and ( self.associated_import.associated_group.all_user_can_modify or self.user.is_superuser ) ): choices += [ ( "group", str(_("the current group: {}")).format( self.associated_import.associated_group ), ) ] if self.user.is_superuser: choices += [("all", _("all users"))] choices += [("all-importers", _("all users and importers"))] self.fields["remember"].choices = choices self.fields["remember"].widget.attrs["class"] = "auto" self.remember_choices = choices def clean_target(self): instance = getattr(self, "instance", None) if instance and instance.pk: return instance.target else: return self.cleaned_data["target"] def clean_key(self): instance = getattr(self, "instance", None) if instance and instance.pk: return instance.key else: return self.cleaned_data["key"] def save(self, commit=True): try: super().save(commit) except ImporterError: return if not self.cleaned_data.get("value") or not self.user: return if self.cleaned_data["value"] == self.NULL_VALUE: self.instance.value = None self.instance.is_set = True can_edit_group = ( self.associated_import and self.associated_import.associated_group and ( self.associated_import.associated_group.all_user_can_modify or self.user.is_superuser ) ) remember = self.cleaned_data.get("remember") self.instance.associated_import = self.associated_import force_importer_type, force_all = False, False if remember == "import" and self.associated_import: self.instance.associated_user = None self.instance.associated_group = None elif remember == "group" and can_edit_group: self.instance.associated_user = None self.instance.associated_group = self.associated_import.associated_group elif remember == "all" and self.user.is_superuser: self.instance.associated_user = None self.instance.associated_group = None force_importer_type = True elif remember == "all-importers" and self.user.is_superuser: self.instance.associated_user = None self.instance.associated_group = None force_all = True else: # for me! self.instance.associated_user = self.user.ishtaruser self.instance.associated_group = None self.instance.create_itemkey(force_importer_type=force_importer_type, force_all=force_all) self.instance.save() class PreImportForm(IshtarForm): def __init__(self, *args, **kwargs): self._headers = {} self._types = [] self.import_item = kwargs.pop("import_item") super().__init__(*args, **kwargs) readonly = self.import_item.state not in ("C", "AP", "A") for column in self.column_query.order_by("col_number"): q = column.targets if not q.count(): continue target = q.all()[0] field_name, widget_name = FORMATER_WIDGETS_DCT[ target.formater_type.formater_type ] attrs = { "label": column.label, "required": column.required, } q = models.ImportColumnValue.objects.filter( column=column, import_item=self.import_item ) if q.count(): value = q.all()[0].value if target.formater_type.many_split: if value.startswith("['") and value.endswith("']"): value = value[2:-2].split("', '") else: value = "" attrs["initial"] = value if column.description: attrs["help_text"] = column.description if widget_name: attrs["widget"] = getattr(forms, widget_name) key = f"col_{- column.col_number}" form_field = getattr(forms, field_name) model = target.formater_type.associated_model if model: if target.formater_type.many_split: form_field = widgets.Select2MultipleField self._types.append( FieldType(key, model, is_multiple=bool(target.formater_type.many_split)) ) self.fields[key] = form_field(**attrs) if readonly: self.fields[key].widget.attrs["readonly"] = True if not self._headers: self._headers[key] = FormHeader(self.import_item) if self.import_item.importer_type.pre_import_message: self._headers[ key ].help_message = self.import_item.importer_type.pre_import_message self._init_types() self._post_init() @property def column_query(self): return self.import_item.importer_type.columns.filter(col_number__lte=0) def save(self): for column in self.column_query.all(): key = f"col_{-column.col_number}" if key not in self.cleaned_data: continue col_value, __ = models.ImportColumnValue.objects.get_or_create( column=column, import_item=self.import_item ) col_value.value = self.cleaned_data[key] col_value.save() return self.import_item class TargetKeyFormset(BaseModelFormSet): def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") super(TargetKeyFormset, self).__init__(*args, **kwargs) def get_form_kwargs(self, index): kwargs = super(TargetKeyFormset, self).get_form_kwargs(index) kwargs["user"] = self.user return kwargs class OrganizationForm(ManageOldType, NewItemForm): prefix = "organization" form_label = _("Organization") associated_models = { "organization_type": models.OrganizationType, } format_models = { "precise_town_id": models.Town, } name = forms.CharField(label=_("Name"), max_length=300, validators=[name_validator]) organization_type = forms.ChoiceField(label=_("Organization type"), choices=[]) url = forms.URLField(label=_("Web address"), required=False) grammatical_gender = forms.ChoiceField( label=_("Grammatical gender"), choices=[("", "--")] + list(models.GENDER), required=False, help_text=_("Can be used by templates"), ) museum_museofile_id = forms.CharField(label=_("Museofile ID"), required=False) address = forms.CharField(label=_("Address"), widget=forms.Textarea, required=False) address_complement = forms.CharField( label=_("Address complement"), widget=forms.Textarea, required=False ) postal_code = forms.CharField(label=_("Postal code"), max_length=10, required=False) town = forms.CharField(label=_("Town (freeform)"), max_length=30, required=False) precise_town_id = get_town_field(required=False) country = forms.CharField(label=_("Country"), max_length=30, required=False) email = forms.EmailField(label=_("Email"), required=False) phone = forms.CharField(label=_("Phone"), max_length=32, required=False) mobile_phone = forms.CharField( label=_("Mobile phone"), max_length=32, required=False ) PROFILE_FILTER = { "museum": ["museum_museofile_id"] } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["organization_type"].choices = models.OrganizationType.get_types( initial=self.init_data.get("organization_type") ) self.fields["organization_type"].help_text = models.OrganizationType.get_help() self.limit_fields() def save(self, user, item=None): dct = self.cleaned_data dct["history_modifier"] = user dct["organization_type"] = models.OrganizationType.objects.get( pk=dct["organization_type"] ) if dct["precise_town_id"]: try: dct["precise_town_id"] = models.Town.objects.get( pk=dct["precise_town_id"] ).pk except models.Town.DoesNotExist: dct.pop("precise_town_id") if not item: new_item = models.Organization(**dct) else: new_item = item for k in dct: setattr(new_item, k, dct[k]) new_item.save() return new_item class OrganizationSelect(CustomForm, TableSelect): _model = models.Organization PROFILE_FILTER = { "museum": ["museum_museofile_id"] } search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "organization"), ) name = forms.CharField(label=_("Name"), max_length=300) organization_type = forms.ChoiceField(label=_("Type"), choices=[]) precise_town_id = get_town_field() area = forms.ChoiceField(label=_("Area"), choices=[]) museum_museofile_id = forms.CharField(label=_("Museofile ID"), required=False) TYPES = [ FieldType('area', models.Area), ] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["organization_type"].choices = models.OrganizationType.get_types() class OrganizationFormSelection(CustomFormSearch): SEARCH_AND_SELECT = True form_label = _("Organization search") associated_models = {"pk": models.Organization} currents = {"pk": models.Organization} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-organization"), OrganizationSelect, models.Organization, source_full=reverse_lazy("get-organization-full"), ), validators=[models.valid_id(models.Organization)], ) class OrganizationFormMultiSelection(MultiSearchForm): form_label = _("Organization search") associated_models = {"pks": models.Organization} pk = forms.CharField( label="", required=True, widget=widgets.DataTable( reverse_lazy("get-organization"), OrganizationSelect, models.Organization, multiple_select=True, source_full=reverse_lazy("get-organization-full"), ), validators=[models.valid_ids(models.Organization)], ) class QAOrganizationFormMulti(QAForm): form_admin_name = _("Organization - Quick action - Modify") form_slug = "organization-quickaction-modify" base_models = ["qa_organization_type"] associated_models = { "qa_organization_type": models.OrganizationType, } MULTI = True REPLACE_FIELDS = ["qa_organization_type", "qa_grammatical_gender", "qa_museum_museofile_id"] SINGLE_FIELDS = ["qa_museum_museofile_id"] PROFILE_FILTER = { "museum": ["qa_museum_museofile_id"] } qa_organization_type = forms.ChoiceField( label=_("Organization type"), required=False ) qa_museum_museofile_id = forms.CharField(label=_("Museofile ID"), required=False) qa_grammatical_gender = forms.ChoiceField( label=_("Grammatical gender"), choices=[("", "--")] + list(models.GENDER), required=False, help_text=_("Can be used by templates"), ) TYPES = [ FieldType("qa_organization_type", models.OrganizationType), ] class ManualMerge(object): def clean_to_merge(self): value = self.cleaned_data.get("to_merge", None) or [] if value: value = value.split(",") values = [] for val in value: try: values.append(int(val)) except ValueError: pass if len(values) < 2: raise forms.ValidationError(_("At least two items have to be " "selected.")) self.cleaned_data["to_merge"] = values return values def get_items(self): items = [] model = self.associated_models["to_merge"] for pk in sorted(self.cleaned_data["to_merge"]): try: items.append(model.objects.get(pk=pk)) except model.DoesNotExist: pass return items class MergeIntoForm(forms.Form): main_item = forms.ChoiceField( label=_("Merge all items into"), choices=[], widget=forms.RadioSelect() ) def __init__(self, *args, **kwargs): self.items = kwargs.pop("items") super(MergeIntoForm, self).__init__(*args, **kwargs) self.fields["main_item"].choices = [] for pk in self.items: try: item = self.associated_model.objects.get(pk=pk) except self.associated_model.DoesNotExist: continue self.fields["main_item"].choices.append( ( item.pk, mark_safe("{} {}".format(simple_link_to_window(item), str(item))), ) ) def merge(self, callback=None, request=None): model = self.associated_model try: main_item = model.objects.get(pk=self.cleaned_data["main_item"]) except model.DoesNotExist: return items, items_pk = [], [main_item.pk] for pk in self.items: if pk == str(main_item.pk): continue try: item = model.objects.get(pk=pk) except model.DoesNotExist: continue items.append(item) items_pk.append(item.pk) if callback: queryset = models.Person.objects.filter(pk__in=items_pk) callback("merge_person", request, "", queryset) for item in items: main_item.merge(item) return main_item class OrgaMergeFormSelection(ManualMerge, forms.Form): SEARCH_AND_SELECT = True form_label = _("Organization to merge") associated_models = {"to_merge": models.Organization} currents = {"to_merge": models.Organization} to_merge = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-organization"), OrganizationSelect, models.Organization, multiple_select=True, source_full=reverse_lazy("get-organization-full"), ), ) class OrgaMergeIntoForm(MergeIntoForm): associated_model = models.Organization class BaseOrganizationForm(forms.ModelForm): form_prefix = "orga" class Meta: model = models.Organization fields = [ "name", "organization_type", "grammatical_gender", "address", "address_complement", "town", "postal_code", ] class PersonSelect(CustomForm, TableSelect): _model = models.Person search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "person"), ) name = forms.CharField(label=_("Name"), max_length=200) surname = forms.CharField(label=_("Surname"), max_length=50) title = forms.ChoiceField(label=_("Title"), choices=[]) salutation = forms.CharField(label=_("Salutation"), max_length=200) email = forms.CharField(label=_("Email"), max_length=75) person_types = forms.ChoiceField(label=_("Type"), choices=[]) attached_to = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) precise_town_id = get_town_field(required=False) area = forms.ChoiceField(label=_("Area"), choices=[]) TYPES = [ FieldType("person_types", models.PersonType), FieldType("title", models.TitleType), FieldType('area', models.Area), ] class PersonFormSelection(CustomFormSearch): SEARCH_AND_SELECT = True form_label = _("Person search") associated_models = {"pk": models.Person} currents = {"pk": models.Person} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-person"), PersonSelect, models.Person, source_full=reverse_lazy("get-person-full"), ), validators=[models.valid_id(models.Person)], ) class PersonFormMultiSelection(MultiSearchForm): form_label = _("Person search") associated_models = {"pks": models.Person} pk = forms.CharField( label="", required=True, widget=widgets.DataTable( reverse_lazy("get-person"), PersonSelect, models.Person, multiple_select=True, source_full=reverse_lazy("get-person-full"), ), validators=[models.valid_ids(models.Person)], ) class QAPersonFormMulti(QAForm): form_admin_name = _("Person - Quick action - Modify") form_slug = "person-quickaction-modify" base_models = ["qa_title", "qa_person_types"] associated_models = { "qa_title": models.TitleType, "qa_attached_to": models.Organization, "qa_person_types": models.PersonType, } MULTI = True REPLACE_FIELDS = ["qa_title", "qa_attached_to"] qa_title = forms.ChoiceField(label=_("Title"), required=False) qa_attached_to = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], required=False, ) qa_person_types = widgets.Select2MultipleField( label=_("Person types"), required=False ) TYPES = [ FieldType("qa_title", models.TitleType), FieldType("qa_person_types", models.PersonType, is_multiple=True), ] def _get_qa_attached_to(self, value): try: value = models.Organization.objects.get(pk=value).cached_label except models.Organization.DoesNotExist: return "" return value class PersonMergeFormSelection(ManualMerge, forms.Form): SEARCH_AND_SELECT = True form_label = _("Person to merge") associated_models = {"to_merge": models.Person} currents = {"to_merge": models.Person} to_merge = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-person"), PersonSelect, models.Person, multiple_select=True, source_full=reverse_lazy("get-person-full"), ), ) class PersonMergeIntoForm(MergeIntoForm): associated_model = models.Person class SimplePersonForm(ManageOldType, NewItemForm): extra_form_modals = ["organization"] form_label = _("Identity") associated_models = { "attached_to": models.Organization, "title": models.TitleType, } format_models = { "precise_town_id": models.Town, } title = forms.ChoiceField(label=_("Title"), choices=[], required=False) salutation = forms.CharField(label=_("Salutation"), max_length=200, required=False) surname = forms.CharField( label=_("Surname"), max_length=50, validators=[name_validator] ) name = forms.CharField(label=_("Name"), max_length=200, validators=[name_validator]) raw_name = forms.CharField(label=_("Raw name"), max_length=300, required=False) email = forms.EmailField(label=_("Email"), required=False) phone_desc = forms.CharField( label=_("Phone description"), max_length=300, required=False ) phone = forms.CharField(label=_("Phone"), max_length=32, required=False) phone_desc2 = forms.CharField( label=_("Phone description 2"), max_length=300, required=False ) phone2 = forms.CharField(label=_("Phone 2"), max_length=32, required=False) phone_desc3 = forms.CharField( label=_("Phone description 3"), max_length=300, required=False ) phone3 = forms.CharField(label=_("Phone 3"), max_length=32, required=False) mobile_phone = forms.CharField( label=_("Mobile phone"), max_length=32, required=False ) attached_to = forms.IntegerField( label=_("Current organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, new=True, ), validators=[models.valid_id(models.Organization)], required=False, ) address = forms.CharField(label=_("Address"), widget=forms.Textarea, required=False) address_complement = forms.CharField( label=_("Address complement"), widget=forms.Textarea, required=False ) postal_code = forms.CharField(label=_("Postal code"), max_length=10, required=False) town = forms.CharField(label=_("Town (freeform)"), max_length=30, required=False) precise_town_id = get_town_field(required=False) country = forms.CharField(label=_("Country"), max_length=30, required=False) alt_address = forms.CharField( label=_("Other address: address"), widget=forms.Textarea, required=False ) alt_address_complement = forms.CharField( label=_("Other address: address complement"), widget=forms.Textarea, required=False, ) alt_postal_code = forms.CharField( label=_("Other address: postal code"), max_length=10, required=False ) alt_town = forms.CharField( label=_("Other address: town"), max_length=30, required=False ) alt_country = forms.CharField( label=_("Other address: country"), max_length=30, required=False ) def __init__(self, *args, **kwargs): super(SimplePersonForm, self).__init__(*args, **kwargs) self.fields["raw_name"].widget.attrs["readonly"] = True self.fields["title"].choices = models.TitleType.get_types( initial=self.init_data.get("title") ) class PersonUserSelect(PersonSelect): ishtaruser__isnull = forms.NullBooleanField(label=_("Already has an account")) profiles__profile_type = forms.ChoiceField(label=_("Profile type"), choices=[]) TYPES = [ FieldType("profiles__profile_type", models.ProfileType), ] class PersonUserFormSelection(PersonFormSelection): SEARCH_AND_SELECT = True form_label = _("Person search") associated_models = {"pk": models.Person} currents = {"pk": models.Person} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-person-for-account"), PersonUserSelect, models.Person, table_cols="TABLE_COLS_ACCOUNT", ), validators=[models.valid_id(models.Person)], ) class IshtarUserSelect(TableSelect): _model = models.IshtarUser search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "ishtaruser"), ) username = forms.CharField(label=_("Username"), max_length=200) name = forms.CharField(label=_("Name"), max_length=200) surname = forms.CharField(label=_("Surname"), max_length=50) email = forms.CharField(label=_("Email"), max_length=75) person_types = forms.ChoiceField(label=_("Type"), choices=[]) attached_to = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") if "user" in kwargs else None super(IshtarUserSelect, self).__init__(*args, **kwargs) self.fields["person_types"].choices = models.PersonType.get_types() class AccountFormSelection(CustomForm, forms.Form): SEARCH_AND_SELECT = True form_label = _("Account search") associated_models = {"pk": models.IshtarUser} currents = {"pk": models.IshtarUser} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-ishtaruser"), IshtarUserSelect, models.IshtarUser ), validators=[models.valid_id(models.IshtarUser)], ) class AccountFormMultiSelection(MultiSearchForm): form_label = _("Account search") associated_models = {"pks": models.IshtarUser} pk_key = "pks" pk = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-ishtaruser"), IshtarUserSelect, models.IshtarUser ), validators=[models.valid_id(models.IshtarUser)], ) class BasePersonForm(forms.ModelForm): class Meta: model = models.Person fields = [ "title", "salutation", "name", "surname", "address", "address_complement", "town", "postal_code", ] class BaseOrganizationPersonForm(forms.ModelForm): class Meta: model = models.Person fields = ["attached_to", "title", "salutation", "name", "surname"] widgets = { "attached_to": widgets.JQueryPersonOrganization( reverse_lazy("autocomplete-organization"), reverse_lazy("organization_create"), model=models.Organization, attrs={"hidden": True}, new=True, ), } def __init__(self, *args, **kwargs): super(BaseOrganizationPersonForm, self).__init__(*args, **kwargs) def save(self, *args, **kwargs): person = super(BaseOrganizationPersonForm, self).save(*args, **kwargs) instance = person.attached_to form = BaseOrganizationForm( self.data, instance=instance, prefix=BaseOrganizationForm.form_prefix ) if form.is_valid(): orga = form.save() if not person.attached_to: person.attached_to = orga person.save() return person class PersonForm(SimplePersonForm): person_types = forms.MultipleChoiceField( label=_("Person type"), choices=[], required=False, widget=forms.CheckboxSelectMultiple, ) def __init__(self, *args, **kwargs): super(PersonForm, self).__init__(*args, **kwargs) self.fields["person_types"].choices = models.PersonType.get_types( initial=self.init_data.get("person_types"), empty_first=False ) self.fields["person_types"].help_text = models.PersonType.get_help() self.limit_fields() def save(self, user, item=None): dct = self.cleaned_data dct["history_modifier"] = user for key in self.associated_models.keys(): if key in dct: if not dct[key]: dct.pop(key) else: model = self.associated_models[key] try: dct[key] = model.objects.get(pk=dct[key]) except model.DoesNotExist: dct.pop(key) person_types = dct.pop("person_types") if not item: new_item = models.Person.objects.create(**dct) else: for k in dct: setattr(item, k, dct[k]) item.save() item.person_types.clear() new_item = item for pt in person_types: new_item.person_types.add(pt) return new_item class NoOrgaPersonForm(PersonForm): def __init__(self, *args, **kwargs): super(NoOrgaPersonForm, self).__init__(*args, **kwargs) self.fields.pop("attached_to") class PersonTypeForm(ManageOldType, forms.Form): form_label = _("Person type") base_model = "person_type" associated_models = {"person_type": models.PersonType} person_type = forms.MultipleChoiceField( label=_("Person type"), choices=[], required=False, widget=widgets.Select2Multiple, ) def __init__(self, *args, **kwargs): super(PersonTypeForm, self).__init__(*args, **kwargs) self.fields["person_type"].choices = models.PersonType.get_types( initial=self.init_data.get("person_type"), empty_first=False ) self.fields["person_type"].help_text = models.PersonType.get_help() class BiographicalNoteForm(CustomForm, ManageOldType, NewItemForm): form_label = _("Biographical note") form_admin_name = _("Biographical note - 010 - General") form_slug = "biographicalnote-general" extra_form_modals = ["organization", "person"] form_label = _("Identity") associated_models = { "organization": models.Organization, "person": models.Person, } denomination = forms.CharField(label=_("Denomination"), max_length=300) biography = forms.CharField(label=_("Biography"), max_length=300, required=False, widget=forms.Textarea) last_name = forms.CharField(label=_("Last name"), max_length=300, required=False) first_name = forms.CharField(label=_("First name"), max_length=300, required=False) birth_year = forms.IntegerField( label=_("Birth year"), validators=[MinValueValidator(100), max_value_current_year], required=False, ) death_year = forms.IntegerField( label=_("Death year"), validators=[MinValueValidator(100), max_value_current_year], required=False, ) person = forms.IntegerField( label=_("Person"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-person"), associated_model=models.Person, new=True, ), validators=[models.valid_id(models.Person)], required=False, ) organization = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, new=True, ), validators=[models.valid_id(models.Organization)], required=False, ) def save(self, user, item=None): dct = self.cleaned_data dct["history_modifier"] = user for key in self.associated_models.keys(): if key in dct: if not dct[key]: dct.pop(key) else: model = self.associated_models[key] try: dct[key] = model.objects.get(pk=dct[key]) except model.DoesNotExist: dct.pop(key) # get data data = {} for k in list(dct.keys()): if k.startswith("data__"): v = dct.pop(k) keys = k.split("__")[1:] dct = generate_dict_from_list(keys, v) data = update_data(data, dct) if not item: item = models.BiographicalNote.objects.create(**dct) else: for k in dct: setattr(item, k, dct[k]) item.save() # set data if item.data or data: data = update_data(item.data, data) item.data = data item.save() return item class BiographicalNoteEditForm(BiographicalNoteForm, IshtarForm): def __init__(self, *args, **kwargs): self.items = kwargs.pop("items") if "items" in kwargs else None initial = {} self.item = None # init base fields if self.items: self.item = self.items[0] for k in self.base_fields: initial[k] = getattr(self.item, k) kwargs["initial"] = initial super().__init__(*args, **kwargs) # init data fields if not self.item or not self.item.data: return for k in self.fields: if not k.startswith("data__"): continue value = None value = self.item.data for key in k.split("__")[1:]: if not isinstance(value, dict) or key not in value: value = None break value = value[key] if not value: continue self.fields[k].initial = value class AccountForm(IshtarForm): form_label = _("Account") associated_models = {"pk": models.Person} currents = {"pk": models.Person} pk = forms.IntegerField(label="", widget=forms.HiddenInput, required=False) username = forms.CharField(label=_("Account"), max_length=30) email = forms.CharField( label=_("Email"), max_length=75, validators=[validators.validate_email] ) hidden_password = forms.CharField( label=_("New password"), max_length=128, widget=forms.PasswordInput( attrs={"autocomplete": "off", "data-toggle": "password"} ), required=False, validators=[validators.MinLengthValidator(4)], ) hidden_password_confirm = forms.CharField( label=_("New password (confirmation)"), max_length=128, widget=forms.PasswordInput( attrs={"autocomplete": "off", "data-toggle": "password"} ), required=False, ) HEADERS = { "hidden_password": FormHeader( _("New password"), help_message=_( "Keep these fields empty if you do not want to " "change password. On creation, if you leave these " "fields empty, the user will not be able to " "connect." ), ), } def __init__(self, *args, **kwargs): person = None if "initial" in kwargs and "pk" in kwargs["initial"]: try: person = models.Person.objects.get(pk=kwargs["initial"]["pk"]) account = models.IshtarUser.objects.get(person=person).user_ptr if not kwargs["initial"].get("username"): kwargs["initial"]["username"] = account.username if not kwargs["initial"].get("email"): kwargs["initial"]["email"] = account.email except ObjectDoesNotExist: pass if "person" in kwargs: person = kwargs.pop("person") super(AccountForm, self).__init__(*args, **kwargs) if person and (person.raw_name or (person.name and person.surname)): profile = models.IshtarSiteProfile.get_current_profile() if person.name and person.surname: values = [person.name.lower(), person.surname.lower()] else: values = person.raw_name.lower().split(" ") if profile.account_naming_style == "FN" and len(values) > 1: values = values[1:] + [values[0]] self.fields["username"].initial = ".".join([slugify(v) for v in values]) def clean(self): cleaned_data = self.cleaned_data password = cleaned_data.get("hidden_password") if password and password != cleaned_data.get("hidden_password_confirm"): raise forms.ValidationError( _("Your password and confirmation " "password do not match.") ) if not cleaned_data.get("pk"): models.is_unique(User, "username")(cleaned_data.get("username")) if not password: raise forms.ValidationError( _("You must provide a correct " "password.") ) # check username unicity q = User.objects.filter(username=cleaned_data.get("username")) if cleaned_data.get("pk"): q = q.exclude(ishtaruser__person__pk=cleaned_data.get("pk")) if q.count(): raise forms.ValidationError(_("This username already exists.")) return cleaned_data class ProfileForm(ManageOldType): form_label = _("Profiles") base_model = "profile" NO_CUSTOM_FORM = True associated_models = {"profile_type": models.ProfileType, "area": models.Area} profile_type = forms.ChoiceField(label=_("Type"), choices=[]) area = widgets.Select2MultipleField(label=_("Areas"), required=False) name = forms.CharField(label=_("Name"), required=False) expiration_date = DateField(label=_("Expiration date"), required=False) pk = forms.IntegerField(label=" ", widget=forms.HiddenInput, required=False) TYPES = [ FieldType("profile_type", models.ProfileType), FieldType("area", models.Area, is_multiple=True), ] def __init__(self, *args, **kwargs): profile_type = None initial = kwargs.get("initial", None) if initial and "profile_type" in initial: profile_type = initial.get("profile_type") super().__init__(*args, **kwargs) if profile_type: self.fields["profile_type"].widget.choices = [ (k, v) for k, v in self.fields["profile_type"].widget.choices if profile_type == str(k) ] self.fields["profile_type"].widget.attrs.update({"readonly": True}) class ProfileFormsetBase(FormSetWithDeleteSwitches): def clean(self): values = [] if not getattr(self, "cleaned_data", None): return for data in self.cleaned_data: if not data.get("profile_type", None): continue value = (data.get("profile_type", None), data.get("name", None)) if value in values: raise forms.ValidationError(_("Choose different name for profiles.")) values.append(value) return self.cleaned_data ProfileFormset = formset_factory( ProfileForm, can_delete=True, formset=ProfileFormsetBase ) ProfileFormset.form_label = _("Profiles") ProfileFormset.form_admin_name = _("Profiles") ProfileFormset.form_slug = "profiles" ProfileFormset.NO_CUSTOM_FORM = True class FinalAccountForm(FormPermissionForm, forms.Form): final = True form_label = _("Confirm") send_password = forms.BooleanField( label=_("Send the new password by " "email?"), required=False ) def __init__(self, *args, **kwargs): self.is_hidden = True super(FinalAccountForm, self).__init__(*args, **kwargs) class ProfilePersonForm(forms.Form): """ Edit the current profile """ display_forum_entries = forms.BooleanField( label=_("Display forum entries"), required=False ) display_news = forms.BooleanField( label=_("Display news"), required=False ) current_profile = forms.ChoiceField(label=_("Current profile"), choices=[]) name = forms.CharField(label=_("Name"), required=False) profile_type = forms.ChoiceField( label=_("Profile type"), required=False, disabled=True, choices=[] ) auto_pin = forms.BooleanField( label=_("Pin automatically items on creation and modification"), required=False ) # display_pin_menu = forms.BooleanField(label=_("Show pin menu"), required=False) duplicate_profile = forms.BooleanField( label=_("Duplicate this profile"), required=False ) delete_profile = forms.BooleanField( label=_("Delete this profile"), required=False, ) def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") choices, initial = [], kwargs.get("initial", {}) current_profile = None for profile in self.user.ishtaruser.person.profiles.order_by( "name", "profile_type__label" ).all(): if profile.current: current_profile = profile initial["current_profile"] = profile.pk choices.append((profile.pk, str(profile))) if current_profile: initial["name"] = current_profile.name or current_profile.profile_type initial["profile_type"] = current_profile.profile_type.pk initial["auto_pin"] = current_profile.auto_pin initial["display_forum_entries"] = self.user.ishtaruser.display_forum_entries initial["display_news"] = self.user.ishtaruser.display_news # initial["display_pin_menu"] = current_profile.display_pin_menu kwargs["initial"] = initial super(ProfilePersonForm, self).__init__(*args, **kwargs) self.fields["current_profile"].choices = choices current_external_sources = ( [src.pk for src in current_profile.external_sources.all()] if current_profile else [] ) extra_fields = [] q = models_rest.ApiExternalSource.objects.filter(users=self.user.ishtaruser) for src in q.all(): initial = True if src.pk in current_external_sources else False extra_fields.append( ( f"external_source_{src.pk}", forms.BooleanField( label=str(_("External source - {}")).format(src.name), required=False, initial=initial, ), ) ) self.fields[f"external_source_{src.pk}"] = forms.BooleanField( label=str(_("External source - {}")).format(src.name), required=False, initial=initial, ) if extra_fields: # reorder fields fields = OrderedDict() for key in self.fields.keys(): if key == "duplicate_profile": for extra_key, extra_field in extra_fields: fields[extra_key] = extra_fields fields[key] = self.fields[key] self.fields = fields if ( not current_profile or not self.user.ishtaruser.person.profiles.filter( profile_type=current_profile.profile_type ) .exclude(pk=current_profile.pk) .count() ): # cannot delete the current profile if no profile of this type is # available self.fields.pop("delete_profile") if not current_profile: return self.fields["profile_type"].choices = [ (current_profile.profile_type.pk, current_profile.profile_type.name) ] def clean(self): data = self.cleaned_data q = models.UserProfile.objects.filter( person__ishtaruser=self.user.ishtaruser, pk=data["current_profile"] ) if not q.count(): return data profile = q.all()[0] name = data.get("name", "") if ( models.UserProfile.objects.filter( person__ishtaruser=self.user.ishtaruser, name=name ) .exclude(pk=profile.pk) .count() ): raise forms.ValidationError(_("A profile with the same name exists.")) return data def save(self, session): q = models.UserProfile.objects.filter( person__ishtaruser=self.user.ishtaruser, pk=self.cleaned_data["current_profile"], ) if not q.count(): return profile = q.all()[0] # manage deletion if self.cleaned_data.get("delete_profile", None): q = self.user.ishtaruser.person.profiles.filter( profile_type=profile.profile_type ).exclude(pk=profile.pk) if not q.count(): # cannot delete the current profile if no profile of this type # is available return new_current = q.all()[0] new_current.current = True new_current.save() profile.delete() return name = self.cleaned_data["name"] # manage duplication if self.cleaned_data.get("duplicate_profile", None): profile_name = profile.name or profile.profile_type.label if name == profile_name: name += str(_(" (duplicate)")) profile.duplicate(name=name) return profile.current = True profile.name = name profile.auto_pin = self.cleaned_data["auto_pin"] # profile.display_pin_menu = self.cleaned_data["display_pin_menu"] profile.save() profile.external_sources.clear() q = models_rest.ApiExternalSource.objects.filter(users=self.user.ishtaruser) for src in q.all(): if self.cleaned_data.get(f"external_source_{src.pk}", False): profile.external_sources.add(src) self.user.ishtaruser.display_forum_entries = self.cleaned_data["display_forum_entries"] self.user.ishtaruser.display_news = self.cleaned_data["display_news"] self.user.ishtaruser.save() clean_session_cache(session) if "EXTERNAL_SOURCES" in session: session.pop("EXTERNAL_SOURCES") class TownForm(forms.Form): form_label = _("Towns") base_model = "town" NO_CUSTOM_FORM = True associated_models = {"town": models.Town} town = get_town_field(required=False) class TownFormSet(FormSet): def clean(self): """Checks that no towns are duplicated.""" return self.check_duplicate(("town",), _("There are identical towns.")) TownFormset = formset_factory(TownForm, can_delete=True, formset=TownFormSet) TownFormset.form_label = _("Towns") TownFormset.form_admin_name = _("Towns") TownFormset.form_slug = "towns" TownFormset.NO_CUSTOM_FORM = True class MergeFormSet(BaseModelFormSet): from_key = "" to_key = "" def __init__(self, *args, **kwargs): self._cached_list = [] super(MergeFormSet, self).__init__(*args, **kwargs) def merge(self, callback=None, request=None): for form in self.initial_forms: form.merge(callback=callback, request=request) def initial_form_count(self): """ Recopied from django source only get_queryset is changed """ if not (self.data or self.files): return len(self.get_restricted_queryset()) return super(MergeFormSet, self).initial_form_count() def _construct_form(self, i, **kwargs): """ Recopied from django source only get_queryset is changed """ if self.is_bound and i < self.initial_form_count(): # Import goes here instead of module-level because importing # django.db has side effects. # from django.db import connections pk_key = "%s-%s" % (self.add_prefix(i), self.model._meta.pk.name) pk = self.data[pk_key] """pk_field = self.model._meta.pk pk = pk_field.get_db_prep_lookup('exact', pk, connection=connections[self.get_queryset().db])""" pk = self.get_restricted_queryset()[i].pk if isinstance(pk, list): pk = pk[0] kwargs["instance"] = self._existing_object(pk) if i < self.initial_form_count() and not kwargs.get("instance"): kwargs["instance"] = self.get_restricted_queryset()[i] if i >= self.initial_form_count() and self.initial_extra: # Set initial values for extra forms try: kwargs["initial"] = self.initial_extra[i - self.initial_form_count()] except IndexError: pass return super(BaseModelFormSet, self)._construct_form(i, **kwargs) def get_restricted_queryset(self): """ Filter (from, to) when (to, from) is already here """ q = self.queryset if self._cached_list: return self._cached_list existing, res = [], [] # only get one version of each couple for item in q.all(): tpl = [getattr(item, self.from_key).pk, getattr(item, self.to_key).pk] if tpl not in existing: res.append(item) existing.append(list(reversed(tpl))) self._cached_list = res return res class MergeForm(forms.ModelForm): id = forms.IntegerField(label="", widget=forms.HiddenInput, required=False) a_is_duplicate_b = forms.BooleanField(required=False) b_is_duplicate_a = forms.BooleanField(required=False) not_duplicate = forms.BooleanField(required=False) def clean(self): checked = [ True for k in ["a_is_duplicate_b", "b_is_duplicate_a", "not_duplicate"] if self.cleaned_data.get(k) ] if len(checked) > 1: raise forms.ValidationError(_("Only one choice can be checked.")) return self.cleaned_data def merge(self, *args, **kwargs): try: to_item = getattr(self.instance, self.TO_KEY) from_item = getattr(self.instance, self.FROM_KEY) except ObjectDoesNotExist: return callback = kwargs.get("callback", None) if callback: request = kwargs.get("request") queryset = models.Person.objects.filter(pk__in=[to_item.pk, from_item.pk]) callback("merge_person", request, "", queryset) if self.cleaned_data.get("a_is_duplicate_b"): to_item.merge(from_item) elif self.cleaned_data.get("b_is_duplicate_a"): from_item.merge(to_item) elif self.cleaned_data.get("not_duplicate"): from_item.merge_exclusion.add(to_item) else: return try: self.instance.__class__.objects.get( **{self.TO_KEY: from_item, self.FROM_KEY: to_item} ).delete() except ObjectDoesNotExist: pass self.instance.delete() class MergePersonForm(MergeForm): class Meta: model = models.Person fields = [] FROM_KEY = "from_person" TO_KEY = "to_person" class MergeOrganizationForm(MergeForm): class Meta: model = models.Organization fields = [] FROM_KEY = "from_organization" TO_KEY = "to_organization" def get_image_help(help_for_doc=False): if help_for_doc: return max_size_help if not settings.IMAGE_MAX_SIZE: return max_size_help() return ( str( _( "Heavy images are resized to: %(width)dx%(height)d " "(ratio is preserved)." ) % { "width": settings.IMAGE_MAX_SIZE[0], "height": settings.IMAGE_MAX_SIZE[1], } ) + " " + str(max_size_help()) ) ####################### # Document management # ####################### class AddGenericForm(ManageOldType, NewItemForm): model = None # to be defined label = forms.CharField(label=_("Label"), max_length=200) def clean_label(self): value = self.cleaned_data.get("label", None).strip() if self.model.objects.filter(label=value).count(): raise forms.ValidationError(_("This value already exist")) return value def save(self, user): label = self.cleaned_data["label"] base_slug = slugify(label) slug = base_slug idx = 0 while self.model.objects.filter(txt_idx=slug).count(): idx += 1 slug = base_slug + "-" + str(idx) return self.model.objects.create(label=label, txt_idx=slug) class AddDocumentTagForm(AddGenericForm): model = models.DocumentTag form_label = _("Document tag") class DocumentForm(forms.ModelForm, CustomForm, ManageOldType): form_label = _("Documentation") form_admin_name = _("Document - 010 - General") form_slug = "document-general" file_upload = True extra_form_modals = ["author", "person", "organization", "documenttag", "container"] associated_models = { "source_type": models.SourceType, "support_type": models.SupportType, "publisher": models.Organization, "rights_owner": models.Organization, "format_type": models.Format, "shooting_angle": models.ShootingAngle, } pk = forms.IntegerField(label="", required=False, widget=forms.HiddenInput) title = forms.CharField( label=_("Title"), required=False, validators=[validators.MaxLengthValidator(200)], ) source_type = widgets.ModelChoiceField( model=models.SourceType, label=_("Type"), choices=[], required=False ) support_type = widgets.ModelChoiceField( model=models.SupportType, label=_("Medium"), choices=[], required=False ) format_type = widgets.ModelChoiceField( model=models.Format, label=_("Format"), choices=[], required=False ) scale = forms.CharField(label=_("Scale"), max_length=30, required=False) shooting_angle = widgets.ModelChoiceField( model=models.ShootingAngle, label=_("Shooting angle"), choices=[], required=False ) container_id = forms.IntegerField( label=_("Current container"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container, new=True ), validators=[models.valid_id(Container)], required=False, ) container_ref_id = forms.IntegerField( label=_("Reference container"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container, new=True ), validators=[models.valid_id(Container)], required=False, ) authors = widgets.Select2MultipleField( label=_("Authors"), required=False, model=models.Author, remote="autocomplete-author", ) publisher = forms.IntegerField( label=_("Publisher"), widget=widgets.JQueryAutoComplete( reverse_lazy( "autocomplete-organization", args=[ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ], ), limit={ "organization_type": [ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ] }, tips=models.get_publisher_label, associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], required=False, ) publishing_year = forms.IntegerField( label=_("Year of publication"), validators=[MinValueValidator(1000), max_value_current_year], required=False, ) licenses = widgets.Select2MultipleField( label=_("Rights of use / licenses"), required=False, model=models.LicenseType ) rights_owner = forms.IntegerField( label=_("Rights owner"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, new=True, ), validators=[models.valid_id(models.Organization)], required=False, ) copyright = forms.CharField( label=_("Copyright"), widget=forms.Textarea, required=False ) tags = widgets.Select2MultipleField( label=_("Tags"), required=False, model=models.DocumentTag, remote="autocomplete-documenttag", ) language = widgets.ModelChoiceField( model=models.Language, label=_("Language"), choices=[], required=False ) issn = forms.CharField( label=_("ISSN"), widget=widgets.ISSNWidget, validators=[validators.MaxLengthValidator(9)], required=False, ) isbn = forms.CharField( label=_("ISBN"), widget=widgets.ISBNWidget, validators=[validators.MaxLengthValidator(17)], required=False, ) source = widgets.ModelJQueryAutocompleteField( label=_("Source"), model=models.Document, required=False ) source_free_input = forms.CharField( label=_("Source - free input"), validators=[validators.MaxLengthValidator(500)], required=False, ) source_page_range = forms.CharField( label=_("Source - page range"), validators=[validators.MaxLengthValidator(500)], required=False, help_text=_( 'Unique page: "242", page range: "242-245", multiple ' 'pages: "242;245;249", multiples pages and multiple ' 'pages ranges: "242-245;249;262-265".' ), ) associated_url = forms.URLField( max_length=1000, required=False, label=_("Numerical ressource (web address)") ) image = forms.ImageField( label=_("Image"), help_text=mark_safe(get_image_help()), max_length=255, required=False, widget=widgets.ImageFileInput(), validators=[file_size_validator], ) associated_file = forms.FileField( label=pgettext("Not directory", "File"), max_length=255, required=False, help_text=max_size_help, validators=[file_size_validator], ) reference = forms.CharField( label=_("Reference"), validators=[validators.MaxLengthValidator(100)], required=False, ) internal_reference = forms.CharField( label=_("Internal reference"), validators=[validators.MaxLengthValidator(100)], required=False, ) receipt_date = DateField(label=_("Receipt date"), required=False) creation_date = DateField(label=_("Creation date"), required=False) receipt_date_in_documentation = DateField(label=_("Receipt date in documentation"), required=False) comment = forms.CharField(label=_("Comment"), widget=forms.Textarea, required=False) description = forms.CharField( label=_("Description"), widget=forms.Textarea, required=False ) additional_information = forms.CharField( label=_("Additional information"), widget=forms.Textarea, required=False ) duplicate = forms.NullBooleanField(label=_("Has a duplicate"), required=False) TYPES = [ FieldType("source_type", models.SourceType), FieldType("support_type", models.SupportType), FieldType("shooting_angle", models.ShootingAngle), FieldType("format_type", models.Format), FieldType("language", models.Language), FieldType("licences", models.LicenseType, is_multiple=True), FieldType("tags", models.DocumentTag, is_multiple=True), ] class Meta: model = models.Document fields = [ "title", "source_type", "reference", "internal_reference", "format_type", "support_type", "scale", "shooting_angle", "image", "associated_file", "associated_url", "tags", "authors", "receipt_date", "receipt_date_in_documentation", "creation_date", "publisher", "publishing_year", "isbn", "issn", "licenses", "rights_owner", "copyright", "language", "source", "source_free_input", "source_page_range", "container_id", "container_ref_id", "comment", "description", "additional_information", "duplicate", ] HEADERS = { "finds": FormHeader(_("Related items")), "title": FormHeader(_("Identification")), "format_type": FormHeader(_("Format")), "image": FormHeader(_("Content")), "authors": FormHeader(_("Authors")), "receipt_date": FormHeader(_("Dates")), "publisher": FormHeader(_("Publishing"), collapse=True), "source": FormHeader(_("Source"), collapse=True), "container_id": FormHeader(_("Warehouse"), collapse=True), "comment": FormHeader(_("Advanced"), collapse=True), } OPTIONS_PERMISSIONS = [ # field name, permission, options ("tags", ("ishtar_common.add_documenttag",), {"new": True}), ("authors", ("ishtar_common.add_author",), {"new": True}), ("rights_owner", ("ishtar_common.add_organization",), {"new": True}), ("publisher", ("ishtar_common.add_organization",), {"new": True}), ("container", ("archaeological_warehouse.add_container",), {"new": True}), ("container_ref", ("archaeological_warehouse.add_container",), {"new": True}), ] def __init__(self, *args, **kwargs): main_items_fields = {} if "main_items_fields" in kwargs: main_items_fields = kwargs.pop("main_items_fields") self.user = None if kwargs.get("user", None): self.user = kwargs.pop("user") self.is_instancied = bool(kwargs.get("instance", False)) super().__init__(*args, **kwargs) fields = OrderedDict() for related_key in models.Document.RELATED_MODELS_ALT: model = models.Document._meta.get_field(related_key).related_model fields[related_key] = widgets.Select2MultipleField( model=model, remote=True, label=model.get_label_for_model_plural(), required=False, style="width: 100%", ) if related_key in main_items_fields: for field_key, label in main_items_fields[related_key]: disabled = False if kwargs.get("initial", None) and kwargs["initial"].get( field_key, False ): disabled = True fields[field_key] = forms.BooleanField( label=label, required=False, disabled=disabled ) for k in self.fields: fields[k] = self.fields[k] self.fields = fields def clean_source_page_range(self): value = self.cleaned_data.get("source_page_range", None).replace(" ", "") if value and not re.match(r"^(\d+[-;]*\d*)+$", value): raise forms.ValidationError(_("Incorrect page range.")) return value def get_headers(self): headers = self.HEADERS.copy() if self.is_instancied: headers["finds"] = FormHeader(headers["finds"].label, collapse=True) return headers def get_conditional_filter_fields(self): """ Helper to get values for filtering select widget on select by another widget :return: conditional_fields, excluded_fields, all_values * conditional_fields: {input_key: { input_pk: [(output_key1, "pk1,pk2,..."), (output_key2, "pk1,pk2,...")], input_pk2: ... }, input_key2: {...} } * excluded_fields: {output_key: "pk1,pk2,...", ...} # list pk for all output_key in condition_fields * all_values: {output_key: [(pk1, label1), (pk2, label2), ...]} """ conditional_fields = {} excluded_fields = {} key = "source_type" for doc_type in models.SourceType.objects.filter( available=True, formats__pk__isnull=False ).all(): if key not in conditional_fields: conditional_fields[key] = {} sub_key = doc_type.pk if sub_key in conditional_fields[key]: continue lst = [ str(f.pk) for f in models.Format.objects.filter( available=True, document_types__pk=doc_type.pk ) ] if "format_type" not in excluded_fields: excluded_fields["format_type"] = [] for k in lst: if k not in excluded_fields["format_type"]: excluded_fields["format_type"].append(k) conditional_fields[key][sub_key] = [("format_type", ",".join(lst))] for doc_type in models.SourceType.objects.filter( available=True, supports__pk__isnull=False ).all(): if key not in conditional_fields: conditional_fields[key] = {} lst = [ str(f.pk) for f in models.SupportType.objects.filter( available=True, document_types__pk=doc_type.pk ) ] if "support_type" not in excluded_fields: excluded_fields["support_type"] = [] for k in lst: if k not in excluded_fields["support_type"]: excluded_fields["support_type"].append(k) sub_key = doc_type.pk if sub_key not in conditional_fields[key]: conditional_fields[key][sub_key] = [] conditional_fields[key][sub_key].append(("support_type", ",".join(lst))) for k in excluded_fields: excluded_fields[k] = ",".join(excluded_fields[k]) all_values = { "format_type": [list(tp) for tp in models.Format.get_types()], "support_type": [list(tp) for tp in models.SupportType.get_types()], } return conditional_fields, excluded_fields, all_values def clean(self): cleaned_data = self.cleaned_data if ( not cleaned_data.get("title", None) and not cleaned_data.get("image", None) and not cleaned_data.get("associated_file", None) and not cleaned_data.get("associated_url", None) ): raise forms.ValidationError( _( "You should at least fill one of this field: title, url, " "image or file. If you have provided an image check that " "it is not corrupted." ) ) return cleaned_data def clean_publisher(self): if not self.cleaned_data.get("publisher", None): return try: return models.Organization.objects.get(pk=self.cleaned_data["publisher"]) except models.Organization.DoesNotExist: return def clean_rights_owner(self): if not self.cleaned_data.get("rights_owner", None): return try: return models.Organization.objects.get(pk=self.cleaned_data["rights_owner"]) except models.Organization.DoesNotExist: return def save(self, commit=True): if not self.cleaned_data.get("authors", None): self.cleaned_data["authors"] = [] item = super(DocumentForm, self).save(commit=commit) for related_key in models.Document.RELATED_MODELS: related = getattr(item, related_key) initial = dict([(rel.pk, rel) for rel in related.all()]) new = [int(pk) for pk in sorted(self.cleaned_data.get(related_key, []))] for pk, value in initial.items(): if pk in new: continue related.remove(value) for new_pk in new: related_item = related.model.objects.get(pk=new_pk) if new_pk not in initial.keys(): related.add(related_item) key = "{}_{}_main_image".format(related_key, related_item.pk) if self.cleaned_data.get(key, []) and related_item.main_image != item: related_item.skip_history_when_saving = True related_item.main_image = item related_item.save() item = models.Document.objects.get(pk=item.pk) if self.user and not item.history_creator: item.history_creator = self.user item.history_modifier = self.user item.skip_history_when_saving = True item.save() # resave to regen the attached items return item class DocumentSelect(HistorySelect): _model = models.Document form_admin_name = _("Document - 001 - Search") form_slug = "document-001-search" search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "document"), ) authors = forms.IntegerField( widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-author", associated_model=models.Author, ), validators=[models.valid_id(models.Author)], label=_("Author"), required=False, ) title = forms.CharField(label=_("Title")) source_type = forms.ChoiceField(label=_("Type"), choices=[]) reference = forms.CharField(label=_("Reference")) internal_reference = forms.CharField(label=_("Internal reference")) description = forms.CharField(label=_("Description")) format = forms.ChoiceField(label=_("Format"), choices=[]) support = forms.ChoiceField(label=_("Medium"), choices=[]) scale = forms.CharField(label=_("Scale")) shooting_angle = forms.ChoiceField(label=_("Shooting angle"), choices=[]) associated_url = forms.CharField(label=_("Web address")) tag = forms.ChoiceField(label=_("Tag"), choices=[]) publisher = forms.IntegerField( label=_("Publisher"), widget=widgets.JQueryAutoComplete( reverse_lazy( "autocomplete-organization", args=[ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ], ), limit={ "organization_type": [ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ] }, tips=models.get_publisher_label, associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) publishing_year = forms.IntegerField(label=_("Year of publication")) language = forms.ChoiceField(label=_("Language"), choices=[]) isbn = forms.CharField(label=_("ISBN")) issn = forms.CharField(label=_("ISSN")) licenses = forms.ChoiceField(label=_("Rights of use / licenses"), choices=[]) rights_owner = forms.IntegerField( label=_("Rights owner"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) copyright = forms.CharField(label=_("Copyright")) comment = forms.CharField(label=_("Comment")) additional_information = forms.CharField(label=_("Additional informations")) duplicate = forms.NullBooleanField(label=_("Has a duplicate")) associated_file__isnull = forms.NullBooleanField(label=_("Has a file?")) image__isnull = forms.NullBooleanField(label=_("Has an image?")) source = forms.IntegerField( label=_("Source"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-document"), associated_model=models.Document ), validators=[models.valid_id(models.Document)], ) source_free_input = forms.CharField(label=_("Source - free input")) warehouse_container = forms.IntegerField( label=_("Warehouse - Container"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container ), validators=[models.valid_id(Container)], ) warehouse_container_ref = forms.IntegerField( label=_("Warehouse - Reference container"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container ), validators=[models.valid_id(Container)], ) operation = forms.IntegerField( label=_("Operation"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-operation"), associated_model=Operation ), validators=[models.valid_id(Operation)], ) operations__operation_type = forms.ChoiceField( label=_("Operation - type"), choices=[] ) operations__year = forms.IntegerField(label=_("Operation - year")) context_record = forms.IntegerField( label=_("Context record"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-contextrecord"), associated_model=ContextRecord ), validators=[models.valid_id(ContextRecord)], ) find_basket = forms.IntegerField( label=_("Basket - Finds"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-findbasket"), associated_model=FindBasket ), validators=[models.valid_id(FindBasket)], required=False, ) find = forms.IntegerField( label=_("Find"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-find"), associated_model=Find ), validators=[models.valid_id(Find)], ) find__denomination = forms.CharField(label=_("Find - denomination"), required=False) containers = forms.IntegerField( label=_("Container"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container ), validators=[models.valid_id(Container)], ) town = get_town_field() area = widgets.Select2SimpleField(label=_("Area")) receipt_date = DateField(label=_("Receipt date")) creation_date = DateField(label=_("Creation date")) receipt_date_in_documentation = DateField(label=_("Receipt date")) TYPES = [ FieldType("source_type", models.SourceType), FieldType("format", models.Format), FieldType("support", models.SupportType), FieldType("tag", models.DocumentTag), FieldType("language", models.Language), FieldType("licenses", models.LicenseType), FieldType("operations__operation_type", models.OperationType), FieldType("area", models.Area), FieldType("shooting_angle", models.ShootingAngle), ] PROFILE_FILTER = { "context_record": ["context_record"], "find": ["find"], "warehouse": ["container"], } class DocumentFormSelection(LockForm, CustomFormSearch): SEARCH_AND_SELECT = True form_label = _("Document search") associated_models = {"pk": models.Document} currents = {"pk": models.Document} pk = forms.IntegerField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-document"), DocumentSelect, models.Document, gallery=True ), validators=[models.valid_id(models.Document)], ) class DocumentFormMultiSelection(LockForm, MultiSearchForm): form_label = _("Document search") associated_models = {"pks": models.Document} pk_key = "pks" pk = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-document"), DocumentSelect, models.Document, multiple_select=True, gallery=True, ), validators=[models.valid_ids(models.Document)], ) class QADocumentFormMulti(QAForm): form_admin_name = _("Document - Quick action - Modify") form_slug = "document-quickaction-modify" base_models = ["qa_source_type", "qa_rights_owner", "qa_shooting_angle"] associated_models = { "qa_source_type": models.SourceType, "qa_authors": models.Author, "qa_tags": models.DocumentTag, "qa_rights_owner": models.Organization, "qa_licenses": models.LicenseType, "qa_shooting_angle": models.ShootingAngle, } MULTI = True REPLACE_FIELDS = [ "qa_source_type", "qa_creation_date", "qa_rights_owner", "qa_licenses", "qa_copyright", "qa_shooting_angle", ] qa_source_type = forms.ChoiceField(label=_("Source type"), required=False) qa_authors = widgets.ModelJQueryAutocompleteField( model=models.Author, label=_("Author"), new=True, required=False ) qa_creation_date = DateField(label=_("Creation date"), required=False) qa_format_type = forms.ChoiceField(label=_("Format"), choices=[], required=False) qa_support_type = forms.ChoiceField(label=_("Medium"), choices=[], required=False) qa_scale = forms.CharField(label=_("Scale"), max_length=30, required=False) qa_tags = forms.ChoiceField(label=_("Tags"), choices=[], required=False) qa_licenses = forms.ChoiceField(label=_("Rights of use / licenses"), choices=[], required=False) qa_rights_owner = widgets.ModelJQueryAutocompleteField( model=models.Organization, label=_("Rights owner"), new=True, required=False ) qa_copyright = forms.CharField(label=_("Copyright"), required=False) qa_shooting_angle = forms.ChoiceField(label=_("Shooting angle"), choices=[], required=False) TYPES = [ FieldType("qa_source_type", models.SourceType), FieldType("qa_format_type", models.Format), FieldType("qa_support_type", models.SupportType), FieldType("qa_licenses", models.LicenseType), FieldType("qa_tags", models.DocumentTag), FieldType("qa_shooting_angle", models.ShootingAngle), ] def _get_qa_authors(self, value): try: value = models.Author.objects.get(pk=value).cached_label except models.Author.DoesNotExist: return "" return value def _get_qa_rights_owner(self, value): try: value = models.Organization.objects.get(pk=value).cached_label except models.Organization.DoesNotExist: return "" return value class QADocumentDuplicateForm(IshtarForm): qa_title = forms.CharField(label=_("Title"), max_length=500, required=False) qa_reference = forms.CharField(label=_("Reference"), max_length=500, required=False) qa_source_type = forms.ChoiceField(label=_("Type"), choices=[], required=False) open_edit = forms.BooleanField(label=_("Edit duplicated item"), required=False) open_edit.widget.NO_FORM_CONTROL = True TYPES = [ FieldType("qa_source_type", models.SourceType), ] def __init__(self, *args, **kwargs): self.user = None if "user" in kwargs: self.user = kwargs.pop("user") if hasattr(self.user, "ishtaruser"): self.user = self.user.ishtaruser self.document = kwargs.pop("items")[0] super(QADocumentDuplicateForm, self).__init__(*args, **kwargs) self.fields["qa_title"].initial = self.document.title + str(_(" - duplicate")) if self.document.reference: self.fields["qa_reference"].initial = self.document.reference if self.document.source_type: self.fields["qa_source_type"].initial = self.document.source_type.pk for related_key in models.Document.RELATED_MODELS_ALT: related = getattr(self.document, related_key) if not related.count(): continue model = models.Document._meta.get_field(related_key).related_model initial = [item.pk for item in related.all()] self.fields["qa_" + related_key] = widgets.Select2MultipleField( model=model, remote=True, label=model._meta.verbose_name_plural, required=False, long_widget=True, initial=initial, ) def save(self): data = {"index": None} for k in ["title", "reference"]: data[k] = self.cleaned_data.get("qa_" + k, None) if self.cleaned_data.get("qa_source_type", None): try: data["source_type"] = models.SourceType.objects.get( pk=int(self.cleaned_data["qa_source_type"]), available=True ) except models.SourceType.DoesNotExist: return new = self.document.duplicate_item(self.user, data=data) for related_key in models.Document.RELATED_MODELS_ALT: getattr(new, related_key).clear() values = self.cleaned_data.get("qa_" + related_key, []) model = models.Document._meta.get_field(related_key).related_model for value in values: getattr(new, related_key).add(model.objects.get(pk=value)) new.skip_history_when_saving = True new._cached_label_checked = False new._search_updated = False new._no_move = True new.save() # regen of labels return new class QADocumentPackagingForm(IshtarForm): container = forms.IntegerField( label=_("Container"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container, new=True ), validators=[models.valid_id(Container)], ) container_to_change = forms.ChoiceField( label=_("Change "), required=True, choices=( ("current-and-reference", _("current and reference containers")), ("reference", _("the reference container")), ("current", _("the current container")), ), ) def __init__(self, *args, **kwargs): self.confirm = False self.user = None if "user" in kwargs: self.user = kwargs.pop("user") if hasattr(self.user, "ishtaruser"): self.user = self.user.ishtaruser self.items = kwargs.pop("items") super(QADocumentPackagingForm, self).__init__(*args, **kwargs) def save(self, items, user): container_id = Container.objects.get(pk=self.cleaned_data["container"]).pk container_to_change = self.cleaned_data.get("container_to_change", "") container_attrs = [] if container_to_change in ("reference", "current-and-reference"): container_attrs.append("container_ref_id") if container_to_change in ("current", "current-and-reference"): container_attrs.append("container_id") for document in items: changed = False for container_attr in container_attrs: if getattr(document, container_attr) == container_id: continue setattr(document, container_attr, container_id) changed = True if changed: document.history_modifier = user document.save() class QALockForm(forms.Form): action = forms.ChoiceField( label=_("Action"), choices=(("lock", _("Lock")), ("unlock", _("Unlock"))) ) def __init__(self, *args, **kwargs): self.items = kwargs.pop("items") super(QALockForm, self).__init__(*args, **kwargs) def save(self, items, user): locked = self.cleaned_data["action"] == "lock" for item in items: item.locked = locked item.lock_user = user if locked else None item.skip_history_when_saving = True item.save() class QALinkForm(forms.Form): action = forms.ChoiceField( label=_("Action"), choices=(("link", _("Link items")), ("unlink", _("Unlink items"))) ) account = forms.IntegerField( widget=widgets.JQueryAutoComplete( reverse_lazy( 'autocomplete-user', ), associated_model=User), label=_("User")) def __init__(self, *args, **kwargs): self.items = kwargs.pop("items") super().__init__(*args, **kwargs) def save(self, items, user): try: ishtar_user = models.IshtarUser.objects.get(pk=self.cleaned_data["account"]) except models.IshtarUser.DoesNotExist: return if self.cleaned_data["action"] == "link": for item in items: item.ishtar_users.add(ishtar_user) else: for item in items: item.ishtar_users.remove(ishtar_user) class SourceDeletionForm(FinalForm): confirm_msg = " " confirm_end_msg = _("Would you like to delete this documentation?") ###################### # Authors management # ###################### class AuthorForm(ManageOldType, NewItemForm): form_label = _("Author") associated_models = {"person": models.Person, "author_type": models.AuthorType} NO_CUSTOM_FORM = True person = forms.IntegerField( widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-person", associated_model=models.Person, new=True, ), validators=[models.valid_id(models.Person)], label=_("Person"), ) author_type = forms.ChoiceField(label=_("Author type"), choices=[]) def __init__(self, *args, **kwargs): super(AuthorForm, self).__init__(*args, **kwargs) self.fields["author_type"].choices = models.AuthorType.get_types( initial=self.init_data.get("author_type") ) self.limit_fields() def clean(self): person_id = self.cleaned_data.get("person", None) author_type_id = self.cleaned_data.get("author_type", None) if not person_id or not author_type_id: return self.cleaned_data if models.Author.objects.filter( author_type_id=author_type_id, person_id=person_id ).count(): raise forms.ValidationError(_("This author already exist.")) def save(self, user): dct = self.cleaned_data dct["author_type"] = models.AuthorType.objects.get(pk=dct["author_type"]) dct["person"] = models.Person.objects.get(pk=dct["person"]) new_item = models.Author(**dct) new_item.save() return new_item class AuthorFormSelection(CustomForm, forms.Form): form_label = _("Author selection") NO_CUSTOM_FORM = True base_model = "author" associated_models = {"author": models.Author} author = forms.IntegerField( required=False, widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-author", associated_model=models.Author, new=True, ), validators=[models.valid_id(models.Author)], label=_("Author"), ) class AuthorFormSet(FormSet): def clean(self): """Checks that no author are duplicated.""" return self.check_duplicate(("author",), _("There are identical authors.")) AuthorFormset = formset_factory( AuthorFormSelection, can_delete=True, formset=AuthorFormSet ) AuthorFormset.form_label = _("Authors") AuthorFormset.form_admin_name = _("Authors") AuthorFormset.form_slug = "authors" AuthorFormset.NO_CUSTOM_FORM = True class SearchQueryForm(forms.Form): query = forms.CharField( max_length=None, label=_("Query"), initial="*", widget=forms.HiddenInput ) search_query = forms.ChoiceField(label="", required=False, choices=[]) label = forms.CharField(label="", max_length=None, required=False) is_alert = forms.BooleanField(label=_("Is an alert"), required=False) create_or_update = forms.ChoiceField( choices=(("create", _("Create")), ("update", _("Update"))), initial="create" ) def __init__(self, profile, content_type, *args, **kwargs): self.profile = profile self.content_type = content_type super(SearchQueryForm, self).__init__(*args, **kwargs) self.fields["search_query"].choices = [ (c.pk, c.label) for c in models.SearchQuery.objects.filter( content_type=content_type, profile=profile ).all() ] if not self.fields["search_query"].choices: self.fields.pop("search_query") def clean(self): data = self.cleaned_data if data["create_or_update"] == "create" and not data["label"]: raise forms.ValidationError( _("A label is required for a new " "search query.") ) elif data["create_or_update"] == "update": if not data["search_query"]: raise forms.ValidationError(_("Select the search query to " "update")) q = models.SearchQuery.objects.filter( profile=self.profile, content_type=self.content_type, pk=data["search_query"], ) if not q.count(): raise forms.ValidationError(_("Query does not exist.")) return data def save(self): data = self.cleaned_data if data["create_or_update"] == "create": sq = models.SearchQuery.objects.create( label=data["label"], query=data["query"], profile=self.profile, content_type=self.content_type, is_alert=data["is_alert"], ) else: try: sq = models.SearchQuery.objects.get( profile=self.profile, content_type=self.content_type, pk=data["search_query"], ) except models.SearchQuery.DoesNotExist: raise forms.ValidationError(_("Query does not exist.")) sq.query = data["query"] sq.save() return sq class QRSearchForm(forms.Form): query = forms.CharField(max_length=None, label=_("Query"), initial="*") current_url = forms.CharField(max_length=None, label="", widget=forms.HiddenInput()) def save(self): data = self.cleaned_data url = data["current_url"] parsed_url = urlparse(url) base_url = "{}://{}".format(parsed_url.scheme, parsed_url.netloc) url = base_url + parsed_url.path url += "?stored_search=" + quote(data["query"]) tiny_url = models.TinyUrl.objects.create(link=url) short_url = base_url + reverse("tiny-redirect", args=[tiny_url.get_short_id()]) qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) tmpdir = tempfile.mkdtemp("-qrcode") date = datetime.datetime.today().isoformat().replace(":", "-").replace(".", "") base_filename = "{}-qrcode.png".format(date) filename = os.path.join(tmpdir, base_filename) qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) dest_dir = os.path.join(settings.MEDIA_ROOT, "tmp") if not os.path.exists(dest_dir): os.makedirs(dest_dir) shutil.move(filename, dest_dir) return os.path.join(settings.MEDIA_URL, "tmp", base_filename) class GISForm(forms.ModelForm, CustomForm, ManageOldType): form_label = _("Geo item") form_admin_name = _("Geo item - 010 - General") form_slug = "geoitem-general" extra_form_modals = [] associated_models = { "data_type": models.GeoDataType, "origin": models.GeoOriginType, "provider": models.GeoProviderType, "buffer_type": models.GeoBufferType, } pk = forms.IntegerField(label="", required=False, widget=forms.HiddenInput) name = forms.CharField( label=_("Name"), validators=[validators.MaxLengthValidator(500)], ) import_key = forms.CharField( label=_("Import key"), required=False, disabled=True, help_text=_( "An update via import corresponding to the source element and " "this key will overwrite the data." ), ) source_content_type_id = forms.IntegerField( label="", required=True, widget=forms.HiddenInput, disabled=True ) source_id = forms.IntegerField( label="", required=True, widget=forms.HiddenInput, disabled=True ) data_type = widgets.ModelChoiceField( model=models.GeoDataType, label=_("Data type"), choices=[], required=False ) origin = widgets.ModelChoiceField( model=models.GeoOriginType, label=_("Origin"), choices=[], required=False ) provider = widgets.ModelChoiceField( model=models.GeoProviderType, label=_("Provider"), choices=[], required=False ) acquisition_date = DateField(label=_("Acquisition date"), required=False) buffer = forms.FloatField( label=_("Buffer"), required=False, widget=widgets.MeterCentimeterWidget, ) buffer_type = widgets.ModelChoiceField( model=models.GeoBufferType, label=_("Buffer type"), choices=[], required=False ) comment = forms.CharField(label=_("Comment"), widget=forms.Textarea, required=False) estimated_error_x = forms.FloatField( label=_("Estimated error for X"), required=False, widget=widgets.MeterCentimeterWidget, ) estimated_error_y = forms.FloatField( label=_("Estimated error for Y"), required=False, widget=widgets.MeterCentimeterWidget, ) estimated_error_z = forms.FloatField( label=_("Estimated error for Z"), required=False, widget=widgets.MeterCentimeterWidget, ) TYPES = [ FieldType("origin", models.GeoOriginType), FieldType("data_type", models.GeoDataType), FieldType("provider", models.GeoProviderType), FieldType("buffer_type", models.GeoBufferType), ] class Meta: model = models.GeoVectorData exclude = [ "need_update", "imports", "cached_x", "cached_y", "cached_z", "source_content_type", "timestamp_label", "timestamp_geo", "imports_updated", ] HEADERS = { "related_items_ishtar_common_town": FormHeader( _("Related items"), collapse=True ), "name": FormHeader(_("Metadata")), "geo_field": FormHeader(_("Geography")), "buffer": FormHeader(_("Buffer"), collapse=True), } GEO_FIELDS = ( ("point_2d",), ("point_3d",), ("multi_points",), ("multi_line",), ("multi_polygon",), ("x", "z"), ) GEOM_TYPES = { "point_2d": "Point", "point_3d": "Point", "multi_points": "MultiPoint", "multi_line": "MultiLineString", "multi_polygon": "MultiPolygon", } def __init__(self, *args, **kwargs): back_url = "" if "back_url" in kwargs: back_url = kwargs.pop("back_url") find_id = "" if "find_id" in kwargs: find_id = kwargs.pop("find_id") main_items_fields = {} if "main_items_fields" in kwargs: main_items_fields = kwargs.pop("main_items_fields") self.too_many = {} if "too_many" in kwargs: self.too_many = kwargs.pop("too_many") self.user = None self.geom_type = ( kwargs.pop("geom_type") if kwargs.get("geom_type", None) else None ) if kwargs.get("user", None): self.user = kwargs.pop("user") instance = kwargs.get("instance", False) self.is_instancied = bool(instance) self.source_content_type = kwargs.pop("source_content_type", None) self.source_id = kwargs.pop("source_id", None) self.default_center = kwargs.pop("default_center") if "default_center" in kwargs else None super().__init__(*args, **kwargs) if back_url: self.fields["back_url"] = forms.CharField( label="", required=False, widget=forms.HiddenInput, initial=back_url ) if find_id: self.fields["find_id"] = forms.CharField( label="", required=False, widget=forms.HiddenInput, initial=find_id ) if not self.fields["import_key"].initial: self.fields.pop("import_key") if not self.source_content_type: self.fields.pop("source_content_type_id") self.fields.pop("source_id") else: self.fields["source_content_type_id"].initial = self.source_content_type self.fields["source_id"].initial = self.source_id fields = OrderedDict() for related_key in models.GeoVectorData.RELATED_MODELS: model = models.GeoVectorData._meta.get_field(related_key).related_model fields[related_key] = widgets.Select2MultipleField( model=model, remote=True, label=model.get_label_for_model_plural(), required=False, style="width: 100%", ) for related_key in models.GeoVectorData.RELATED_MODELS: if related_key in main_items_fields: for field_key, label in main_items_fields[related_key]: disabled = False if kwargs.get("initial", None) and kwargs["initial"].get( field_key, False ): disabled = True fields[field_key] = forms.BooleanField( label=label, required=False, disabled=disabled ) self.geo_keys = self.get_geo_keys(instance) for k in self.geo_keys: fields[k] = self.fields[k] for k in self.fields: if k not in self.geo_keys: fields[k] = self.fields[k] self.fields = fields def get_geo_keys(self, instance): if instance: return self._get_instance_geo_keys(instance) else: return self._get_base_geo_keys() def _get_base_geo_keys(self): geo_keys = [] extra_geo = ["buffer", "buffer_type"] if self.geom_type == "coordinates": geo_keys = [ "x", "estimated_error_x", "y", "estimated_error_y", "z", "estimated_error_z", "spatial_reference_system", ] base_widget_attrs = {} if self.default_center: base_widget_attrs.update({ "default_lon": self.default_center[0], "default_lat": self.default_center[1], }) else: profile = models.IshtarSiteProfile.get_current_profile() if profile.default_center: # TODO: reverse... base_widget_attrs.update({ "default_lon": profile.default_center.y, "default_lat": profile.default_center.x, }) for keys in self.GEO_FIELDS: if any(key == self.geom_type for key in keys): map_srid = 4326 widget = widgets.OSMWidget widget_attrs = { "map_srid": map_srid, "cols": True, "geom_type": self.GEOM_TYPES[self.geom_type] } widget_attrs.update(base_widget_attrs) self.fields[keys[0]].widget = widget( attrs=widget_attrs) self.fields.pop("spatial_reference_system") geo_keys = keys[:] self.fields.pop("x") self.fields.pop("y") if self.geom_type != "point_3d": self.fields.pop("z") self.fields.pop("estimated_error_x") self.fields.pop("estimated_error_y") self.fields.pop("estimated_error_z") break for geo_fields in self.GEO_FIELDS[:-1]: # -1 -> do not get x - already managed if geo_fields != geo_keys: for geo_field in geo_fields: self.fields.pop(geo_field) if self.geom_type == "point_3d": geo_keys = list(geo_keys) + ["z"] return list(geo_keys) + extra_geo def _get_instance_geo_keys(self, instance): # geo keys for an instanced item geo_keys = [] for keys in self.GEO_FIELDS: if any(key for key in keys if getattr(instance, key) is not None): if keys[0] != "x": geom = getattr(instance, keys[0]) map_srid = geom.srid or 4326 widget = widgets.OSMWidget if map_srid == 4326: widget = widgets.ReversedOSMWidget self.fields[keys[0]].widget = widget( attrs={ "map_srid": map_srid, "geom_type": geom.geom_type, "cols": True, } ) self.fields.pop("spatial_reference_system") geo_keys = keys[:] else: geo_keys = [ "x", "estimated_error_x", "y", "estimated_error_y", "z", "estimated_error_z", "spatial_reference_system", ] for geo_fields in self.GEO_FIELDS: if geo_fields != keys: for geo_field in geo_fields: if geo_field == "x": self.fields.pop("estimated_error_x") self.fields.pop("y") self.fields.pop("estimated_error_y") if geo_field == "z": if geo_keys[0] == "point_3d": geo_keys = list(geo_keys) + ["z"] continue self.fields.pop("estimated_error_z") self.fields.pop(geo_field) break return geo_keys def get_headers(self): headers = self.HEADERS.copy() if self.geo_keys: headers[self.geo_keys[0]] = headers.pop("geo_field") return headers def clean(self): cleaned_data = self.cleaned_data if cleaned_data.get("buffer", None) and not cleaned_data.get( "buffer_type", None ): raise forms.ValidationError(_("If you set a buffer set a buffer type.")) if cleaned_data.get("buffer_type", None) and not cleaned_data.get( "buffer", None ): raise forms.ValidationError(_("If you set a buffer type set a buffer.")) if cleaned_data.get("point_3d"): if "z" not in cleaned_data: raise forms.ValidationError(_("A value is expected for Z.")) value = cleaned_data["point_3d"].ewkt if "POINT Z" not in value: value = value.replace("POINT", "POINT Z") cleaned_data["point_3d"] = f'{value.split(")")[0]} {cleaned_data["z"]})' if "x" not in self.geo_keys: # reverse... geo_value = cleaned_data[self.geo_keys[0]] if geo_value: if not isinstance(geo_value, str): geo_value = geo_value.ewkt if geo_value.startswith("SRID=4326;"): cleaned_data[self.geo_keys[0]] = reverse_coordinates(geo_value) for rel in models.GeoVectorData.RELATED_MODELS: if cleaned_data.get(rel, None): return cleaned_data raise forms.ValidationError( _("A geo item has to be attached at least to one item") ) def save(self, commit=True): if not getattr(self.instance, "source_content_type_id", None): self.instance.source_content_type_id = self.source_content_type if not getattr(self.instance, "source_id", None): self.instance.source_id = self.source_id item = super().save(commit=commit) for related_key in models.GeoVectorData.RELATED_MODELS: related = getattr(item, related_key) initial = dict([(rel.pk, rel) for rel in related.all()]) new = [int(pk) for pk in sorted(self.cleaned_data.get(related_key, []))] full_new = new[:] if related_key in self.too_many: full_new += self.too_many[related_key] for pk, value in initial.items(): if pk in full_new: continue related.remove(value) for new_pk in new: related_item = related.model.objects.get(pk=new_pk) if new_pk not in initial.keys(): related.add(related_item) key = "{}_{}_main_item".format(related_key, related_item.pk) if self.cleaned_data.get(key, []) and related_item.main_geodata != item: related_item.skip_history_when_saving = True related_item.main_geodata = item related_item.save() item = models.GeoVectorData.objects.get(pk=item.pk) if self.user and not getattr(item, "history_creator", None): item.history_creator = self.user item.history_modifier = self.user item.skip_history_when_saving = True item.save() # resave to regen the attached items return item class PreGISForm(IshtarForm): geom_type = forms.ChoiceField( label=_("Geometry type"), choices=( ("coordinates", _("Coordinates")), ("point_2d", _("Point")), ("point_3d", _("Point 3D")), ("multi_points", _("Multi-points")), ("multi_line", _("Multi-lines")), ("multi_polygon", _("Multi-polygons")), ), ) HEADERS = { "geom_type": FormHeader(_("Type")), } def __init__(self, *args, **kwargs): back_url = "" if "back_url" in kwargs: back_url = kwargs.pop("back_url") super().__init__(*args, **kwargs) if back_url: self.fields["back_url"] = forms.CharField( label="", required=False, widget=forms.HiddenInput, initial=back_url )