#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2016 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv from collections import OrderedDict import datetime from io import StringIO import os import pyqrcode import re import requests import shutil import tempfile from urllib.parse import urlparse, quote import zipfile from django import forms from django.contrib.gis import forms as gis_forms from django.conf import settings from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from django.core import validators from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist from django.core.files import File from django.core.validators import MaxValueValidator, MinValueValidator from django.forms.formsets import formset_factory from django.forms.models import BaseModelFormSet, BaseFormSet from django.shortcuts import reverse from django.utils.text import slugify from django.utils.safestring import mark_safe from django.utils.translation import ugettext_lazy as _, pgettext from . import models, models_rest from . import widgets from bootstrap_datepicker.widgets import DatePicker from ishtar_common.templatetags.link_to_window import simple_link_to_window from .forms import ( FinalForm, FormSet, reverse_lazy, name_validator, TableSelect, ManageOldType, CustomForm, FieldType, FormHeader, FormSetWithDeleteSwitches, BSForm, get_data_from_formset, file_size_validator, HistorySelect, CustomFormSearch, QAForm, IshtarForm, MultiSearchForm, LockForm, ) from ishtar_common.data_importer import ImporterError from ishtar_common.utils import is_downloadable, clean_session_cache, max_size_help, \ reverse_coordinates from archaeological_operations.models import Operation, OperationType from archaeological_context_records.models import ContextRecord from archaeological_finds.models import Find, FindBasket from archaeological_warehouse.models import Container def get_town_field(label=_("Town"), required=True): help_text = _( "

Type name, department code of the " "town you would like to select. The search is insensitive to case." "

\n

Only the first twenty results are displayed but specifying " "the department code is generally sufficient to get the appropriate " "result.

\n

For instance type \"saint denis 93\"" " for getting the french town Saint-Denis in the Seine-Saint-Denis " "department.

" ) # !FIXME hard_link, reverse_lazy doen't seem to work with formsets return forms.IntegerField( widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-town", associated_model=models.Town ), validators=[models.valid_id(models.Town)], label=label, help_text=mark_safe(help_text), required=required, ) def get_advanced_town_field(label=_("Town"), required=True): # !FIXME hard_link, reverse_lazy doen't seem to work with formsets return forms.IntegerField( widget=widgets.JQueryTown( "/" + settings.URL_PATH + "autocomplete-advanced-town" ), validators=[models.valid_id(models.Town)], label=label, required=required, ) def get_person_field(label=_("Person"), required=True, person_types=None): if not person_types: person_types = [] # !FIXME hard_link, reverse_lazy doen't seem to work with formsets widget = None url = "/" + settings.URL_PATH + "autocomplete-person" if person_types: person_types = [ str(models.PersonType.objects.get(txt_idx=person_type).pk) for person_type in person_types ] url += "/" + "_".join(person_types) widget = widgets.JQueryAutoComplete(url, associated_model=models.Person) return forms.IntegerField( widget=widget, label=label, required=required, validators=[models.valid_id(models.Person)], ) class NewItemForm(forms.Form): def __init__(self, *args, **kwargs): self.limits = {} if "limits" in kwargs: limits = kwargs.pop("limits") if limits: for item in limits.split(";"): key, values = item.split("__") self.limits[key] = values.split("-") super(NewItemForm, self).__init__(*args, **kwargs) def limit_fields(self): for key in self.limits: if key in self.fields and hasattr(self.fields[key], "choices"): new_choices = [ (value, lbl) for value, lbl in self.fields[key].choices if str(value) in self.limits[key] ] self.fields[key].choices = new_choices if len(new_choices) == 1: self.fields[key].initial = [new_choices[0][0]] class BaseImportForm(IshtarForm, forms.ModelForm): error_css_class = "error" required_css_class = "required" importer_type = "tab" imported_images_link = forms.URLField( label=_("Associated images (web link to a zip file)"), required=False ) class Meta: model = models.Import fields = ( "name", "importer_type", "imported_file", "encoding", "csv_sep", "imported_images", "imported_images_link", "associated_group", "skip_lines", ) widgets = { "imported_file": widgets.BSClearableFileInput, "imported_images": widgets.BSClearableFileInput, } HEADERS = { "name": FormHeader(_("Import (table)")), } def __init__(self, *args, **kwargs): user = kwargs.pop("user") super(BaseImportForm, self).__init__(*args, **kwargs) groups = models.TargetKeyGroup.objects.filter(available=True) if not user.is_superuser: groups = groups.filter(all_user_can_use=True) if not groups.count(): self.fields.pop("associated_group") else: self.fields["associated_group"].choices = [(None, "--")] + [ (g.pk, str(g)) for g in groups.all() ] self.fields["importer_type"].choices = [("", "--")] + [ (imp.pk, imp.name) for imp in models.ImporterType.objects.filter(available=True, type=self.importer_type) ] if "imported_images" in self.fields: self.fields["imported_images"].validators = [file_size_validator] self.fields["imported_file"].validators = [file_size_validator] self._post_init() def _clean_csv(self, is_csv=False): imported_file = self.cleaned_data.get("imported_file", None) encoding = self.cleaned_data.get("encoding", None) if imported_file and encoding: try: if not imported_file.name.lower().endswith(".csv"): if is_csv: raise AssertionError() else: return imported_file.seek(0) reader = csv.reader(StringIO(imported_file.read().decode(encoding))) for __ in reader: break imported_file.seek(0) except (AssertionError, UnicodeDecodeError) as e: raise forms.ValidationError( _("This is not a valid CSV file. Check file format and encoding.") ) def clean(self): data = self.cleaned_data if ( data.get("conservative_import", None) and data.get("importer_type") and not data.get("importer_type").unicity_keys ): raise forms.ValidationError( _( "This import type have no unicity type defined. " "Conservative import is not possible." ) ) return data class NewImportForm(BaseImportForm): imported_images_link = forms.URLField( label=_("Associated images (web link to a zip file)"), required=False ) class Meta: model = models.Import fields = ( "name", "importer_type", "imported_file", "encoding", "csv_sep", "imported_images", "imported_images_link", "associated_group", "skip_lines", ) HEADERS = { "name": FormHeader(_("Import (table)")), } def clean(self): data = super().clean() if data.get("imported_images_link", None) and data.get("imported_images", None): raise forms.ValidationError( _( "You put either a file or a download link for images " "but not both." ) ) if data.get("imported_images"): try: images = data.get("imported_images") zf = zipfile.ZipFile(images) zf.testzip() except zipfile.BadZipFile: raise forms.ValidationError( _("\"Associated images\" field must be a valid zip file.") ) self._clean_csv(is_csv=True) return data def clean_imported_images_link(self): value = self.cleaned_data.get("imported_images_link", None) if value: try: assert is_downloadable(value) except (AssertionError, requests.exceptions.RequestException): raise forms.ValidationError( _("Invalid link or no file is available for this link.") ) return value def save(self, user, commit=True): self.instance.user = user imported_images_link = ( self.cleaned_data.pop("imported_images_link") if "imported_images_link" in self.cleaned_data else None ) item = super(BaseImportForm, self).save(commit) if not imported_images_link: return item request = requests.get(imported_images_link, stream=True) ntf = tempfile.NamedTemporaryFile() for block in request.iter_content(1024 * 8): if not block: break ntf.write(block) file_name = imported_images_link.split("/")[-1] item.imported_images.save(file_name, File(ntf)) return item class NewImportGISForm(BaseImportForm): error_css_class = "error" required_css_class = "required" importer_type = "gis" class Meta: model = models.Import fields = ( "name", "importer_type", "imported_file", "encoding", "csv_sep", "associated_group", "skip_lines", ) HEADERS = { "name": FormHeader(_("Import (GIS)")), } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["skip_lines"].initial = 0 def clean_imported_file(self): value = self.cleaned_data.get("imported_file", None) if value: try: ext = value.name.lower().split(".")[-1] assert ext in ("zip", "gpkg", "csv") if ext == "zip": zip_file = zipfile.ZipFile(value) assert not zip_file.testzip() has_correct_file = False for name in zip_file.namelist(): in_ext = name.lower().split(".")[-1] if in_ext in ("shp", "gpkg"): has_correct_file = True break assert has_correct_file except AssertionError: raise forms.ValidationError( _("GIS file must be a zip containing a ShapeFile or GeoPackage file.") ) return value def clean(self): data = super().clean() self._clean_csv() return data def save(self, user, commit=True): self.instance.user = user item = super(NewImportGISForm, self).save(commit) return item class TargetKeyForm(forms.ModelForm): class Meta: model = models.TargetKey fields = ("target", "key", "value") widgets = { "key": forms.TextInput(attrs={"readonly": "readonly"}), } target = widgets.SelectReadonlyField(model=models.ImportTarget, label=_("Target")) value = widgets.Select2SimpleField(label=_("Value"), required=False) remember = forms.ChoiceField(label=_("Remember"), choices=[], required=False) NULL_VALUE = "" def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") super(TargetKeyForm, self).__init__(*args, **kwargs) instance = getattr(self, "instance", None) self.associated_import = None if instance and instance.pk: model = instance.target.associated_model if ( model and self.user.has_perm( "{}.change_{}".format(model._meta.app_label, model._meta.model_name) ) and hasattr(model, "admin_url") ): self.admin_url = instance.target.associated_model.admin_url() self.associated_import = instance.associated_import self.fields["target"].choices = [ (instance.target.pk, instance.target.verbose_name) ] self.fields["key"].widget.attrs["readonly"] = True self.fields["key"].widget.attrs["title"] = str(instance) self.fields["value"].choices = list(instance.target.get_choices()) self.fields["value"].choices.insert(1, (self.NULL_VALUE, _("Set to NULL"))) self.fields["key"].required = False self.fields["target"].required = False self.fields["value"].required = False choices = [ ("import", _("this import only")), ("me", _("me")), ] if ( self.associated_import and self.associated_import.associated_group and ( self.associated_import.associated_group.all_user_can_modify or self.user.is_superuser ) ): choices += [ ( "group", str(_("the current group: {}")).format( self.associated_import.associated_group ), ) ] if self.user.is_superuser: choices += [("all", _("all users"))] self.fields["remember"].choices = choices self.fields["remember"].widget.attrs["class"] = "auto" self.remember_choices = choices def clean_target(self): instance = getattr(self, "instance", None) if instance and instance.pk: return instance.target else: return self.cleaned_data["target"] def clean_key(self): instance = getattr(self, "instance", None) if instance and instance.pk: return instance.key else: return self.cleaned_data["key"] def save(self, commit=True): try: super(TargetKeyForm, self).save(commit) except ImporterError: return if not self.cleaned_data.get("value") or not self.user: return if self.cleaned_data["value"] == self.NULL_VALUE: self.instance.value = None self.instance.is_set = True can_edit_group = ( self.associated_import and self.associated_import.associated_group and ( self.associated_import.associated_group.all_user_can_modify or self.user.is_superuser ) ) remember = self.cleaned_data.get("remember") if remember == "import" and self.associated_import: self.instance.associated_import = self.associated_import self.instance.associated_user = None self.instance.associated_group = None elif remember == "group" and can_edit_group: self.instance.associated_import = None self.instance.associated_user = None self.instance.associated_group = self.associated_import.associated_group elif remember == "all" and self.user.is_superuser: self.instance.associated_import = None self.instance.associated_user = None self.instance.associated_group = None else: # for me! self.instance.associated_import = None self.instance.associated_user = self.user.ishtaruser self.instance.associated_group = None self.instance.save() class TargetKeyFormset(BaseModelFormSet): def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") super(TargetKeyFormset, self).__init__(*args, **kwargs) def get_form_kwargs(self, index): kwargs = super(TargetKeyFormset, self).get_form_kwargs(index) kwargs["user"] = self.user return kwargs class OrganizationForm(ManageOldType, NewItemForm): prefix = "organization" form_label = _("Organization") associated_models = { "organization_type": models.OrganizationType, "precise_town": models.Town, } name = forms.CharField(label=_("Name"), max_length=300, validators=[name_validator]) organization_type = forms.ChoiceField(label=_("Organization type"), choices=[]) url = forms.URLField(label=_("Web address"), required=False) grammatical_gender = forms.ChoiceField( label=_("Grammatical gender"), choices=[("", "--")] + list(models.GENDER), required=False, help_text=_("Can be used by templates"), ) address = forms.CharField(label=_("Address"), widget=forms.Textarea, required=False) address_complement = forms.CharField( label=_("Address complement"), widget=forms.Textarea, required=False ) postal_code = forms.CharField(label=_("Postal code"), max_length=10, required=False) town = forms.CharField(label=_("Town (freeform)"), max_length=30, required=False) precise_town = get_town_field(required=False) country = forms.CharField(label=_("Country"), max_length=30, required=False) email = forms.EmailField(label=_("Email"), required=False) phone = forms.CharField(label=_("Phone"), max_length=32, required=False) mobile_phone = forms.CharField( label=_("Mobile phone"), max_length=32, required=False ) def __init__(self, *args, **kwargs): super(OrganizationForm, self).__init__(*args, **kwargs) self.fields["organization_type"].choices = models.OrganizationType.get_types( initial=self.init_data.get("organization_type") ) self.fields["organization_type"].help_text = models.OrganizationType.get_help() self.limit_fields() def save(self, user, item=None): dct = self.cleaned_data dct["history_modifier"] = user dct["organization_type"] = models.OrganizationType.objects.get( pk=dct["organization_type"] ) if dct["precise_town"]: try: dct["precise_town"] = models.Town.objects.get(pk=dct["precise_town"]) except models.Town.DoesNotExist: dct.pop("precise_town") if not item: new_item = models.Organization(**dct) else: new_item = item for k in dct: setattr(new_item, k, dct[k]) new_item.save() return new_item class OrganizationSelect(CustomForm, TableSelect): _model = models.Organization search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "organization"), ) name = forms.CharField(label=_("Name"), max_length=300) organization_type = forms.ChoiceField(label=_("Type"), choices=[]) precise_town = get_town_field() def __init__(self, *args, **kwargs): super(OrganizationSelect, self).__init__(*args, **kwargs) self.fields["organization_type"].choices = models.OrganizationType.get_types() class OrganizationFormSelection(CustomFormSearch): SEARCH_AND_SELECT = True form_label = _("Organization search") associated_models = {"pk": models.Organization} currents = {"pk": models.Organization} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-organization"), OrganizationSelect, models.Organization, source_full=reverse_lazy("get-organization-full"), ), validators=[models.valid_id(models.Organization)], ) class OrganizationFormMultiSelection(MultiSearchForm): form_label = _("Organization search") associated_models = {"pks": models.Organization} pk = forms.CharField( label="", required=True, widget=widgets.DataTable( reverse_lazy("get-organization"), OrganizationSelect, models.Organization, multiple_select=True, source_full=reverse_lazy("get-organization-full"), ), validators=[models.valid_ids(models.Organization)], ) class QAOrganizationFormMulti(QAForm): form_admin_name = _("Organization - Quick action - Modify") form_slug = "organization-quickaction-modify" base_models = ["qa_organization_type"] associated_models = { "qa_organization_type": models.OrganizationType, } MULTI = True REPLACE_FIELDS = ["qa_organization_type", "qa_grammatical_gender"] qa_organization_type = forms.ChoiceField( label=_("Organization type"), required=False ) qa_grammatical_gender = forms.ChoiceField( label=_("Grammatical gender"), choices=[("", "--")] + list(models.GENDER), required=False, help_text=_("Can be used by templates"), ) TYPES = [ FieldType("qa_organization_type", models.OrganizationType), ] class ManualMerge(object): def clean_to_merge(self): value = self.cleaned_data.get("to_merge", None) or [] if value: value = value.split(",") values = [] for val in value: try: values.append(int(val)) except ValueError: pass if len(values) < 2: raise forms.ValidationError(_("At least two items have to be " "selected.")) self.cleaned_data["to_merge"] = values return values def get_items(self): items = [] model = self.associated_models["to_merge"] for pk in sorted(self.cleaned_data["to_merge"]): try: items.append(model.objects.get(pk=pk)) except model.DoesNotExist: pass return items class MergeIntoForm(forms.Form): main_item = forms.ChoiceField( label=_("Merge all items into"), choices=[], widget=forms.RadioSelect() ) def __init__(self, *args, **kwargs): self.items = kwargs.pop("items") super(MergeIntoForm, self).__init__(*args, **kwargs) self.fields["main_item"].choices = [] for pk in self.items: try: item = self.associated_model.objects.get(pk=pk) except self.associated_model.DoesNotExist: continue self.fields["main_item"].choices.append( ( item.pk, mark_safe("{} {}".format(simple_link_to_window(item), str(item))), ) ) def merge(self): model = self.associated_model try: main_item = model.objects.get(pk=self.cleaned_data["main_item"]) except model.DoesNotExist: return for pk in self.items: if pk == str(main_item.pk): continue try: item = model.objects.get(pk=pk) except model.DoesNotExist: continue main_item.merge(item) return main_item class OrgaMergeFormSelection(ManualMerge, forms.Form): SEARCH_AND_SELECT = True form_label = _("Organization to merge") associated_models = {"to_merge": models.Organization} currents = {"to_merge": models.Organization} to_merge = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-organization"), OrganizationSelect, models.Organization, multiple_select=True, source_full=reverse_lazy("get-organization-full"), ), ) class OrgaMergeIntoForm(MergeIntoForm): associated_model = models.Organization class BaseOrganizationForm(forms.ModelForm): form_prefix = "orga" class Meta: model = models.Organization fields = [ "name", "organization_type", "grammatical_gender", "address", "address_complement", "town", "postal_code", ] class PersonSelect(CustomForm, TableSelect): _model = models.Person search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "person"), ) name = forms.CharField(label=_("Name"), max_length=200) surname = forms.CharField(label=_("Surname"), max_length=50) email = forms.CharField(label=_("Email"), max_length=75) person_types = forms.ChoiceField(label=_("Type"), choices=[]) attached_to = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) def __init__(self, *args, **kwargs): super(PersonSelect, self).__init__(*args, **kwargs) self.fields["person_types"].choices = models.PersonType.get_types() class PersonFormSelection(CustomFormSearch): SEARCH_AND_SELECT = True form_label = _("Person search") associated_models = {"pk": models.Person} currents = {"pk": models.Person} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-person"), PersonSelect, models.Person, source_full=reverse_lazy("get-person-full"), ), validators=[models.valid_id(models.Person)], ) class PersonFormMultiSelection(MultiSearchForm): form_label = _("Person search") associated_models = {"pks": models.Person} pk = forms.CharField( label="", required=True, widget=widgets.DataTable( reverse_lazy("get-person"), PersonSelect, models.Person, multiple_select=True, source_full=reverse_lazy("get-person-full"), ), validators=[models.valid_ids(models.Person)], ) class QAPersonFormMulti(QAForm): form_admin_name = _("Person - Quick action - Modify") form_slug = "person-quickaction-modify" base_models = ["qa_title", "qa_person_types"] associated_models = { "qa_title": models.TitleType, "qa_attached_to": models.Organization, "qa_person_types": models.PersonType, } MULTI = True REPLACE_FIELDS = ["qa_title", "qa_attached_to"] qa_title = forms.ChoiceField(label=_("Title"), required=False) qa_attached_to = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], required=False, ) qa_person_types = widgets.Select2MultipleField( label=_("Person types"), required=False ) TYPES = [ FieldType("qa_title", models.TitleType), FieldType("qa_person_types", models.PersonType, is_multiple=True), ] def _get_qa_attached_to(self, value): try: value = models.Organization.objects.get(pk=value).cached_label except models.Organization.DoesNotExist: return "" return value class PersonMergeFormSelection(ManualMerge, forms.Form): SEARCH_AND_SELECT = True form_label = _("Person to merge") associated_models = {"to_merge": models.Person} currents = {"to_merge": models.Person} to_merge = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-person"), PersonSelect, models.Person, multiple_select=True, source_full=reverse_lazy("get-person-full"), ), ) class PersonMergeIntoForm(MergeIntoForm): associated_model = models.Person class SimplePersonForm(ManageOldType, NewItemForm): extra_form_modals = ["organization"] form_label = _("Identity") associated_models = { "attached_to": models.Organization, "title": models.TitleType, "precise_town": models.Town, } title = forms.ChoiceField(label=_("Title"), choices=[], required=False) salutation = forms.CharField(label=_("Salutation"), max_length=200, required=False) surname = forms.CharField( label=_("Surname"), max_length=50, validators=[name_validator] ) name = forms.CharField(label=_("Name"), max_length=200, validators=[name_validator]) raw_name = forms.CharField(label=_("Raw name"), max_length=300, required=False) email = forms.EmailField(label=_("Email"), required=False) phone_desc = forms.CharField( label=_("Phone description"), max_length=300, required=False ) phone = forms.CharField(label=_("Phone"), max_length=32, required=False) phone_desc2 = forms.CharField( label=_("Phone description 2"), max_length=300, required=False ) phone2 = forms.CharField(label=_("Phone 2"), max_length=32, required=False) phone_desc3 = forms.CharField( label=_("Phone description 3"), max_length=300, required=False ) phone3 = forms.CharField(label=_("Phone 3"), max_length=32, required=False) mobile_phone = forms.CharField( label=_("Mobile phone"), max_length=32, required=False ) attached_to = forms.IntegerField( label=_("Current organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, new=True, ), validators=[models.valid_id(models.Organization)], required=False, ) address = forms.CharField(label=_("Address"), widget=forms.Textarea, required=False) address_complement = forms.CharField( label=_("Address complement"), widget=forms.Textarea, required=False ) postal_code = forms.CharField(label=_("Postal code"), max_length=10, required=False) town = forms.CharField(label=_("Town (freeform)"), max_length=30, required=False) precise_town = get_town_field(required=False) country = forms.CharField(label=_("Country"), max_length=30, required=False) alt_address = forms.CharField( label=_("Other address: address"), widget=forms.Textarea, required=False ) alt_address_complement = forms.CharField( label=_("Other address: address complement"), widget=forms.Textarea, required=False, ) alt_postal_code = forms.CharField( label=_("Other address: postal code"), max_length=10, required=False ) alt_town = forms.CharField( label=_("Other address: town"), max_length=30, required=False ) alt_country = forms.CharField( label=_("Other address: country"), max_length=30, required=False ) def __init__(self, *args, **kwargs): super(SimplePersonForm, self).__init__(*args, **kwargs) self.fields["raw_name"].widget.attrs["readonly"] = True self.fields["title"].choices = models.TitleType.get_types( initial=self.init_data.get("title") ) class PersonUserSelect(PersonSelect): ishtaruser__isnull = forms.NullBooleanField(label=_("Already has an account")) class PersonUserFormSelection(PersonFormSelection): SEARCH_AND_SELECT = True form_label = _("Person search") associated_models = {"pk": models.Person} currents = {"pk": models.Person} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-person-for-account"), PersonUserSelect, models.Person, table_cols="TABLE_COLS_ACCOUNT", ), validators=[models.valid_id(models.Person)], ) class IshtarUserSelect(TableSelect): _model = models.IshtarUser search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "ishtaruser"), ) username = forms.CharField(label=_("Username"), max_length=200) name = forms.CharField(label=_("Name"), max_length=200) surname = forms.CharField(label=_("Surname"), max_length=50) email = forms.CharField(label=_("Email"), max_length=75) person_types = forms.ChoiceField(label=_("Type"), choices=[]) attached_to = forms.IntegerField( label=_("Organization"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-organization"), associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") if "user" in kwargs else None super(IshtarUserSelect, self).__init__(*args, **kwargs) self.fields["person_types"].choices = models.PersonType.get_types() class AccountFormSelection(forms.Form): SEARCH_AND_SELECT = True form_label = _("Account search") associated_models = {"pk": models.IshtarUser} currents = {"pk": models.IshtarUser} pk = forms.IntegerField( label="", widget=widgets.DataTable( reverse_lazy("get-ishtaruser"), IshtarUserSelect, models.IshtarUser ), validators=[models.valid_id(models.IshtarUser)], ) class AccountFormMultiSelection(MultiSearchForm): form_label = _("Account search") associated_models = {"pks": models.IshtarUser} pk_key = "pks" pk = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-ishtaruser"), IshtarUserSelect, models.IshtarUser ), validators=[models.valid_id(models.IshtarUser)], ) class BasePersonForm(forms.ModelForm): class Meta: model = models.Person fields = [ "title", "salutation", "name", "surname", "address", "address_complement", "town", "postal_code", ] class BaseOrganizationPersonForm(forms.ModelForm): class Meta: model = models.Person fields = ["attached_to", "title", "salutation", "name", "surname"] widgets = { "attached_to": widgets.JQueryPersonOrganization( reverse_lazy("autocomplete-organization"), reverse_lazy("organization_create"), model=models.Organization, attrs={"hidden": True}, new=True, ), } def __init__(self, *args, **kwargs): super(BaseOrganizationPersonForm, self).__init__(*args, **kwargs) def save(self, *args, **kwargs): person = super(BaseOrganizationPersonForm, self).save(*args, **kwargs) instance = person.attached_to form = BaseOrganizationForm( self.data, instance=instance, prefix=BaseOrganizationForm.form_prefix ) if form.is_valid(): orga = form.save() if not person.attached_to: person.attached_to = orga person.save() return person class PersonForm(SimplePersonForm): person_types = forms.MultipleChoiceField( label=_("Person type"), choices=[], required=False, widget=forms.CheckboxSelectMultiple, ) def __init__(self, *args, **kwargs): super(PersonForm, self).__init__(*args, **kwargs) self.fields["person_types"].choices = models.PersonType.get_types( initial=self.init_data.get("person_types"), empty_first=False ) self.fields["person_types"].help_text = models.PersonType.get_help() self.limit_fields() def save(self, user, item=None): dct = self.cleaned_data dct["history_modifier"] = user for key in self.associated_models.keys(): if key in dct: if not dct[key]: dct.pop(key) else: model = self.associated_models[key] try: dct[key] = model.objects.get(pk=dct[key]) except model.DoesNotExist: dct.pop(key) person_types = dct.pop("person_types") if not item: new_item = models.Person.objects.create(**dct) else: for k in dct: setattr(item, k, dct[k]) item.save() item.person_types.clear() new_item = item for pt in person_types: new_item.person_types.add(pt) return new_item class NoOrgaPersonForm(PersonForm): def __init__(self, *args, **kwargs): super(NoOrgaPersonForm, self).__init__(*args, **kwargs) self.fields.pop("attached_to") class PersonTypeForm(ManageOldType, forms.Form): form_label = _("Person type") base_model = "person_type" associated_models = {"person_type": models.PersonType} person_type = forms.MultipleChoiceField( label=_("Person type"), choices=[], required=False, widget=widgets.Select2Multiple, ) def __init__(self, *args, **kwargs): super(PersonTypeForm, self).__init__(*args, **kwargs) self.fields["person_type"].choices = models.PersonType.get_types( initial=self.init_data.get("person_type"), empty_first=False ) self.fields["person_type"].help_text = models.PersonType.get_help() class AccountForm(IshtarForm): form_label = _("Account") associated_models = {"pk": models.Person} currents = {"pk": models.Person} pk = forms.IntegerField(label="", widget=forms.HiddenInput, required=False) username = forms.CharField(label=_("Account"), max_length=30) email = forms.CharField( label=_("Email"), max_length=75, validators=[validators.validate_email] ) hidden_password = forms.CharField( label=_("New password"), max_length=128, widget=forms.PasswordInput, required=False, validators=[validators.MinLengthValidator(4)], ) hidden_password_confirm = forms.CharField( label=_("New password (confirmation)"), max_length=128, widget=forms.PasswordInput, required=False, ) HEADERS = { "hidden_password": FormHeader( _("New password"), help_message=_( "Keep these fields empty if you do not want to " "change password. On creation, if you leave these " "fields empty, the user will not be able to " "connect." ), ), } def __init__(self, *args, **kwargs): person = None if "initial" in kwargs and "pk" in kwargs["initial"]: try: person = models.Person.objects.get(pk=kwargs["initial"]["pk"]) account = models.IshtarUser.objects.get(person=person).user_ptr if not kwargs["initial"].get("username"): kwargs["initial"]["username"] = account.username if not kwargs["initial"].get("email"): kwargs["initial"]["email"] = account.email except ObjectDoesNotExist: pass if "person" in kwargs: person = kwargs.pop("person") super(AccountForm, self).__init__(*args, **kwargs) if person and (person.raw_name or (person.name and person.surname)): profile = models.IshtarSiteProfile.get_current_profile() if person.name and person.surname: values = [person.name.lower(), person.surname.lower()] else: values = person.raw_name.lower().split(" ") if profile.account_naming_style == "FN" and len(values) > 1: values = values[1:] + [values[0]] self.fields["username"].initial = ".".join(values) def clean(self): cleaned_data = self.cleaned_data password = cleaned_data.get("hidden_password") if password and password != cleaned_data.get("hidden_password_confirm"): raise forms.ValidationError( _("Your password and confirmation " "password do not match.") ) if not cleaned_data.get("pk"): models.is_unique(User, "username")(cleaned_data.get("username")) if not password: raise forms.ValidationError( _("You must provide a correct " "password.") ) # check username unicity q = User.objects.filter( username=cleaned_data.get("username") ) if cleaned_data.get("pk"): q = q.exclude(ishtaruser__person__pk=cleaned_data.get("pk")) if q.count(): raise forms.ValidationError(_("This username already exists.")) return cleaned_data class ProfileForm(ManageOldType): form_label = _("Profiles") base_model = "profile" associated_models = {"profile_type": models.ProfileType, "area": models.Area} profile_type = forms.ChoiceField(label=_("Type"), choices=[]) area = widgets.Select2MultipleField(label=_("Areas"), required=False) name = forms.CharField(label=_("Name"), required=False) pk = forms.IntegerField(label=" ", widget=forms.HiddenInput, required=False) TYPES = [ FieldType("profile_type", models.ProfileType), FieldType("area", models.Area, is_multiple=True), ] class ProfileFormsetBase(FormSetWithDeleteSwitches): def clean(self): values = [] if not getattr(self, "cleaned_data", None): return for data in self.cleaned_data: if not data.get("profile_type", None): continue value = ( data.get("profile_type", None), data.get("name", None) ) if value in values: raise forms.ValidationError(_("Choose different name for profiles.")) values.append(value) return self.cleaned_data ProfileFormset = formset_factory( ProfileForm, can_delete=True, formset=ProfileFormsetBase ) ProfileFormset.form_label = _("Profiles") ProfileFormset.form_admin_name = _("Profiles") ProfileFormset.form_slug = "profiles" class FinalAccountForm(forms.Form): final = True form_label = _("Confirm") send_password = forms.BooleanField( label=_("Send the new password by " "email?"), required=False ) def __init__(self, *args, **kwargs): self.is_hidden = True super(FinalAccountForm, self).__init__(*args, **kwargs) class ProfilePersonForm(forms.Form): """ Edit the current profile """ current_profile = forms.ChoiceField(label=_("Current profile"), choices=[]) name = forms.CharField(label=_("Name"), required=False) profile_type = forms.ChoiceField( label=_("Profile type"), required=False, disabled=True, choices=[] ) auto_pin = forms.BooleanField( label=_("Pin automatically items on creation and modification"), required=False ) # display_pin_menu = forms.BooleanField(label=_("Show pin menu"), required=False) duplicate_profile = forms.BooleanField( label=_("Duplicate this profile"), required=False ) delete_profile = forms.BooleanField( label=_("Delete this profile"), required=False, ) def __init__(self, *args, **kwargs): self.user = kwargs.pop("user") choices, initial = [], kwargs.get("initial", {}) current_profile = None for profile in self.user.ishtaruser.person.profiles.order_by( "name", "profile_type__label" ).all(): if profile.current: current_profile = profile initial["current_profile"] = profile.pk choices.append((profile.pk, str(profile))) if current_profile: initial["name"] = current_profile.name or current_profile.profile_type initial["profile_type"] = current_profile.profile_type.pk initial["auto_pin"] = current_profile.auto_pin # initial["display_pin_menu"] = current_profile.display_pin_menu kwargs["initial"] = initial super(ProfilePersonForm, self).__init__(*args, **kwargs) self.fields["current_profile"].choices = choices current_external_sources = ( [src.pk for src in current_profile.external_sources.all()] if current_profile else [] ) extra_fields = [] q = models_rest.ApiExternalSource.objects.filter(users=self.user.ishtaruser) for src in q.all(): initial = True if src.pk in current_external_sources else False extra_fields.append( ( f"external_source_{src.pk}", forms.BooleanField( label=str(_("External source - {}")).format(src.name), required=False, initial=initial, ), ) ) self.fields[f"external_source_{src.pk}"] = forms.BooleanField( label=str(_("External source - {}")).format(src.name), required=False, initial=initial, ) if extra_fields: # reorder fields fields = OrderedDict() for key in self.fields.keys(): if key == "duplicate_profile": for extra_key, extra_field in extra_fields: fields[extra_key] = extra_fields fields[key] = self.fields[key] self.fields = fields if ( not current_profile or not self.user.ishtaruser.person.profiles.filter( profile_type=current_profile.profile_type ) .exclude(pk=current_profile.pk) .count() ): # cannot delete the current profile if no profile of this type is # available self.fields.pop("delete_profile") if not current_profile: return self.fields["profile_type"].choices = [ (current_profile.profile_type.pk, current_profile.profile_type.name) ] def clean(self): data = self.cleaned_data q = models.UserProfile.objects.filter( person__ishtaruser=self.user.ishtaruser, pk=data["current_profile"] ) if not q.count(): return data profile = q.all()[0] name = data.get("name", "") if ( models.UserProfile.objects.filter( person__ishtaruser=self.user.ishtaruser, name=name ) .exclude(pk=profile.pk) .count() ): raise forms.ValidationError(_("A profile with the same name exists.")) return data def save(self, session): q = models.UserProfile.objects.filter( person__ishtaruser=self.user.ishtaruser, pk=self.cleaned_data["current_profile"], ) if not q.count(): return profile = q.all()[0] # manage deletion if self.cleaned_data.get("delete_profile", None): q = self.user.ishtaruser.person.profiles.filter( profile_type=profile.profile_type ).exclude(pk=profile.pk) if not q.count(): # cannot delete the current profile if no profile of this type # is available return new_current = q.all()[0] new_current.current = True new_current.save() profile.delete() return name = self.cleaned_data["name"] # manage duplication if self.cleaned_data.get("duplicate_profile", None): profile_name = profile.name or profile.profile_type.label if name == profile_name: name += str(_(" (duplicate)")) profile.duplicate(name=name) return profile.current = True profile.name = name profile.auto_pin = self.cleaned_data["auto_pin"] # profile.display_pin_menu = self.cleaned_data["display_pin_menu"] profile.save() profile.external_sources.clear() q = models_rest.ApiExternalSource.objects.filter(users=self.user.ishtaruser) for src in q.all(): if self.cleaned_data.get(f"external_source_{src.pk}", False): profile.external_sources.add(src) clean_session_cache(session) if "EXTERNAL_SOURCES" in session: session.pop("EXTERNAL_SOURCES") class TownForm(forms.Form): form_label = _("Towns") base_model = "town" associated_models = {"town": models.Town} town = get_town_field(required=False) class TownFormSet(FormSet): def clean(self): """Checks that no towns are duplicated.""" return self.check_duplicate(("town",), _("There are identical towns.")) TownFormset = formset_factory(TownForm, can_delete=True, formset=TownFormSet) TownFormset.form_label = _("Towns") TownFormset.form_admin_name = _("Towns") TownFormset.form_slug = "towns" class MergeFormSet(BaseModelFormSet): from_key = "" to_key = "" def __init__(self, *args, **kwargs): self._cached_list = [] super(MergeFormSet, self).__init__(*args, **kwargs) def merge(self): for form in self.initial_forms: form.merge() def initial_form_count(self): """ Recopied from django source only get_queryset is changed """ if not (self.data or self.files): return len(self.get_restricted_queryset()) return super(MergeFormSet, self).initial_form_count() def _construct_form(self, i, **kwargs): """ Recopied from django source only get_queryset is changed """ if self.is_bound and i < self.initial_form_count(): # Import goes here instead of module-level because importing # django.db has side effects. # from django.db import connections pk_key = "%s-%s" % (self.add_prefix(i), self.model._meta.pk.name) pk = self.data[pk_key] """pk_field = self.model._meta.pk pk = pk_field.get_db_prep_lookup('exact', pk, connection=connections[self.get_queryset().db])""" pk = self.get_restricted_queryset()[i].pk if isinstance(pk, list): pk = pk[0] kwargs["instance"] = self._existing_object(pk) if i < self.initial_form_count() and not kwargs.get("instance"): kwargs["instance"] = self.get_restricted_queryset()[i] if i >= self.initial_form_count() and self.initial_extra: # Set initial values for extra forms try: kwargs["initial"] = self.initial_extra[i - self.initial_form_count()] except IndexError: pass return super(BaseModelFormSet, self)._construct_form(i, **kwargs) def get_restricted_queryset(self): """ Filter (from, to) when (to, from) is already here """ q = self.queryset if self._cached_list: return self._cached_list existing, res = [], [] # only get one version of each couple for item in q.all(): tpl = [getattr(item, self.from_key).pk, getattr(item, self.to_key).pk] if tpl not in existing: res.append(item) existing.append(list(reversed(tpl))) self._cached_list = res return res class MergeForm(forms.ModelForm): id = forms.IntegerField(label="", widget=forms.HiddenInput, required=False) a_is_duplicate_b = forms.BooleanField(required=False) b_is_duplicate_a = forms.BooleanField(required=False) not_duplicate = forms.BooleanField(required=False) def clean(self): checked = [ True for k in ["a_is_duplicate_b", "b_is_duplicate_a", "not_duplicate"] if self.cleaned_data.get(k) ] if len(checked) > 1: raise forms.ValidationError(_("Only one choice can be checked.")) return self.cleaned_data def merge(self, *args, **kwargs): try: to_item = getattr(self.instance, self.TO_KEY) from_item = getattr(self.instance, self.FROM_KEY) except ObjectDoesNotExist: return if self.cleaned_data.get("a_is_duplicate_b"): to_item.merge(from_item) elif self.cleaned_data.get("b_is_duplicate_a"): from_item.merge(to_item) elif self.cleaned_data.get("not_duplicate"): from_item.merge_exclusion.add(to_item) else: return try: self.instance.__class__.objects.get( **{self.TO_KEY: from_item, self.FROM_KEY: to_item} ).delete() except ObjectDoesNotExist: pass self.instance.delete() class MergePersonForm(MergeForm): class Meta: model = models.Person fields = [] FROM_KEY = "from_person" TO_KEY = "to_person" class MergeOrganizationForm(MergeForm): class Meta: model = models.Organization fields = [] FROM_KEY = "from_organization" TO_KEY = "to_organization" def get_image_help(): if not settings.IMAGE_MAX_SIZE: return max_size_help() return ( str( _( "Heavy images are resized to: %(width)dx%(height)d " "(ratio is preserved)." ) % { "width": settings.IMAGE_MAX_SIZE[0], "height": settings.IMAGE_MAX_SIZE[1], } ) + " " + str(max_size_help()) ) ####################### # Document management # ####################### class AddGenericForm(ManageOldType, NewItemForm): model = None # to be defined label = forms.CharField(label=_("Label"), max_length=200) def clean_label(self): value = self.cleaned_data.get("label", None).strip() if self.model.objects.filter(label=value).count(): raise forms.ValidationError(_("This value already exist")) return value def save(self, user): label = self.cleaned_data["label"] base_slug = slugify(label) slug = base_slug idx = 0 while self.model.objects.filter(txt_idx=slug).count(): idx += 1 slug = base_slug + "-" + str(idx) return self.model.objects.create(label=label, txt_idx=slug) class AddDocumentTagForm(AddGenericForm): model = models.DocumentTag form_label = _("Document tag") def max_value_current_year(value): return MaxValueValidator(datetime.date.today().year)(value) class DocumentForm(forms.ModelForm, CustomForm, ManageOldType): form_label = _("Documentation") form_admin_name = _("Document - General") form_slug = "document-general" file_upload = True extra_form_modals = ["author", "person", "organization", "documenttag", "container"] associated_models = { "source_type": models.SourceType, "support_type": models.SupportType, "publisher": models.Organization, "format_type": models.Format, } pk = forms.IntegerField(label="", required=False, widget=forms.HiddenInput) title = forms.CharField( label=_("Title"), required=False, validators=[validators.MaxLengthValidator(200)], ) source_type = widgets.ModelChoiceField( model=models.SourceType, label=_("Type"), choices=[], required=False ) support_type = widgets.ModelChoiceField( model=models.SupportType, label=_("Medium"), choices=[], required=False ) format_type = widgets.ModelChoiceField( model=models.Format, label=_("Format"), choices=[], required=False ) scale = forms.CharField(label=_("Scale"), max_length=30, required=False) container_id = forms.IntegerField( label=_("Current container"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container, new=True ), validators=[models.valid_id(Container)], required=False, ) container_ref_id = forms.IntegerField( label=_("Reference container"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container, new=True ), validators=[models.valid_id(Container)], required=False, ) authors = widgets.Select2MultipleField( label=_("Authors"), required=False, model=models.Author, remote="autocomplete-author", ) publisher = forms.IntegerField( label=_("Publisher"), widget=widgets.JQueryAutoComplete( reverse_lazy( "autocomplete-organization", args=[ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ], ), limit={ "organization_type": [ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ] }, tips=models.get_publisher_label, associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], required=False, ) publishing_year = forms.IntegerField( label=_("Year of publication"), validators=[MinValueValidator(1000), max_value_current_year], required=False, ) licenses = widgets.Select2MultipleField( label=_("Licenses"), required=False, model=models.LicenseType ) tags = widgets.Select2MultipleField( label=_("Tags"), required=False, model=models.DocumentTag, remote="autocomplete-documenttag", ) language = widgets.ModelChoiceField( model=models.Language, label=_("Language"), choices=[], required=False ) issn = forms.CharField( label=_("ISSN"), widget=widgets.ISSNWidget, validators=[validators.MaxLengthValidator(9)], required=False, ) isbn = forms.CharField( label=_("ISBN"), widget=widgets.ISBNWidget, validators=[validators.MaxLengthValidator(17)], required=False, ) source = widgets.ModelJQueryAutocompleteField( label=_("Source"), model=models.Document, required=False ) source_free_input = forms.CharField( label=_("Source - free input"), validators=[validators.MaxLengthValidator(500)], required=False, ) source_page_range = forms.CharField( label=_("Source - page range"), validators=[validators.MaxLengthValidator(500)], required=False, help_text=_( 'Unique page: "242", page range: "242-245", multiple ' 'pages: "242;245;249", multiples pages and multiple ' 'pages ranges: "242-245;249;262-265".' ), ) associated_url = forms.URLField( max_length=1000, required=False, label=_("Numerical ressource (web address)") ) image = forms.ImageField( label=_("Image"), help_text=mark_safe(get_image_help()), max_length=255, required=False, widget=widgets.ImageFileInput(), validators=[file_size_validator], ) associated_file = forms.FileField( label=pgettext("Not directory", "File"), max_length=255, required=False, help_text=max_size_help(), validators=[file_size_validator], ) reference = forms.CharField( label=_("Reference"), validators=[validators.MaxLengthValidator(100)], required=False, ) internal_reference = forms.CharField( label=_("Internal reference"), validators=[validators.MaxLengthValidator(100)], required=False, ) receipt_date = forms.DateField( label=_("Receipt date"), required=False, widget=DatePicker ) creation_date = forms.DateField( label=_("Creation date"), required=False, widget=DatePicker ) receipt_date_in_documentation = forms.DateField( label=_("Receipt date in documentation"), required=False, widget=DatePicker ) comment = forms.CharField(label=_("Comment"), widget=forms.Textarea, required=False) description = forms.CharField( label=_("Description"), widget=forms.Textarea, required=False ) additional_information = forms.CharField( label=_("Additional information"), widget=forms.Textarea, required=False ) duplicate = forms.NullBooleanField(label=_("Has a duplicate"), required=False) TYPES = [ FieldType("source_type", models.SourceType), FieldType("support_type", models.SupportType), FieldType("format_type", models.Format), FieldType("language", models.Language), FieldType("licences", models.LicenseType, is_multiple=True), FieldType("tags", models.DocumentTag, is_multiple=True), ] class Meta: model = models.Document fields = [ "title", "source_type", "reference", "internal_reference", "format_type", "support_type", "scale", "image", "associated_file", "associated_url", "tags", "authors", "receipt_date", "receipt_date_in_documentation", "creation_date", "publisher", "publishing_year", "language", "isbn", "issn", "licenses", "source", "source_free_input", "source_page_range", "container_id", "container_ref_id", "comment", "description", "additional_information", "duplicate", ] HEADERS = { "finds": FormHeader(_("Related items")), "title": FormHeader(_("Identification")), "format_type": FormHeader(_("Format")), "image": FormHeader(_("Content")), "authors": FormHeader(_("Authors")), "receipt_date": FormHeader(_("Dates")), "publisher": FormHeader(_("Publishing"), collapse=True), "source": FormHeader(_("Source"), collapse=True), "container_id": FormHeader(_("Warehouse"), collapse=True), "comment": FormHeader(_("Advanced"), collapse=True), } OPTIONS_PERMISSIONS = [ # field name, permission, options ("tags", ("ishtar_common.add_documenttag",), {"new": True}), ("authors", ("ishtar_common.add_author",), {"new": True}), ("publisher", ("ishtar_common.add_organization",), {"new": True}), ("container", ("archaeological_warehouse.add_container",), {"new": True}), ("container_ref", ("archaeological_warehouse.add_container",), {"new": True}), ] def __init__(self, *args, **kwargs): main_items_fields = {} if "main_items_fields" in kwargs: main_items_fields = kwargs.pop("main_items_fields") self.user = None if kwargs.get("user", None): self.user = kwargs.pop("user") self.is_instancied = bool(kwargs.get("instance", False)) super(DocumentForm, self).__init__(*args, **kwargs) fields = OrderedDict() for related_key in models.Document.RELATED_MODELS_ALT: model = models.Document._meta.get_field(related_key).related_model fields[related_key] = widgets.Select2MultipleField( model=model, remote=True, label=model._meta.verbose_name_plural, required=False, style="width: 100%", ) if related_key in main_items_fields: for field_key, label in main_items_fields[related_key]: disabled = False if kwargs.get("initial", None) and kwargs["initial"].get( field_key, False ): disabled = True fields[field_key] = forms.BooleanField( label=label, required=False, disabled=disabled ) for k in self.fields: fields[k] = self.fields[k] self.fields = fields def clean_source_page_range(self): value = self.cleaned_data.get("source_page_range", None).replace(" ", "") if value and not re.match(r"^(\d+[-;]*\d*)+$", value): raise forms.ValidationError(_("Incorrect page range.")) return value def get_headers(self): headers = self.HEADERS.copy() if self.is_instancied: headers["finds"] = FormHeader(headers["finds"].label, collapse=True) return headers def get_conditional_filter_fields(self): """ Helper to get values for filtering select widget on select by another widget :return: conditional_fields, excluded_fields, all_values * conditional_fields: {input_key: { input_pk: [(output_key1, "pk1,pk2,..."), (output_key2, "pk1,pk2,...")], input_pk2: ... }, input_key2: {...} } * excluded_fields: {output_key: "pk1,pk2,...", ...} # list pk for all output_key in condition_fields * all_values: {output_key: [(pk1, label1), (pk2, label2), ...]} """ conditional_fields = {} excluded_fields = {} key = "source_type" for doc_type in models.SourceType.objects.filter( available=True, formats__pk__isnull=False ).all(): if key not in conditional_fields: conditional_fields[key] = {} sub_key = doc_type.pk if sub_key in conditional_fields[key]: continue lst = [ str(f.pk) for f in models.Format.objects.filter( available=True, document_types__pk=doc_type.pk ) ] if "format_type" not in excluded_fields: excluded_fields["format_type"] = [] for k in lst: if k not in excluded_fields["format_type"]: excluded_fields["format_type"].append(k) conditional_fields[key][sub_key] = [("format_type", ",".join(lst))] for doc_type in models.SourceType.objects.filter( available=True, supports__pk__isnull=False ).all(): if key not in conditional_fields: conditional_fields[key] = {} lst = [ str(f.pk) for f in models.SupportType.objects.filter( available=True, document_types__pk=doc_type.pk ) ] if "support_type" not in excluded_fields: excluded_fields["support_type"] = [] for k in lst: if k not in excluded_fields["support_type"]: excluded_fields["support_type"].append(k) sub_key = doc_type.pk if sub_key not in conditional_fields[key]: conditional_fields[key][sub_key] = [] conditional_fields[key][sub_key].append(("support_type", ",".join(lst))) for k in excluded_fields: excluded_fields[k] = ",".join(excluded_fields[k]) all_values = { "format_type": [list(tp) for tp in models.Format.get_types()], "support_type": [list(tp) for tp in models.SupportType.get_types()], } return conditional_fields, excluded_fields, all_values def clean(self): cleaned_data = self.cleaned_data if ( not cleaned_data.get("title", None) and not cleaned_data.get("image", None) and not cleaned_data.get("associated_file", None) and not cleaned_data.get("associated_url", None) ): raise forms.ValidationError( _( "You should at least fill one of this field: title, url, " "image or file. If you have provided an image check that " "it is not corrupted." ) ) for rel in models.Document.RELATED_MODELS: if cleaned_data.get(rel, None): return cleaned_data raise forms.ValidationError( _("A document has to be attached at least to one item") ) def clean_publisher(self): if not self.cleaned_data.get("publisher", None): return try: return models.Organization.objects.get(pk=self.cleaned_data["publisher"]) except models.Organization.DoesNotExist: return def save(self, commit=True): if not self.cleaned_data.get("authors", None): self.cleaned_data["authors"] = [] item = super(DocumentForm, self).save(commit=commit) for related_key in models.Document.RELATED_MODELS: related = getattr(item, related_key) initial = dict([(rel.pk, rel) for rel in related.all()]) new = [int(pk) for pk in sorted(self.cleaned_data.get(related_key, []))] for pk, value in initial.items(): if pk in new: continue related.remove(value) for new_pk in new: related_item = related.model.objects.get(pk=new_pk) if new_pk not in initial.keys(): related.add(related_item) key = "{}_{}_main_image".format(related_key, related_item.pk) if self.cleaned_data.get(key, []) and related_item.main_image != item: related_item.skip_history_when_saving = True related_item.main_image = item related_item.save() item = models.Document.objects.get(pk=item.pk) if self.user: item.history_creator = self.user item.history_modifier = self.user item.skip_history_when_saving = True item.save() # resave to regen the attached items return item class DocumentSelect(HistorySelect): _model = models.Document form_admin_name = _("Document - 001 - Search") form_slug = "document-001-search" search_vector = forms.CharField( label=_("Full text search"), widget=widgets.SearchWidget("ishtar-common", "document"), ) authors = forms.IntegerField( widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-author", associated_model=models.Author, ), validators=[models.valid_id(models.Author)], label=_("Author"), required=False, ) title = forms.CharField(label=_("Title")) source_type = forms.ChoiceField(label=_("Type"), choices=[]) reference = forms.CharField(label=_("Reference")) internal_reference = forms.CharField(label=_("Internal reference")) description = forms.CharField(label=_("Description")) format = forms.ChoiceField(label=_("Format"), choices=[]) support = forms.ChoiceField(label=_("Medium"), choices=[]) scale = forms.CharField(label=_("Scale")) associated_url = forms.CharField(label=_("Web address")) tag = forms.ChoiceField(label=_("Tag"), choices=[]) publisher = forms.IntegerField( label=_("Publisher"), widget=widgets.JQueryAutoComplete( reverse_lazy( "autocomplete-organization", args=[ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ], ), limit={ "organization_type": [ models.organization_type_pks_lazy( settings.ISHTAR_SLUGS["document-publisher"] ) ] }, tips=models.get_publisher_label, associated_model=models.Organization, ), validators=[models.valid_id(models.Organization)], ) publishing_year = forms.IntegerField(label=_("Year of publication")) language = forms.ChoiceField(label=_("Language"), choices=[]) isbn = forms.CharField(label=_("ISBN")) issn = forms.CharField(label=_("ISSN")) licenses = forms.ChoiceField(label=_("License"), choices=[]) comment = forms.CharField(label=_("Comment")) additional_information = forms.CharField(label=_("Additional informations")) duplicate = forms.NullBooleanField(label=_("Has a duplicate")) associated_file__isnull = forms.NullBooleanField(label=_("Has a file?")) image__isnull = forms.NullBooleanField(label=_("Has an image?")) source = forms.IntegerField( label=_("Source"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-document"), associated_model=models.Document ), validators=[models.valid_id(models.Document)], ) source_free_input = forms.CharField(label=_("Source - free input")) warehouse_container = forms.IntegerField( label=_("Warehouse - Container"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container ), validators=[models.valid_id(Container)], ) warehouse_container_ref = forms.IntegerField( label=_("Warehouse - Reference container"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container ), validators=[models.valid_id(Container)], ) operation = forms.IntegerField( label=_("Operation"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-operation"), associated_model=Operation ), validators=[models.valid_id(Operation)], ) operations__operation_type = forms.ChoiceField( label=_("Operation - type"), choices=[] ) operations__year = forms.IntegerField(label=_("Operation - year")) context_record = forms.IntegerField( label=_("Context record"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-contextrecord"), associated_model=ContextRecord ), validators=[models.valid_id(ContextRecord)], ) find_basket = forms.IntegerField( label=_("Basket - Finds"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-findbasket"), associated_model=FindBasket ), validators=[models.valid_id(FindBasket)], required=False, ) find = forms.IntegerField( label=_("Find"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-find"), associated_model=Find ), validators=[models.valid_id(Find)], ) find__denomination = forms.CharField(label=_("Find - denomination"), required=False) containers = forms.IntegerField( label=_("Container"), required=False, widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container ), validators=[models.valid_id(Container)], ) receipt_date__before = forms.DateField( label=_("Receipt date before"), widget=DatePicker ) receipt_date__after = forms.DateField( label=_("Receipt date after"), widget=DatePicker ) creation_date__before = forms.DateField( label=_("Creation date before"), widget=DatePicker ) creation_date__after = forms.DateField( label=_("Creation date after"), widget=DatePicker ) receipt_date_in_documentation__before = forms.DateField( label=_("Receipt date before"), widget=DatePicker ) receipt_date_in_documentation__after = forms.DateField( label=_("Receipt date after"), widget=DatePicker ) TYPES = [ FieldType("source_type", models.SourceType), FieldType("format", models.Format), FieldType("support", models.SupportType), FieldType("tag", models.DocumentTag), FieldType("language", models.Language), FieldType("licenses", models.LicenseType), FieldType("operations__operation_type", models.OperationType), ] PROFILE_FILTER = { "context_record": ["context_record"], "find": ["find"], "warehouse": ["container"], } class DocumentFormSelection(LockForm, CustomFormSearch): SEARCH_AND_SELECT = True form_label = _("Document search") associated_models = {"pk": models.Document} currents = {"pk": models.Document} pk = forms.IntegerField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-document"), DocumentSelect, models.Document, gallery=True ), validators=[models.valid_id(models.Document)], ) class DocumentFormMultiSelection(LockForm, MultiSearchForm): form_label = _("Document search") associated_models = {"pks": models.Document} pk_key = "pks" pk = forms.CharField( label="", required=False, widget=widgets.DataTable( reverse_lazy("get-document"), DocumentSelect, models.Document, multiple_select=True, gallery=True, ), validators=[models.valid_ids(models.Document)], ) class QADocumentFormMulti(QAForm): form_admin_name = _("Document - Quick action - Modify") form_slug = "document-quickaction-modify" base_models = ["qa_source_type"] associated_models = { "qa_source_type": models.SourceType, "qa_authors": models.Author, } MULTI = True REPLACE_FIELDS = [ "qa_source_type", "qa_creation_date", ] qa_source_type = forms.ChoiceField(label=_("Source type"), required=False) qa_authors = widgets.ModelJQueryAutocompleteField( model=models.Author, label=_("Author"), new=True, required=False ) qa_creation_date = forms.DateField( label=_("Creation date"), widget=DatePicker, required=False ) qa_format_type = forms.ChoiceField(label=_("Format"), choices=[], required=False) qa_support_type = forms.ChoiceField(label=_("Medium"), choices=[], required=False) qa_scale = forms.CharField(label=_("Scale"), max_length=30, required=False) TYPES = [ FieldType("qa_source_type", models.SourceType), FieldType("qa_format_type", models.Format), FieldType("qa_support_type", models.SupportType), ] def _get_qa_authors(self, value): try: value = models.Author.objects.get(pk=value).cached_label except models.Author.DoesNotExist: return "" return value class QADocumentDuplicateForm(IshtarForm): qa_title = forms.CharField(label=_("Title"), max_length=500, required=False) qa_reference = forms.CharField(label=_("Reference"), max_length=500, required=False) qa_source_type = forms.ChoiceField(label=_("Type"), choices=[], required=False) open_edit = forms.BooleanField(label=_("Edit duplicated item"), required=False) open_edit.widget.NO_FORM_CONTROL = True TYPES = [ FieldType("qa_source_type", models.SourceType), ] def __init__(self, *args, **kwargs): self.user = None if "user" in kwargs: self.user = kwargs.pop("user") if hasattr(self.user, "ishtaruser"): self.user = self.user.ishtaruser self.document = kwargs.pop("items")[0] super(QADocumentDuplicateForm, self).__init__(*args, **kwargs) self.fields["qa_title"].initial = self.document.title + str(_(" - duplicate")) if self.document.reference: self.fields["qa_reference"].initial = self.document.reference if self.document.source_type: self.fields["qa_source_type"].initial = self.document.source_type.pk for related_key in models.Document.RELATED_MODELS_ALT: related = getattr(self.document, related_key) if not related.count(): continue model = models.Document._meta.get_field(related_key).related_model initial = [item.pk for item in related.all()] self.fields["qa_" + related_key] = widgets.Select2MultipleField( model=model, remote=True, label=model._meta.verbose_name_plural, required=False, long_widget=True, initial=initial, ) def save(self): data = {"index": None} for k in ["title", "reference"]: data[k] = self.cleaned_data.get("qa_" + k, None) if self.cleaned_data.get("qa_source_type", None): try: data["source_type"] = models.SourceType.objects.get( pk=int(self.cleaned_data["qa_source_type"]), available=True ) except models.SourceType.DoesNotExist: return new = self.document.duplicate_item(self.user, data=data) for related_key in models.Document.RELATED_MODELS_ALT: getattr(new, related_key).clear() values = self.cleaned_data.get("qa_" + related_key, []) model = models.Document._meta.get_field(related_key).related_model for value in values: getattr(new, related_key).add(model.objects.get(pk=value)) new.skip_history_when_saving = True new._cached_label_checked = False new._search_updated = False new._no_move = True new.save() # regen of labels return new class QADocumentPackagingForm(IshtarForm): container = forms.IntegerField( label=_("Container"), widget=widgets.JQueryAutoComplete( reverse_lazy("autocomplete-container"), associated_model=Container, new=True ), validators=[models.valid_id(Container)], ) container_to_change = forms.ChoiceField( label=_("Change "), required=True, choices=( ("current-and-reference", _("current and reference containers")), ("reference", _("the reference container")), ("current", _("the current container")), ), ) def __init__(self, *args, **kwargs): self.confirm = False self.user = None if "user" in kwargs: self.user = kwargs.pop("user") if hasattr(self.user, "ishtaruser"): self.user = self.user.ishtaruser self.items = kwargs.pop("items") super(QADocumentPackagingForm, self).__init__(*args, **kwargs) def save(self, items, user): container = Container.objects.get(pk=self.cleaned_data["container"]) container_to_change = self.cleaned_data.get("container_to_change", "") container_attrs = [] if container_to_change in ("reference", "current-and-reference"): container_attrs.append("container_ref") if container_to_change in ("current", "current-and-reference"): container_attrs.append("container") for document in items: changed = False for container_attr in container_attrs: if getattr(document, container_attr) == container: continue setattr(document, container_attr, container) changed = True if changed: document.history_modifier = user document.save() class QALockForm(forms.Form): action = forms.ChoiceField( label=_("Action"), choices=(("lock", _("Lock")), ("unlock", _("Unlock"))) ) def __init__(self, *args, **kwargs): self.items = kwargs.pop("items") super(QALockForm, self).__init__(*args, **kwargs) def save(self, items, user): locked = self.cleaned_data["action"] == "lock" for item in items: item.locked = locked item.lock_user = user if locked else None item.skip_history_when_saving = True item.save() class SourceDeletionForm(FinalForm): confirm_msg = " " confirm_end_msg = _("Would you like to delete this documentation?") ###################### # Authors management # ###################### class AuthorForm(ManageOldType, NewItemForm): form_label = _("Author") associated_models = {"person": models.Person, "author_type": models.AuthorType} person = forms.IntegerField( widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-person", associated_model=models.Person, new=True, ), validators=[models.valid_id(models.Person)], label=_("Person"), ) author_type = forms.ChoiceField(label=_("Author type"), choices=[]) def __init__(self, *args, **kwargs): super(AuthorForm, self).__init__(*args, **kwargs) self.fields["author_type"].choices = models.AuthorType.get_types( initial=self.init_data.get("author_type") ) self.limit_fields() def clean(self): person_id = self.cleaned_data.get("person", None) author_type_id = self.cleaned_data.get("author_type", None) if not person_id or not author_type_id: return self.cleaned_data if models.Author.objects.filter( author_type_id=author_type_id, person_id=person_id ).count(): raise forms.ValidationError(_("This author already exist.")) def save(self, user): dct = self.cleaned_data dct["author_type"] = models.AuthorType.objects.get(pk=dct["author_type"]) dct["person"] = models.Person.objects.get(pk=dct["person"]) new_item = models.Author(**dct) new_item.save() return new_item class AuthorFormSelection(forms.Form): form_label = _("Author selection") base_model = "author" associated_models = {"author": models.Author} author = forms.IntegerField( required=False, widget=widgets.JQueryAutoComplete( "/" + settings.URL_PATH + "autocomplete-author", associated_model=models.Author, new=True, ), validators=[models.valid_id(models.Author)], label=_("Author"), ) class AuthorFormSet(FormSet): def clean(self): """Checks that no author are duplicated.""" return self.check_duplicate(("author",), _("There are identical authors.")) AuthorFormset = formset_factory( AuthorFormSelection, can_delete=True, formset=AuthorFormSet ) AuthorFormset.form_label = _("Authors") AuthorFormset.form_admin_name = _("Authors") AuthorFormset.form_slug = "authors" class SearchQueryForm(forms.Form): query = forms.CharField( max_length=None, label=_("Query"), initial="*", widget=forms.HiddenInput ) search_query = forms.ChoiceField(label="", required=False, choices=[]) label = forms.CharField(label="", max_length=None, required=False) is_alert = forms.BooleanField(label=_("Is an alert"), required=False) create_or_update = forms.ChoiceField( choices=(("create", _("Create")), ("update", _("Update"))), initial="create" ) def __init__(self, profile, content_type, *args, **kwargs): self.profile = profile self.content_type = content_type super(SearchQueryForm, self).__init__(*args, **kwargs) self.fields["search_query"].choices = [ (c.pk, c.label) for c in models.SearchQuery.objects.filter( content_type=content_type, profile=profile ).all() ] if not self.fields["search_query"].choices: self.fields.pop("search_query") def clean(self): data = self.cleaned_data if data["create_or_update"] == "create" and not data["label"]: raise forms.ValidationError( _("A label is required for a new " "search query.") ) elif data["create_or_update"] == "update": if not data["search_query"]: raise forms.ValidationError(_("Select the search query to " "update")) q = models.SearchQuery.objects.filter( profile=self.profile, content_type=self.content_type, pk=data["search_query"], ) if not q.count(): raise forms.ValidationError(_("Query does not exist.")) return data def save(self): data = self.cleaned_data if data["create_or_update"] == "create": sq = models.SearchQuery.objects.create( label=data["label"], query=data["query"], profile=self.profile, content_type=self.content_type, is_alert=data["is_alert"], ) else: try: sq = models.SearchQuery.objects.get( profile=self.profile, content_type=self.content_type, pk=data["search_query"], ) except models.SearchQuery.DoesNotExist: raise forms.ValidationError(_("Query does not exist.")) sq.query = data["query"] sq.save() return sq class QRSearchForm(forms.Form): query = forms.CharField(max_length=None, label=_("Query"), initial="*") current_url = forms.CharField(max_length=None, label="", widget=forms.HiddenInput()) def save(self): data = self.cleaned_data url = data["current_url"] parsed_url = urlparse(url) base_url = "{}://{}".format(parsed_url.scheme, parsed_url.netloc) url = base_url + parsed_url.path url += "?stored_search=" + quote(data["query"]) tiny_url = models.TinyUrl.objects.create(link=url) short_url = base_url + reverse("tiny-redirect", args=[tiny_url.get_short_id()]) qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) tmpdir = tempfile.mkdtemp("-qrcode") date = datetime.datetime.today().isoformat().replace(":", "-").replace(".", "") base_filename = "{}-qrcode.png".format(date) filename = os.path.join(tmpdir, base_filename) qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) dest_dir = os.path.join(settings.MEDIA_ROOT, "tmp") if not os.path.exists(dest_dir): os.makedirs(dest_dir) shutil.move(filename, dest_dir) return os.path.join(settings.MEDIA_URL, "tmp", base_filename) class GISForm(forms.ModelForm, CustomForm, ManageOldType): form_label = _("Geo item") form_admin_name = _("Geo item - General") form_slug = "geoitem-general" extra_form_modals = [] associated_models = { "data_type": models.GeoDataType, "origin": models.GeoOriginType, "provider": models.GeoProviderType, "buffer_type": models.GeoBufferType, } pk = forms.IntegerField(label="", required=False, widget=forms.HiddenInput) name = forms.CharField( label=_("Name"), validators=[validators.MaxLengthValidator(500)], ) import_key = forms.CharField( label=_("Import key"), required=False, disabled=True, help_text=_("An update via import corresponding to the source element and " "this key will overwrite the data."), ) source_content_type_id = forms.IntegerField( label="", required=True, widget=forms.HiddenInput, disabled=True) source_id = forms.IntegerField( label="", required=True, widget=forms.HiddenInput, disabled=True) data_type = widgets.ModelChoiceField( model=models.GeoDataType, label=_("Data type"), choices=[], required=False ) origin = widgets.ModelChoiceField( model=models.GeoOriginType, label=_("Origin"), choices=[], required=False ) provider = widgets.ModelChoiceField( model=models.GeoProviderType, label=_("Provider"), choices=[], required=False ) buffer = forms.FloatField( label=_("Buffer"), required=False, widget=widgets.MeterCentimeterWidget, ) buffer_type = widgets.ModelChoiceField( model=models.GeoBufferType, label=_("Buffer type"), choices=[], required=False ) comment = forms.CharField(label=_("Comment"), widget=forms.Textarea, required=False) estimated_error_x = forms.FloatField( label=_("Estimated error for X"), required=False, widget=widgets.MeterCentimeterWidget, ) estimated_error_y = forms.FloatField( label=_("Estimated error for Y"), required=False, widget=widgets.MeterCentimeterWidget, ) estimated_error_z = forms.FloatField( label=_("Estimated error for Z"), required=False, widget=widgets.MeterCentimeterWidget, ) TYPES = [ FieldType("origin", models.GeoOriginType), FieldType("data_type", models.GeoDataType), FieldType("provider", models.GeoProviderType), FieldType("buffer_type", models.GeoBufferType), ] class Meta: model = models.GeoVectorData exclude = ["need_update", "imports", "cached_x", "cached_y", "cached_z", "source_content_type"] HEADERS = { "related_items_ishtar_common_town": FormHeader( _("Related items"), collapse=True), "name": FormHeader(_("Meta-data")), "geo_field": FormHeader(_("Geography")), "buffer": FormHeader(_("Buffer"), collapse=True), } GEO_FIELDS = ( ("point_2d",), ("point_3d",), ("multi_points",), ("multi_line",), ("multi_polygon",), ("x", "z"), ) GEOM_TYPES = { "point_2d": "Point", "point_3d": "Point", "multi_points": "MultiPoint", "multi_line": "MultiLineString", "multi_polygon": "MultiPolygon", } def __init__(self, *args, **kwargs): back_url = "" if "back_url" in kwargs: back_url = kwargs.pop("back_url") find_id = "" if "find_id" in kwargs: find_id = kwargs.pop("find_id") main_items_fields = {} if "main_items_fields" in kwargs: main_items_fields = kwargs.pop("main_items_fields") self.too_many = {} if "too_many" in kwargs: self.too_many = kwargs.pop("too_many") self.user = None self.geom_type = kwargs.pop("geom_type") \ if kwargs.get("geom_type", None) else None if kwargs.get("user", None): self.user = kwargs.pop("user") instance = kwargs.get("instance", False) self.is_instancied = bool(instance) self.source_content_type = kwargs.pop("source_content_type", None) self.source_id = kwargs.pop("source_id", None) super(GISForm, self).__init__(*args, **kwargs) if back_url: self.fields["back_url"] = forms.CharField( label="", required=False, widget=forms.HiddenInput, initial=back_url) if find_id: self.fields["find_id"] = forms.CharField( label="", required=False, widget=forms.HiddenInput, initial=find_id) if not self.fields["import_key"].initial: self.fields.pop("import_key") if not self.source_content_type: self.fields.pop("source_content_type_id") self.fields.pop("source_id") else: self.fields["source_content_type_id"].initial = self.source_content_type self.fields["source_id"].initial = self.source_id self.geo_keys = self.get_geo_keys(instance) fields = OrderedDict() for related_key in models.GeoVectorData.RELATED_MODELS: model = models.GeoVectorData._meta.get_field(related_key).related_model fields[related_key] = widgets.Select2MultipleField( model=model, remote=True, label=model._meta.verbose_name_plural, required=False, style="width: 100%", ) for related_key in models.GeoVectorData.RELATED_MODELS: if related_key in main_items_fields: for field_key, label in main_items_fields[related_key]: disabled = False if kwargs.get("initial", None) and kwargs["initial"].get( field_key, False ): disabled = True fields[field_key] = forms.BooleanField( label=label, required=False, disabled=disabled ) for k in self.geo_keys: fields[k] = self.fields[k] for k in self.fields: if k not in self.geo_keys: fields[k] = self.fields[k] self.fields = fields def get_geo_keys(self, instance): if instance: return self._get_instance_geo_keys(instance) else: return self._get_base_geo_keys() def _get_base_geo_keys(self): geo_keys = [] extra_geo = ["buffer", "buffer_type"] if self.geom_type == "coordinates": geo_keys = [ "x", "estimated_error_x", "y", "estimated_error_y", "z", "estimated_error_z", "spatial_reference_system", ] for keys in self.GEO_FIELDS: if any(key == self.geom_type for key in keys): map_srid = 4326 widget = gis_forms.OSMWidget self.fields[keys[0]].widget = widget( attrs={"map_srid": map_srid, "cols": True, "geom_type": self.GEOM_TYPES[self.geom_type]}) self.fields.pop("spatial_reference_system") geo_keys = keys[:] self.fields.pop("x") self.fields.pop("y") if self.geom_type != "point_3d": self.fields.pop("z") self.fields.pop("estimated_error_x") self.fields.pop("estimated_error_y") self.fields.pop("estimated_error_z") break for geo_fields in self.GEO_FIELDS[:-1]: # -1 -> do not get x - already managed if geo_fields != geo_keys: for geo_field in geo_fields: self.fields.pop(geo_field) if self.geom_type == "point_3d": geo_keys = list(geo_keys) + ["z"] return list(geo_keys) + extra_geo def _get_instance_geo_keys(self, instance): # geo keys for an instanced item geo_keys = [] for keys in self.GEO_FIELDS: if any(key for key in keys if getattr(instance, key) is not None): if keys[0] != "x": geom = getattr(instance, keys[0]) map_srid = geom.srid or 4326 widget = gis_forms.OSMWidget if map_srid == 4326: widget = widgets.ReversedOSMWidget self.fields[keys[0]].widget = widget( attrs={"map_srid": map_srid, "geom_type": geom.geom_type, "cols": True}) self.fields.pop("spatial_reference_system") geo_keys = keys[:] else: geo_keys = [ "x", "estimated_error_x", "y", "estimated_error_y", "z", "estimated_error_z", "spatial_reference_system", ] for geo_fields in self.GEO_FIELDS: if geo_fields != keys: for geo_field in geo_fields: if geo_field == "x": self.fields.pop("estimated_error_x") self.fields.pop("y") self.fields.pop("estimated_error_y") if geo_field == "z": if geo_keys[0] == "point_3d": geo_keys = list(geo_keys) + ["z"] continue self.fields.pop("estimated_error_z") self.fields.pop(geo_field) break return geo_keys def get_headers(self): headers = self.HEADERS.copy() if self.geo_keys: headers[self.geo_keys[0]] = headers.pop("geo_field") return headers def clean(self): cleaned_data = self.cleaned_data if cleaned_data.get("buffer", None) \ and not cleaned_data.get("buffer_type", None): raise forms.ValidationError( _("If you set a buffer set a buffer type.") ) if cleaned_data.get("buffer_type", None) \ and not cleaned_data.get("buffer", None): raise forms.ValidationError( _("If you set a buffer type set a buffer.") ) if cleaned_data.get("point_3d"): if "z" not in cleaned_data: raise forms.ValidationError( _("A value is expected for Z.") ) value = cleaned_data["point_3d"].ewkt if "POINT Z" not in value: value = value.replace("POINT", "POINT Z") cleaned_data["point_3d"] = f'{value.split(")")[0]} {cleaned_data["z"]})' if "x" not in self.geo_keys: # reverse... geo_value = cleaned_data[self.geo_keys[0]] if geo_value: if not isinstance(geo_value, str): geo_value = geo_value.ewkt if geo_value.startswith("SRID=4326;"): cleaned_data[self.geo_keys[0]] = reverse_coordinates(geo_value) for rel in models.GeoVectorData.RELATED_MODELS: if cleaned_data.get(rel, None): return cleaned_data raise forms.ValidationError( _("A geo item has to be attached at least to one item") ) def save(self, commit=True): if not getattr(self.instance, "source_content_type_id", None): self.instance.source_content_type_id = self.source_content_type if not getattr(self.instance, "source_id", None): self.instance.source_id = self.source_id item = super().save(commit=commit) for related_key in models.GeoVectorData.RELATED_MODELS: related = getattr(item, related_key) initial = dict([(rel.pk, rel) for rel in related.all()]) new = [int(pk) for pk in sorted(self.cleaned_data.get(related_key, []))] full_new = new[:] if related_key in self.too_many: full_new += self.too_many[related_key] for pk, value in initial.items(): if pk in full_new: continue related.remove(value) for new_pk in new: related_item = related.model.objects.get(pk=new_pk) if new_pk not in initial.keys(): related.add(related_item) key = "{}_{}_main_item".format(related_key, related_item.pk) if self.cleaned_data.get(key, []) and related_item.main_geodata != item: related_item.skip_history_when_saving = True related_item.main_geodata = item related_item.save() item = models.GeoVectorData.objects.get(pk=item.pk) if self.user: item.history_creator = self.user item.history_modifier = self.user item.skip_history_when_saving = True item.save() # resave to regen the attached items return item class PreGISForm(IshtarForm): geom_type = forms.ChoiceField( label=_("Geometry type"), choices=( ("coordinates", _("Coordinates")), ("point_2d", _("Point")), ("point_3d", _("Point 3D")), ("multi_points", _("Multi-points")), ("multi_line", _("Multi-lines")), ("multi_polygon", _("Multi-polygons")), ) ) HEADERS = { "geom_type": FormHeader(_("Type")), } def __init__(self, *args, **kwargs): back_url = "" if "back_url" in kwargs: back_url = kwargs.pop("back_url") super().__init__(*args, **kwargs) if back_url: self.fields["back_url"] = forms.CharField( label="", required=False, widget=forms.HiddenInput, initial=back_url)