#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2010-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import json from io import TextIOWrapper, BytesIO import os import requests import shutil import tempfile import urllib import zipfile from rest_framework.authtoken.admin import TokenAdmin from rest_framework.authtoken.models import Token from axes import models as axes_models from axes.admin import AccessAttemptAdmin, AccessLogAdmin from django.conf import settings from django.contrib import admin, messages from django.contrib.admin.views.main import ChangeList from django.contrib.auth.admin import GroupAdmin, UserAdmin from django.contrib.auth.models import Group, User from django.contrib.contenttypes.models import ContentType from django.contrib.sites.admin import SiteAdmin from django.contrib.sites.models import Site from django.contrib.gis.forms import PointField, OSMWidget from django.contrib.gis.geos import GEOSGeometry, MultiPolygon from django.contrib.gis.gdal.error import GDALException from django.contrib.gis.geos.error import GEOSException from django.core.cache import cache from django.core.exceptions import FieldError, FieldDoesNotExist from django.core.serializers import serialize from django.db import connection from django.db.models import Q from django.db.models.fields import ( BooleanField, IntegerField, FloatField, CharField, ) from django.db.models.fields.related import ForeignKey from django.forms import BaseInlineFormSet from django.http import HttpResponseRedirect, HttpResponse, Http404 from django.shortcuts import render from django.urls import re_path, reverse from django.utils import timezone from django.utils.decorators import method_decorator from django.utils.text import slugify from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ from django.views.decorators.csrf import csrf_protect from django import forms from ishtar_common import models, models_common, models_rest from ishtar_common.apps import admin_site from ishtar_common.model_merging import merge_model_objects from ishtar_common.utils import API_MAIN_CONTENT_TYPES, get_cache, create_slug,\ get_person_gdpr_log, InlineClass from ishtar_common import forms as common_forms, forms_common as other_common_forms from ishtar_common.serializers import restore_serialized, IMPORT_MODEL_LIST from ishtar_common.serializers_utils import generic_get_results, serialization_info from archaeological_files import forms as file_forms from archaeological_operations import forms as operation_forms from archaeological_context_records import forms as context_record_forms from archaeological_finds import ( forms as find_forms, forms_treatments as treatment_forms, ) from archaeological_warehouse import forms as warehouse_forms from ishtar_common.tasks import launch_export, launch_import csrf_protect_m = method_decorator(csrf_protect) ISHTAR_FORMS = [ common_forms, other_common_forms, file_forms, operation_forms, context_record_forms, find_forms, treatment_forms, warehouse_forms, ] class ImportGenericForm(forms.Form): csv_file = forms.FileField( label=_("CSV file"), help_text=_("Only unicode encoding is managed - convert your file first"), ) def custom_titled_filter(title, klass): # klass: admin.BooleanFieldListFilter, admin.RelatedFieldListFilter or ... class Wrapper(klass): def __new__(cls, *args, **kwargs): instance = klass(*args, **kwargs) instance.title = title return instance return Wrapper def change_value(attribute, value, description): """ Action to change a specific value in a list """ def _change_value(modeladmin, request, queryset): for obj in queryset.order_by("pk"): setattr(obj, attribute, value) obj.save() c_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) return HttpResponseRedirect(c_url) _change_value.short_description = description _change_value.__name__ = str(slugify(description)) return _change_value def export_as_csv_action( description=_("Export selected as CSV file"), fields=None, exclude=None, header=True ): """ This function returns an export csv action 'fields' and 'exclude' work like in django ModelForm 'header' is whether or not to output the column names as the first row """ def export_as_csv(modeladmin, request, queryset): """ Generic csv export admin action. based on http://djangosnippets.org/snippets/1697/ """ opts = modeladmin.model._meta if hasattr(modeladmin, "CSV_FIELDS"): field_names = field_order = modeladmin.CSV_FIELDS else: field_names = set([field.name for field in opts.fields]) if fields: fieldset = set(fields) field_names = field_names & fieldset elif exclude: excludeset = set(exclude) field_names = field_names - excludeset if hasattr(modeladmin, "CSV_FIELD_ORDER"): field_order = modeladmin.CSV_FIELD_ORDER field_names = sorted( field_names, key=lambda x: field_order.index(x) if x in field_order else 1000 ) response = HttpResponse(content_type="text/csv") csv_name = str(opts).replace(".", "_") if hasattr(modeladmin, "get_csv_name"): csv_name = modeladmin.get_csv_name() response["Content-Disposition"] = "attachment; filename=%s.csv" % csv_name writer = csv.writer(response) if header: if hasattr(modeladmin, "CSV_HEADER"): writer.writerow(modeladmin.CSV_HEADER) else: writer.writerow(list(field_names)) boolean_fields = [] if hasattr(modeladmin, "CSV_FIELDS_BOOLEAN"): boolean_fields = modeladmin.CSV_FIELDS_BOOLEAN for obj in queryset.order_by("pk"): row = [] for field in field_names: value = getattr(obj, field) if field in boolean_fields: value = str(_("True")) if value else str(_("False")) elif hasattr(value, "txt_idx"): value = getattr(value, "txt_idx") elif hasattr(value, "slug"): value = getattr(value, "txt_idx") elif value is None: value = "" elif isinstance(value, ContentType): value = f"{value.app_label}|{value.model}" elif hasattr(value, "natural_key"): v = value.natural_key() if not isinstance(v, (list, tuple)): value = str(v) else: value = "||".join([str(k) for k in v]) else: value = str(value) row.append(value) writer.writerow(row) return response export_as_csv.short_description = description return export_as_csv def export_as_geojson_action( geometry_field, description=_("Export selected as GeoJSON/JSON file"), fields=None, exclude=None, ): """ This function returns an export GeoJSON action 'fields' work like in django ModelForm """ def export_as_geojson(modeladmin, request, queryset): """ Generic zipped geojson export admin action. """ opts = modeladmin.model._meta if hasattr(modeladmin.model, "geodata_export"): geojson = [] for item in queryset.order_by("pk").all(): geojson += item.geodata_export geojson = json.dumps(geojson, indent=4) else: field_names = set( [field.name for field in opts.fields if field.name != geometry_field] ) if fields: fieldset = set(fields) field_names = field_names & fieldset if exclude: excludeset = set(exclude) field_names = field_names - excludeset geojson = serialize( "geojson", queryset.order_by("pk"), geometry_field=geometry_field, fields=field_names, ).encode("utf-8") basename = str(opts).replace(".", "_") in_memory = BytesIO() czip = zipfile.ZipFile(in_memory, "a") czip.writestr(basename + ".geojson", geojson) # fix for Linux zip files read in Windows for cfile in czip.filelist: cfile.create_system = 0 czip.close() response = HttpResponse(content_type="application/zip") response["Content-Disposition"] = "attachment; filename={}.zip".format(basename) in_memory.seek(0) response.write(in_memory.read()) return response export_as_geojson.short_description = description return export_as_geojson def serialize_action(dir_name, model_list): def _serialize_action(modeladmin, request, queryset): if model_list: modellist = model_list[:] else: modellist = [modeladmin.model] opts = modeladmin.model._meta if getattr(modeladmin, "serialize_filter_queryset", None): queryset = queryset.filter(**modeladmin.serialize_filter_queryset) result = generic_get_results( modellist, dir_name, result_queryset={opts.object_name: queryset} ) basename = str(opts).replace(".", "_") in_memory = BytesIO() current_zip = zipfile.ZipFile(in_memory, "a") for key in result.keys(): __, model_name = key current_zip.writestr(dir_name + os.sep + model_name + ".json", result[key]) # info current_zip.writestr("info.json", json.dumps(serialization_info(), indent=2)) # fix for Linux zip files read in Windows for cfile in current_zip.filelist: cfile.create_system = 0 current_zip.close() response = HttpResponse(content_type="application/zip") response["Content-Disposition"] = "attachment; filename={}.zip".format(basename) in_memory.seek(0) response.write(in_memory.read()) return response return _serialize_action SERIALIZE_DESC = _("Export selected as Ishtar (zipped JSON)") serialize_type_action = serialize_action("types", None) serialize_type_action.short_description = SERIALIZE_DESC class MergeForm(forms.Form): merge_in = forms.ChoiceField(label=_("Merge inside"), choices=[]) def __init__(self, choices, post=None, files=None): if post: super(MergeForm, self).__init__(post, files) else: super(MergeForm, self).__init__() self.fields["merge_in"].choices = choices class MergeActionAdmin: def get_actions(self, request): action_dct = super(MergeActionAdmin, self).get_actions(request) action_dct["merge_selected"] = ( self.admin_merge, "merge_selected", _("Merge selected items"), ) return action_dct def admin_merge(self, modeladmin, request, queryset): return_url = reverse( "admin:%s_%s_changelist" % (self.model._meta.app_label, self.model._meta.model_name) ) if not request.POST: return HttpResponseRedirect(return_url) selected = request.POST.getlist("_selected_action", []) if len(selected) < 2: messages.add_message( request, messages.WARNING, str(_("At least two items have to be selected.")), ) return HttpResponseRedirect(return_url) items = {} choices = [] for pk in selected: obj = self.model.objects.get(pk=pk) choices.append((obj.pk, str(obj))) items[str(obj.pk)] = obj form = None if "apply" in request.POST: form = MergeForm(choices, request.POST, request.FILES) if form.is_valid(): merge_in = items[form.cleaned_data["merge_in"]] merged = [] for key, value in items.items(): if key == str(merge_in.pk): continue merge_model_objects(merge_in, value) merged.append(str(value)) messages.add_message( request, messages.INFO, str(_("{} merged into {}.")).format( " ; ".join(merged), str(merge_in) ), ) return HttpResponseRedirect(return_url) if not form: form = MergeForm(choices) return render( request, "admin/merge.html", { "merge_form": form, "current_action": "merge_selected", "selected_items": selected, }, ) TokenAdmin.raw_id_fields = ("user",) admin_site.register(Token, TokenAdmin) class ChangeParentForm(forms.Form): change_parent = forms.ChoiceField(label=_("Change parent"), choices=[]) def __init__(self, choices, post=None, files=None): super(ChangeParentForm, self).__init__(post, files) self.fields["change_parent"].choices = choices class ChangeParentAdmin: def get_actions(self, request): action_dct = super(ChangeParentAdmin, self).get_actions(request) if hasattr(self.model, "parent"): action_dct["change_parent_selected"] = ( self.change_parent_admin, "change_parent_selected", _("Change parent of selected item(s)"), ) return action_dct def change_parent_admin(self, modeladmin, request, queryset): return_url = reverse( "admin:%s_%s_changelist" % (self.model._meta.app_label, self.model._meta.model_name) ) if not request.POST: return HttpResponseRedirect(return_url) selected_children_pk = request.POST.getlist("_selected_action", []) choices = [] parents = self.model.objects.all() for obj in parents: choices.append((obj.pk, str(obj))) form = None if "apply" in request.POST: form = ChangeParentForm(choices, request.POST, request.FILES) if form.is_valid(): change_parent = form.cleaned_data["change_parent"] change = [] for child_id in selected_children_pk: child = self.model.objects.get(pk=child_id) child.parent_id = change_parent child.save() change.append(str(child)) messages.add_message( request, messages.INFO, str(_("{} parent(s) were changed to {}.")).format( " ; ".join(change), str(self.model.objects.get(pk=change_parent)) ), ) return HttpResponseRedirect(return_url) if not form: form = ChangeParentForm(choices) return render( request, "admin/change_parent.html", { "change_parent_form": form, "current_action": "change_parent_selected", "selected_items": selected_children_pk, }, ) class ImportedObjectAdmin(admin.ModelAdmin): def get_readonly_fields(self, request, obj=None): fields = tuple(super().get_readonly_fields(request, obj) or []) if obj: # editing an existing object return fields + ("imports", "imports_updated") return fields def get_exclude(self, request, obj=None): fields = tuple(super().get_exclude(request, obj) or []) if not obj: return fields + ("imports", "imports_updated") return fields END_FIELDS = ["data", "last_modified", "created", "need_update", "locked", "lock_user", "main_geodata", "geodata", "qrcode"] def get_fields(self, request, obj=None, **kwargs): """ Put END_FIELDS at the end but before readonly fields """ if self.fields: return self.fields fields = super().get_fields(request, **kwargs) readonly_fields = self.get_readonly_fields(request, obj) if readonly_fields: idx_ro = fields.index(readonly_fields[0]) else: idx_ro = len(fields) - 1 for end_field in self.END_FIELDS: try: idx_field = fields.index(end_field) except ValueError: continue fields.insert(idx_ro, end_field) fields.pop(idx_field) for k in fields[:]: if k.startswith("cached_"): idx_field = fields.index(k) fields.insert(idx_ro, k) fields.pop(idx_field) return fields class HistorizedObjectAdmin(ImportedObjectAdmin): readonly_fields = [ "history_creator", "history_modifier", "search_vector", "history_m2m", ] autocomplete_fields = ["lock_user"] def save_model(self, request, obj, form, change): obj.history_modifier = request.user obj.save() class MyGroupAdmin(GroupAdmin): class Media: css = {"all": ("media/admin.css",)} class IshtarUserAdmin(UserAdmin): fieldsets = ( (None, {'fields': ('username', 'password')}), (_('Personal info'), {'fields': ('first_name', 'last_name', 'email')}), (_('Permissions'), { 'fields': ('is_active', 'is_staff', 'is_superuser') }), (_('Important dates'), {'fields': ('last_login', 'date_joined')}), ) pass admin_site.register(User, IshtarUserAdmin) admin_site.register(Group, MyGroupAdmin) admin_site.register(Site, SiteAdmin) if settings.AXES_ENABLED and settings.AXES_ENABLE_ADMIN: admin_site.register(axes_models.AccessAttempt, AccessAttemptAdmin) admin_site.register(axes_models.AccessLog, AccessLogAdmin) class AdminIshtarSiteProfileForm(forms.ModelForm): class Meta: model = models.IshtarSiteProfile exclude = ["display_srs"] default_center = PointField(label=_("Maps - default center"), widget=OSMWidget) class IshtarSiteProfileAdmin(admin.ModelAdmin): list_display = ( "label", "slug", "active", "files", "context_record", "find", "warehouse", "mapping", "preservation", ) model = models.IshtarSiteProfile form = AdminIshtarSiteProfileForm autocomplete_fields = ["no_context_button", "default_location_for_treatment"] fieldsets = ( (None, { "fields": ( "label", "slug", "active", "homepage", "homepage_title", "homepage_statistics_available", "homepage_statistics_available_offline", "homepage_random_image_available", ) }), (_("Detail"), { "classes": ("collapse",), "fields": ( "description", "warning_name", "warning_message", "footer", ), }), (_("Modules"), { "classes": ("collapse",), "fields": ( "files", "archaeological_site", "context_record", "find", "warehouse", "preservation", "mapping", "preventive_operator", "underwater", "museum", ), }), (_("Advanced configuration"), { "classes": ("collapse",), "fields": ( "config", "default_language", "archaeological_site_label", "parent_relations_engine", "delete_image_zip_on_archive", "clean_redundant_document_association", "person_raw_name", "currency", "account_naming_style", "point_precision", "default_center", "default_zoom", "srs", ), }), (_("Advanced features"), { "classes": ("collapse",), "fields": ( "experimental_feature", "gis_connector", "calculate_weight_on_full", "locate_warehouses", "use_town_for_geo", "relation_graph", "parcel_mandatory", "no_context_button", "default_location_for_treatment", ), }), (_("Index"), { "classes": ("collapse",), "description": "
" + \ str( _("Change this with extra care. With incorrect configuration, the " "application might be unusable. Some change need to adapt importers. " "Regeneration of ID might be required.") ) + "
", "fields": ( "operation_prefix", "default_operation_prefix", "operation_region_code", "operation_complete_identifier", "operation_custom_index", "operation_cached_label", "site_complete_identifier", "site_custom_index", "site_cached_label", "file_external_id", "file_complete_identifier", "file_custom_index", "file_cached_label", "parcel_external_id", "parcel_cached_label", "context_record_external_id", "contextrecord_complete_identifier", "contextrecord_custom_index", "contextrecord_cached_label", "base_find_external_id", "basefind_complete_identifier", "basefind_custom_index", "basefind_cached_label", "find_external_id", "find_complete_identifier", "find_use_index", "find_index", "find_custom_index", "find_cached_label", "museum_complete_identifier", "container_external_id", "container_complete_identifier", "container_custom_index", "container_cached_label", "warehouse_external_id", "warehouse_complete_identifier", "warehouse_custom_index", "warehouse_cached_label", "document_external_id", "document_complete_identifier", "document_custom_index", "document_cached_label", ), }), ) class Media: js = ( "ol/ol.js", "ol-layerswitcher/ol-layerswitcher.js", "admin/js/jquery.init.js", "js/ishtar-map.js", ) def save_model(self, request, obj, form, change): # reverse... geo form.cleaned_data["default_center"] = form.cleaned_data["default_center"].transform(4326) point = form.cleaned_data["default_center"] if point: form.cleaned_data["default_center"].x = point.y form.cleaned_data["default_center"].y = point.x super().save_model(request, obj, form, change) admin_site.register(models.IshtarSiteProfile, IshtarSiteProfileAdmin) class DepartmentAdmin(admin.ModelAdmin): list_display = ( "number", "label", ) model = models_common.Department admin_site.register(models_common.Department, DepartmentAdmin) class OrganizationAdmin(HistorizedObjectAdmin): list_display = ("pk", "name", "organization_type") list_filter = ("organization_type",) search_fields = ("name",) exclude = ( "merge_key", "merge_exclusion", "merge_candidate", ) model = models.Organization readonly_fields = HistorizedObjectAdmin.readonly_fields + ["precise_town_id"] admin_site.register(models.Organization, OrganizationAdmin) class ProfileInline(admin.TabularInline): model = models.UserProfile verbose_name = _("Profile") verbose_name_plural = _("Profiles") extra = 1 class PersonAdmin(HistorizedObjectAdmin): list_display = ("pk", "name", "surname", "raw_name", "email") list_filter = ("person_types",) search_fields = ("name", "surname", "email", "raw_name", "attached_to__name") exclude = ( "merge_key", "merge_exclusion", "merge_candidate", ) autocomplete_fields = ["attached_to", "lock_user"] readonly_fields = HistorizedObjectAdmin.readonly_fields + ["cached_label", "precise_town_id"] model = models.Person inlines = [ProfileInline] def get_search_results(self, request, queryset, search_term): page = int(request.GET.get("p", 0)) slice = page * self.list_per_page, (page + 1) * self.list_per_page get_person_gdpr_log("admin_person_consultation", request, None, queryset, slice) return super().get_search_results(request, queryset, search_term) def response_change(self, request, obj): get_person_gdpr_log("admin_person_modify", request, None, self.model.objects.filter(pk=obj.pk)) return super().response_change(request, obj) def changeform_view(self, request, object_id=None, form_url='', extra_context=None): if request.method == "GET": get_person_gdpr_log("admin_person_view", request, None, self.model.objects.filter(pk=object_id)) return super().changeform_view(request, object_id, form_url, extra_context) def delete_model(self, request, obj): get_person_gdpr_log("admin_person_delete", request, None, self.model.objects.filter(pk=obj.pk)) super().delete_model(request, obj) admin_site.register(models.Person, PersonAdmin) class PermissionQueryAdminForm(forms.ModelForm): class Meta: model = models_common.PermissionQuery exclude = [] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) choices = [] for ct in ContentType.objects.all(): klass = ct.model_class() if not klass: continue if (getattr(klass, "SHOW_URL", None) and hasattr(klass, "history_creator_id")) or \ ct.model in ("geovectordata", "document"): choices.append((ct.pk, ct.name)) self.fields["model"].choices = [("", "-" * 9)] + list( sorted(choices, key=lambda x: x[1]) ) def clean(self): super().clean() model = self.cleaned_data["model"] include_upstream_items = self.cleaned_data.get("include_upstream_items", None) if include_upstream_items and model.model in ("organization", "person", "file", "warehouse"): raise forms.ValidationError( _("This model do have attached upstream items. Uncheck " "\"Include upstream items\" field.") ) limit_to_attached_areas = self.cleaned_data.get("limit_to_attached_areas", "") if not limit_to_attached_areas: return self.cleaned_data if model.model not in ("operation", "contextrecord", "find", "archaeologicalsite", "file"): raise forms.ValidationError( _("This model do not accept area limitation. Uncheck " "\"Limit request to attached areas\" field.") ) return self.cleaned_data @admin.register(models_common.PermissionQuery, site=admin_site) class PermissionQueryAdmin(admin.ModelAdmin): prepopulated_fields = {"slug": ("name",)} search_fields = ("model__model__unaccent", "name") list_display = ( "model", "name", "active", "include_associated_items", "include_upstream_items", "limit_to_attached_areas" ) form = PermissionQueryAdminForm def get_content_types_with_sheet(with_empty=False): choices = [] for ct in ContentType.objects.all(): klass = ct.model_class() if not klass: continue if getattr(klass, "SHOW_URL", None): choices.append((ct.pk, ct.name)) lst = list( sorted(choices, key=lambda x: x[1]) ) if with_empty: lst = [("", "-" * 9)] + lst return lst class ContentTypeChoice: def set_content_types_choices(self, field_name): self.fields[field_name].choices = get_content_types_with_sheet() class ContentTypeListFilter(admin.SimpleListFilter): # Only display content types with sheet attached title = _("content type") parameter_name = 'content_type' def lookups(self, request, model_admin): return get_content_types_with_sheet(with_empty=False) def queryset(self, request, queryset): value = self.value() if value: query = {f"{self.parameter_name}_id": self.value()} return queryset.filter(**query) class BaseSheetFilterForm(forms.ModelForm, ContentTypeChoice): content_type_field = "" key = forms.CharField( label=_("Key"), initial="-", help_text=_("Save first to choose a key"), max_length=200, ) def _get_content_type_model(self, instance): raise NotImplementedError() def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.set_content_types_choices(self.content_type_field) instance = kwargs.get("instance") if not instance: return if args: query_dict = args[0] query_dict._mutable = True query_dict.setlist( self.content_type_field, [getattr(instance, self.content_type_field).pk] ) query_dict._mutable = False self.fields[self.content_type_field].widget.attrs = {"disabled": "disabled"} keys = instance.get_keys() model = self._get_content_type_model(instance) help_text = [] for key in keys: try: field = model._meta.get_field(key) except FieldDoesNotExist: field = None if field and getattr(field, "verbose_name", None): key += f" ({field.verbose_name})" help_text.append(key) self.fields["key"].help_text = str(_("Available keys: ")) + " ; ".join( help_text ) + "
" + str(_("You can set many fields using \" ; \" as a separator.")) + "" class SheetFilterForm(BaseSheetFilterForm): class Meta: model = models_common.SheetFilter exclude = [] content_type_field = "content_type" def _get_content_type_model(self, instance): return instance.content_type.model_class() class ContentTypeSearchAdmin: def search_results(self, queryset, search_term): # search with real model name # not efficient but searching only in ~10 models search_terms = search_term.split(" ") for ct in ContentType.objects.all(): klass = ct.model_class() if not klass: continue if getattr(klass, "SHOW_URL", None): name = ct.name.lower() for idx, k in enumerate(reversed(search_terms[:])): if k in name: queryset |= self.model.objects.filter( content_type_id=ct.pk) return queryset def get_search_results(self, request, queryset, search_term): queryset, may_have_duplicates = super().get_search_results( request, queryset, search_term, ) queryset = self.search_results(queryset, search_term) return queryset, may_have_duplicates @admin.register(models_common.SheetFilter, site=admin_site) class SheetFilterAdmin(ContentTypeSearchAdmin, admin.ModelAdmin): form = SheetFilterForm model = models_common.SheetFilter list_display = ("content_type", "exclude_or_include", "key") list_filter = (ContentTypeListFilter,) search_fields = ("key",) class FilteredSheetForm(forms.ModelForm, ContentTypeChoice): class Meta: model = models_common.FilteredSheet exclude = [] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.set_content_types_choices("content_type") def clean(self): filters = self.cleaned_data.get("filters", []) content_type = self.cleaned_data.get("content_type", None) if not filters: return self.cleaned_data exc = None for filtr in filters.all(): if content_type != filtr.content_type: raise forms.ValidationError( _("Bad filter configuration. " "Only use filters of the associacted content type.") ) if not exc: exc = filtr.exclude_or_include elif exc != filtr.exclude_or_include: raise forms.ValidationError( _("You cannot mix exclude and include filters.") ) return self.cleaned_data @admin.register(models_common.FilteredSheet, site=admin_site) class FilteredSheetAdmin(ContentTypeSearchAdmin, admin.ModelAdmin): form = FilteredSheetForm model = models_common.FilteredSheet list_display = ("name", "content_type") list_filter = (ContentTypeListFilter,) autocomplete_fields = ("filters",) search_fields = ("name",) MAIN_ITEM_READONLY_FIELDS = [ "history_creator", "history_modifier", "search_vector", "history_m2m", "imports", "imports_updated", "timestamp_geo", "timestamp_label", ] @admin.register(models.BiographicalNote, site=admin_site) class BiographicalNoteAdmin(admin.ModelAdmin): list_display = ("denomination", "last_name", "first_name") autocomplete_fields = ["person", "organization"] model = models.BiographicalNote readonly_fields = MAIN_ITEM_READONLY_FIELDS exclude = ("search_vector", "created", "last_modified", "need_update", "ishtar_users") prepopulated_fields = {"slug": ("denomination",)} actions = [ export_as_csv_action( exclude=("last_modified", "locked", "timestamp_geo", "lock_user", "timestamp_label", "history_m2m", "history_modifier", "search_vector", "need_update", "id", "created", "history_creator", "person", "organization") ), ] search_fields = ( "denomination", "last_name", "first_name", "slug", ) @admin.register(models.GDPRLog, site=admin_site) class GDPRLogAdmin(admin.ModelAdmin): list_display = ("user", "date", "ip", "routable_ip", "activity") list_filter = ("activity",) search_fields = ("user__username",) actions = [ export_as_csv_action(exclude=("id",)), ] CSV_HEADER = (_("Date"), _("User"), _("IP"), _("Routable IP"), _("Activity"), _("Persons")) CSV_FIELDS = ("date", "user", "ip", "routable_ip", "activity_lbl", "persons_lbl") CSV_FIELDS_BOOLEAN = ("routable_ip",) def has_add_permission(self, request, obj=None): return False def has_change_permission(self, request, obj=None): return False def has_delete_permission(self, request, obj=None): return False def get_csv_name(self): now = timezone.now() lbl = str(_("export-gdpr")) return f"{now.strftime('%Y-%m-%d-%H%M')}-{lbl}" class AuthorAdmin(admin.ModelAdmin): list_display = ["person", "author_type"] list_filter = ("author_type",) search_fields = ( "person__name", "person__surname", "person__attached_to__name", "author_type__label", ) model = models.Author autocomplete_fields = ["person"] admin_site.register(models.Author, AuthorAdmin) class ChangeListForChangeView(ChangeList): def get_filters_params(self, params=None): """ Get the current list queryset parameters from _changelist_filters """ filtered_params = {} lookup_params = super(ChangeListForChangeView, self).get_filters_params(params) if "_changelist_filters" in lookup_params: field_names = [field.name for field in self.model._meta.get_fields()] params = lookup_params.pop("_changelist_filters") for param in params.split("&"): key, value = param.split("=") if not value or key not in field_names: continue filtered_params[key] = value return filtered_params class ImportActionAdmin(admin.ModelAdmin): change_list_template = "admin/gen_change_list.html" import_keys = ["slug", "txt_idx"] def get_urls(self): urls = super(ImportActionAdmin, self).get_urls() my_urls = [ re_path(r"^import-from-csv/$", self.import_generic), ] return my_urls + urls def import_generic(self, request): form = None if "apply" in request.POST: form = ImportGenericForm(request.POST, request.FILES) if form.is_valid(): encoding = request.encoding or "utf-8" csv_file = TextIOWrapper( request.FILES["csv_file"].file, encoding=encoding ) reader = csv.DictReader(csv_file) created, updated, missing_parent = 0, 0, [] for row in reader: slug_col = None for key in self.import_keys: if key in row: slug_col = key break if not slug_col: self.message_user( request, str( _("The CSV file should at least have a " "{} column") ).format("/".join(self.import_keys)), ) return slug = row.pop(slug_col) if "id" in row: row.pop("id") if "pk" in row: row.pop("pk") for k in list(row.keys()): value = row[k] if value == "None": value = "" try: field = self.model._meta.get_field(k) except FieldDoesNotExist: row.pop(k) continue if isinstance(field, CharField): if not value: value = "" elif isinstance(field, IntegerField): value = None if not value else int(value) elif isinstance(field, FloatField): value = None if not value else float(value) elif isinstance(field, BooleanField): if value in ("true", "True", "1"): value = True elif value in ("false", "False", "0"): value = False else: value = None elif isinstance(field, ForeignKey): if value: model = field.remote_field.model if model == ContentType: value = value.split("|") value = ContentType.objects.get(app_label=value[0], model=value[1]) elif "||" in value: try: value = model.objects.get_by_natural_key( *value.split("||") ) except model.DoesNotExist: missing_parent.append(row.pop(k)) break else: for slug_col2 in self.import_keys: try: value = model.objects.get(**{slug_col2: value}) except FieldError: continue except model.DoesNotExist: missing_parent.append(row.pop(k)) break else: value = None row[k] = value values = {slug_col: slug, "defaults": row} obj, c = self.model.objects.get_or_create(**values) if c: created += 1 else: updated += 1 self.model.objects.filter(pk=obj.pk).update(**row) if created: self.message_user(request, str(_("%d item(s) created.")) % created) if updated: self.message_user(request, str(_("%d item(s) updated.")) % updated) if missing_parent: self.message_user( request, str(_("These parents are missing: {}")).format( " ; ".join(missing_parent) ), ) c_url = reverse( "admin:%s_%s_changelist" % (self.model._meta.app_label, self.model._meta.model_name) ) return HttpResponseRedirect(c_url) if not form: form = ImportGenericForm() return render( request, "admin/import_from_file.html", {"file_form": form, "current_action": "import_generic"}, ) class ImportGeoJsonForm(forms.Form): json_file = forms.FileField( label=_("Geojson/JSON file"), help_text=_( "Only unicode encoding is managed - convert your" " file first. The file must be a geojson file or a zip " "containing a geojson file." ), ) numero_insee_prefix = forms.CharField( label=_("Prefix for numero INSEE"), max_length=20, required=False ) numero_insee_name = forms.CharField( label=_("Field name for numero INSEE"), max_length=200, initial="numero_insee" ) name_name = forms.CharField( label=_("Field name for name"), max_length=200, initial="name" ) UNIT_CHOICES = (("1", _("m2")), ("1000", _("km2"))) surface_unit = forms.ChoiceField(label=_("Surface unit"), choices=UNIT_CHOICES) surface_name = forms.CharField( label=_("Field name for surface"), max_length=200, required=False ) year_name = forms.CharField( label=_("Field name for year"), max_length=200, required=False, initial="year", help_text=_("Not required for new town. Leave it empty when not " "available."), ) update = forms.BooleanField( label=_("Update only geometry of existing towns"), required=False, widget=forms.CheckboxInput, ) class ImportGEOJSONActionAdmin(object): def get_urls(self): urls = super(ImportGEOJSONActionAdmin, self).get_urls() my_urls = [ re_path(r"^import-from-geojson/$", self.import_geojson), ] return my_urls + urls def geojson_values(self, request, idx, feature, keys, trace_error=True): for key in ("geometry", "properties"): if key in feature: continue if trace_error: error = str(_('"{}" not found in feature {}')).format(key, idx) self.message_user(request, error, level=messages.ERROR) return False values = {} for key in keys: if not key.endswith("_name"): continue if not keys[key]: values[key[: -len("_name")]] = None continue if keys[key] not in feature["properties"]: if trace_error: error = str(_('"{}" not found in properties of feature {}')).format( keys[key], idx ) self.message_user(request, error, level=messages.ERROR) return False value = feature["properties"][keys[key]] if key == "numero_insee_name" and keys["insee_prefix"]: value = keys["insee_prefix"] + value values[key[: -len("_name")]] = value try: geom = GEOSGeometry(json.dumps(feature["geometry"])) except (GEOSException, GDALException): if trace_error: error = str(_("Bad geometry for feature {}")).format(idx) self.message_user(request, error, level=messages.ERROR) return False if geom.geom_type == "Point": values["point_2d"] = geom elif geom.geom_type == "MultiPolygon": values["multi_polygon"] = geom elif geom.geom_type == "Polygon": values["multi_polygon"] = MultiPolygon(geom) else: if trace_error: error = str(_("Geometry {} not managed for towns - feature {}")).format( geom.geom_type, idx ) self.message_user(request, error, level=messages.ERROR) return False if keys["surface_unit"] and values["surface"]: try: values["surface"] = keys["surface_unit"] * int(values["surface"]) except ValueError: if trace_error: error = str(_("Bad value for surface: {} - feature {}")).format( values["surface"], idx ) self.message_user(request, error, level=messages.ERROR) return False return values def import_geojson_clean(self, tempdir): if not tempdir: return shutil.rmtree(tempdir) def import_geojson_error(self, request, error, base_dct, tempdir=None): self.import_geojson_clean(tempdir) self.message_user(request, error, level=messages.ERROR) return render(request, "admin/import_from_file.html", base_dct) def import_geojson(self, request): form = None if "apply" in request.POST: form = ImportGeoJsonForm(request.POST, request.FILES) if form.is_valid(): json_file_obj = request.FILES["json_file"] base_dct = {"file_form": form, "current_action": "import_geojson", "is_town": True} tempdir = tempfile.mkdtemp() tmpfilename = tempdir + os.sep + "dest_file" with open(tmpfilename, "wb+") as tmpfile: for chunk in json_file_obj.chunks(): tmpfile.write(chunk) json_filename = None if zipfile.is_zipfile(tmpfilename): zfile = zipfile.ZipFile(tmpfilename) for zmember in zfile.namelist(): if os.sep in zmember or ".." in zmember: continue if zmember.endswith("json"): zfile.extract(zmember, tempdir) json_filename = tempdir + os.sep + zmember break if not json_filename: error = _("No json file found in zipfile") return self.import_geojson_error( request, error, base_dct, tempdir ) else: json_filename = tmpfilename keys = { "numero_insee_name": request.POST.get("numero_insee_name"), "name_name": request.POST.get("name_name"), "surface_name": request.POST.get("surface_name", "") or "", "year_name": request.POST.get("year_name", "") or "", "update": request.POST.get("update", "") or "", "insee_prefix": request.POST.get("numero_insee_prefix", None) or "", "surface_unit": int(request.POST.get("surface_unit")), } town_content_type = ContentType.objects.get( app_label="ishtar_common", model="town" ) town_data_type, __ = models.GeoDataType.objects.get_or_create( txt_idx="town-limit", defaults={"label": _("Town limit")} ) with open(json_filename) as json_file_obj: json_file = json_file_obj.read() dct = json.loads(json_file) if "features" not in dct or not dct["features"]: # probably Ishtar JSON created, __, updated, __ = models.Town.geodata_import(dct) else: error_count = 0 created = 0 updated = 0 for idx, feat in enumerate(dct["features"]): trace_error = True if error_count == 6: self.message_user( request, _("Too many errors..."), level=messages.ERROR ) if error_count > 5: trace_error = False values = self.geojson_values( request, idx + 1, feat, keys, trace_error ) if not values: error_count += 1 continue num_insee = values.pop("numero_insee") year = values.pop("year") or None geo_values = {} for k in ["point_2d", "multi_polygon"]: if k in values: geo_values[k] = values.pop(k) t, c = models_common.Town.objects.get_or_create( numero_insee=num_insee, year=year, defaults=values ) if c: created += 1 else: modified = False if not keys["update"]: for k in values: if not getattr(t, k) or \ values[k] != getattr(t, k): setattr(t, k, values[k]) modified = True if modified: updated += 1 t.save() if geo_values: if t.main_geodata: # update for k in geo_values: c_value = getattr(t.main_geodata, k) if not c_value or \ geo_values[k] != c_value.wkt: setattr(t.main_geodata, k, geo_values[k]) t.main_geodata.save() else: gd, __ = models.GeoVectorData.objects.get_or_create( source_id=t.pk, source_content_type=town_content_type, data_type=town_data_type, defaults={"name": t.cached_label} ) for k in geo_values: setattr(gd, k, geo_values[k]) gd.save() t.main_geodata = gd t.save() if created: self.message_user( request, str(_("%d item(s) created.")) % created ) if updated: self.message_user( request, str(_("%d item(s) updated.")) % updated ) self.import_geojson_clean(tempdir) c_url = reverse( "admin:%s_%s_changelist" % (self.model._meta.app_label, self.model._meta.model_name) ) return HttpResponseRedirect(c_url) if not form: form = ImportGeoJsonForm() return render( request, "admin/import_from_file.html", {"file_form": form, "current_action": "import_geojson", "is_town": True}, ) class ImportJSONForm(forms.Form): json_file = forms.FileField( label=_("Zipped JSON file"), help_text=_("Import from a zipped JSON file generated by Ishtar"), ) class ImportJSONActionAdmin(admin.ModelAdmin): change_list_template = "admin/json_change_list.html" import_keys = ["slug", "txt_idx"] def get_urls(self): urls = super(ImportJSONActionAdmin, self).get_urls() my_urls = [ re_path(r"^import-from-json/$", self.import_json), ] return my_urls + urls def import_json(self, request): form = None if "apply" in request.POST: form = ImportJSONForm(request.POST, request.FILES) if form.is_valid(): with tempfile.TemporaryDirectory() as tmpdirname: filename = tmpdirname + os.sep + "export.zip" with open(filename, "wb+") as zipped_file: for chunk in request.FILES["json_file"].chunks(): zipped_file.write(chunk) result = restore_serialized(filename) try: result = restore_serialized(filename) except ValueError as e: self.message_user(request, str(e), level=messages.ERROR) if result: for model, count in result: self.message_user( request, str(_("{} {}(s) created/updated.")).format( count, model ), ) c_url = reverse( "admin:%s_%s_changelist" % (self.model._meta.app_label, self.model._meta.model_name) ) return HttpResponseRedirect(c_url) if not form: form = ImportJSONForm() return render( request, "admin/import_from_file.html", {"file_form": form, "current_action": "import_json"}, ) class AdminTownForm(forms.ModelForm): class Meta: model = models_common.Town exclude = ["imports", "center", "limit"] class TownParentInline(admin.TabularInline): model = models_common.Town.children.through fk_name = "to_town" autocomplete_fields = ["from_town"] verbose_name = _("Parent") verbose_name_plural = _("Parents") extra = 1 class TownAdmin(ImportGEOJSONActionAdmin, ImportActionAdmin): change_list_template = "admin/town_change_list.html" model = models_common.Town list_display = ["name", "year"] search_fields = ["name"] readonly_fields = ["cached_label"] if settings.COUNTRY == "fr": list_display += ["numero_insee"] search_fields += ["numero_insee"] list_filter = ("areas",) form = AdminTownForm autocomplete_fields = ["children", "main_geodata", "geodata", "documents", "main_image"] inlines = [TownParentInline] actions = [ export_as_csv_action(exclude=["center", "limit"]), export_as_geojson_action( "limit", exclude=["center", "departement", "cached_label"] ), ] import_keys = ["slug", "txt_idx", "numero_insee"] admin_site.register(models_common.Town, TownAdmin) class PreviousNextAdmin: def get_changelist_queryset(self, request): # adapt GET to a changelist_view style get = request.GET.get("_changelist_filters", []) if isinstance(get, str): get = [get] self._current_query = {} for value in get: if "=" not in value: continue keys = value.split("=") if len(keys) != 2: continue k, v = keys self._current_query[k] = v fake_request = InlineClass( {"GET": self._current_query, "user": request.user, "resolver_match": request.resolver_match} ) # only set attributes needed cl = self.get_changelist_instance(fake_request) return cl.get_queryset(request) def change_view(self, request, object_id, form_url="", extra_context=None): """ Next and previous button on the change view """ if not extra_context: extra_context = {} ids = list(self.get_changelist_queryset(request).values("pk")) previous, current_is_reached, first = None, False, None extra_context["get_attr"] = "" if request.GET: extra_context["get_attr"] = "?" + request.GET.urlencode() for v in ids: pk = str(v["pk"]) if pk == object_id: current_is_reached = True if previous: extra_context["previous_item"] = previous elif current_is_reached: extra_context["next_item"] = pk break else: if not first: first = pk previous = pk if ( "previous_item" not in extra_context and "next_item" not in extra_context and first ): # on modify current object do not match current criteria # next is the first item extra_context["next_item"] = first return super().change_view(request, object_id, form_url, extra_context) class GeneralTypeAdmin(PreviousNextAdmin, ChangeParentAdmin, ImportActionAdmin, ImportJSONActionAdmin): search_fields = ( "label", "txt_idx", "comment", ) list_filter = ("available",) save_on_top = True actions = [ export_as_csv_action(), serialize_type_action, change_value("available", True, _("Make available")), change_value("available", False, _("Make unavailable")), ] prepopulated_fields = {"txt_idx": ("label",)} LIST_DISPLAY = ["label", "txt_idx", "available", "comment"] extra_list_display = [] CSV_FIELD_ORDER = [ "id", "label", "txt_idx", "parent", "order", "available", "comment", ] def get_list_display(self, request): list_display = list(self.LIST_DISPLAY)[:] if hasattr(self.model, "parent"): list_display.insert(2, "parent") return list_display + self.extra_list_display general_models = [ models.SourceType, models.AuthorType, models.Language, models.LicenseType, models.PersonType, models.ShootingAngle, models_common.GeoProviderType, models_common.GeoDataType, models_common.GeoOriginType, models_common.GeoBufferType, ] for model in general_models: admin_site.register(model, GeneralTypeAdmin) @admin.register(models.OrganizationType, site=admin_site) class PersonTypeAdmin(GeneralTypeAdmin): LIST_DISPLAY = ["label", "grammatical_gender", "txt_idx", "available", "comment"] @admin.register(models.TitleType, site=admin_site) class TitleType(GeneralTypeAdmin): LIST_DISPLAY = [ "label", "long_title", "grammatical_gender", "txt_idx", "available", "comment", ] class GlobalVarAdmin(ImportActionAdmin, ImportJSONActionAdmin): list_display = ["slug", "value", "description"] list_editable = ["value", "description"] save_on_top = True actions = [ export_as_csv_action(), serialize_type_action, ] CSV_FIELD_ORDER = ["slug", "value", "description"] admin_site.register(models.GlobalVar, GlobalVarAdmin) class CreateAreaForm(forms.Form): department_number = forms.IntegerField(label=_("Department number")) area_name = forms.CharField(label=_("Area name"), required=False) area = forms.ChoiceField(label=_("Area"), required=False, choices=[]) def __init__(self, *args, **kwargs): super(CreateAreaForm, self).__init__(*args, **kwargs) self.fields["area"].choices = [("", "--")] + [ (area.pk, area.label) for area in models.Area.objects.order_by("reference") ] def clean(self): area_name = self.cleaned_data.get("area_name", "") area = self.cleaned_data.get("area", 0) if (not area_name and not area) or (area and area_name): raise forms.ValidationError( _("Choose an area or set an area reference.") ) return self.cleaned_data def clean_department_number(self): value = self.cleaned_data.get("department_number", 0) if value < 1 or (value > 95 and (value < 970 or value > 989)): raise forms.ValidationError(_("Invalid department number.")) return value def clean_area_name(self): value = self.cleaned_data.get("area_name", "") if not value: return value if models.Area.objects.filter(label=value).count(): raise forms.ValidationError( _("This name is already used by " "another area.") ) return value class CreateDepartmentActionAdmin(GeneralTypeAdmin): change_list_template = "admin/area_change_list.html" def get_urls(self): urls = super(CreateDepartmentActionAdmin, self).get_urls() my_urls = [ re_path(r"^create-department/$", self.create_area), ] return my_urls + urls def create_area(self, request): form = None if "apply" in request.POST: form = CreateAreaForm(request.POST) if form.is_valid(): area_name = form.cleaned_data.get("area_name", "") dpt_num = form.cleaned_data["department_number"] dpt_num = "0" + str(dpt_num) if dpt_num < 10 else str(dpt_num) if area_name: slug = "dep-" + dpt_num while models.Area.objects.filter(txt_idx=slug).count(): slug += "b" area = models.Area.objects.create(label=area_name, txt_idx=slug) self.message_user( request, str(_('Area "{}" created.')).format(area_name) ) else: area = models.Area.objects.get(id=form.cleaned_data["area"]) current_towns = [a.numero_insee for a in area.towns.all()] nb = 0 for town in ( models.Town.objects.filter(numero_insee__startswith=dpt_num) .exclude(numero_insee__in=current_towns) .all() ): area.towns.add(town) nb += 1 self.message_user( request, str(_('{} town(s) added to "{}".')).format( nb, area_name or area.label ), ) c_url = reverse( "admin:%s_%s_changelist" % (self.model._meta.app_label, self.model._meta.model_name) ) return HttpResponseRedirect(c_url) if not form: form = CreateAreaForm() return render( request, "admin/create_area_dpt.html", {"form": form, "current_action": "create_area"}, ) @admin.register(models.SupportType, site=admin_site) class SupportType(GeneralTypeAdmin): model = models.SupportType autocomplete_fields = ["document_types"] @admin.register(models.Format, site=admin_site) class Format(GeneralTypeAdmin): model = models.Format autocomplete_fields = ["document_types"] @admin.register(models.DocumentTag, site=admin_site) class DocumentTag(MergeActionAdmin, GeneralTypeAdmin): pass class DocumentAdmin(admin.ModelAdmin): model = models.Document search_fields = ("title", "reference", "internal_reference") autocomplete_fields = ("lock_user", "source", "authors") readonly_fields = [ "history_creator", "history_modifier", "search_vector", "history_m2m", "imports", "cached_label", "cache_related_label" ] admin_site.register(models.Document, DocumentAdmin) class AreaAdmin(CreateDepartmentActionAdmin): list_display = ("label", "reference", "parent", "available") search_fields = ("label", "reference") list_filter = ("parent",) model = models.Area autocomplete_fields = ["towns", "parent", "documents", "main_image"] admin_site.register(models.Area, AreaAdmin) model_translations = { "administrativeact": str(_("administrative act")), "contextrecord": str(_("context record")), "document": str(_("document")), "file": str(_("archaeological file")), "find": str(_("find")), "operation": str(_("operation")), "treatment": str(_("treatment")), "treatmentfile": str(_("treament file")), "warehouse": str(_("warehouse")), "geovectordata": str(_("geographic data")), } class ProfileTypeAdmin(GeneralTypeAdmin): model = models.ProfileType filter_vertical = ("groups",) autocomplete_fields = ("permission_queries", "filtered_sheets") def save_related(self, request, form, formsets, change): super().save_related(request, form, formsets, change) # clean "owns" VS "generics" groups form.instance.clean_groups() def check_queries(self, request, obj): model_list = [] error = False for permission_query in obj.permission_queries.all(): model = permission_query.model.model if model in model_list: error = model break model_list.append(model) if not error: return if error in model_translations: error = model_translations[error] error = error.capitalize() messages.add_message( request, messages.ERROR, mark_safe(str(_( "You can attach only one permission query by model.
" "{} has many permission query attached.")).format( error ) ) ) def check_permission(self, request, object_id): # check that all "own" permission has a request associated try: obj = models.ProfileType.objects.get(pk=int(object_id)) except models.ProfileType.DoesNotExist: return Http404() self.check_queries(request, obj) permissions_needed = set() permissions_not_needed = set() for model in ("basefind", "import", "biographicalnote"): for perm_type in ("add", "change", "delete", "view"): permissions_not_needed.add((perm_type, model)) for group in obj.groups.all(): for perm in group.permissions.all(): sp = perm.codename.split("_") perm_type = (sp[0], sp[-1]) if sp[1] == "own": permissions_needed.add(perm_type) else: permissions_not_needed.add(perm_type) for permission_query in obj.permission_queries.all(): model = permission_query.model.model for perm_type, perm_model in list(permissions_needed): if model == perm_model: permissions_needed.remove((perm_type, perm_model)) for permission in list(permissions_needed): if permission in permissions_not_needed: permissions_needed.remove(permission) if permissions_needed: perm_needed = [] for p in sorted(set([model for __, model in permissions_needed])): if p in model_translations: p = model_translations[p] perm_needed.append(p) permission_needed = ", ".join(sorted(perm_needed)) messages.add_message( request, messages.ERROR, mark_safe(str(_( """Permission requests are needed for theses models: {permission_needed}.
Associate the profile type \"{obj}\" with correct permission request otherwise default request will be used (Ishtar v4.4) or no permission will be granted (Ishtar >= v5).""")).format(permission_needed=permission_needed, obj=obj)) ) def change_view(self, request, object_id, form_url="", extra_context=None): returned = super().change_view( request, object_id, form_url=form_url, extra_context=extra_context ) if not request.POST or not request.POST.get("_continue", False): self.check_permission(request, object_id) return returned admin_site.register(models.ProfileType, ProfileTypeAdmin) class ProfileTypeSummaryAdmin(admin.ModelAdmin): change_list_template = "admin/profiletype_summary_change_list.html" search_fields = ("label",) list_filter = ("available", "label") def has_add_permission(self, request, obj=None): return False def changelist_view(self, request, extra_context=None): response = super(ProfileTypeSummaryAdmin, self).changelist_view( request, extra_context=extra_context, ) try: qs = response.context_data["cl"].queryset except (AttributeError, KeyError): return response profile_types = list(qs.order_by("label")) rights = { profile_type.pk: [g.pk for g in profile_type.groups.all()] for profile_type in profile_types } groups = [] ok = mark_safe( 'True'.format( settings.STATIC_URL ) ) for group in models.Group.objects.order_by("name"): gp = [group.name] for profile_type in profile_types: gp.append(ok if group.pk in rights[profile_type.pk] else "-") groups.append(gp) response.context_data.update({"profile_types": profile_types, "groups": groups}) return response admin_site.register(models.ProfileTypeSummary, ProfileTypeSummaryAdmin) class IshtarUserAdmin(admin.ModelAdmin): model = models.IshtarUser search_fields = ("user_ptr__username", "person__raw_name") exclude = ("search_vector",) readonly_fields = ("user_ptr", "latest_news_version",) autocomplete_fields = ["person"] admin_site.register(models.IshtarUser, IshtarUserAdmin) class ImporterDefaultValuesInline(admin.TabularInline): model = models.ImporterDefaultValues class ImporterDefaultAdmin(admin.ModelAdmin): list_display = ("importer_type", "target") model = models.ImporterDefault inlines = (ImporterDefaultValuesInline,) admin_site.register(models.ImporterDefault, ImporterDefaultAdmin) def duplicate_importertype(modeladmin, request, queryset): res = [] for obj in queryset.order_by("pk"): old_pk = obj.pk obj.pk = None obj.slug = create_slug(models.ImporterType, obj.name) obj.name = obj.name + " - duplicate" obj.name = obj.name[:200] obj.save() # create new old_obj = modeladmin.model.objects.get(pk=old_pk) for m in old_obj.created_models.all(): obj.created_models.add(m) for u in old_obj.users.all(): obj.users.add(u) for default in old_obj.defaults.all(): default_pk = default.pk default.pk = None # create new default.importer_type = obj default.save() old_default = default.__class__.objects.get(pk=default_pk) for def_value in old_default.default_values.all(): def_value.pk = None def_value.default_target = default def_value.save() for col in old_obj.columns.all(): col_pk = col.pk col.pk = None # create new col.importer_type = obj col.save() old_col = col.__class__.objects.get(pk=col_pk) for df in old_col.duplicate_fields.all(): df.pk = None # create new df.column = col df.save() for tg in old_col.targets.all(): tg.pk = None # create new tg.column = col tg.save() res.append(str(obj)) messages.add_message( request, messages.INFO, str(_("{} importer type(s) duplicated: {}.")).format( queryset.count(), " ; ".join(res) ), ) c_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) return HttpResponseRedirect(c_url) duplicate_importertype.short_description = _("Duplicate") def generate_libreoffice_template(modeladmin, request, queryset): if queryset.count() != 1: messages.add_message( request, messages.ERROR, str(_("Select only one importer.")) ) c_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) return HttpResponseRedirect(c_url) importer_type = queryset.all()[0] dest_filename = importer_type.get_libreoffice_template() in_memory = BytesIO() with open(dest_filename, "rb") as fle: in_memory.write(fle.read()) filename = dest_filename.split(os.sep)[-1] response = HttpResponse( content_type="application/vnd.oasis.opendocument.spreadsheet" ) response["Content-Disposition"] = "attachment; filename=%s" % filename.replace( " ", "_" ) in_memory.seek(0) response.write(in_memory.read()) return response generate_libreoffice_template.short_description = _("Export as libreoffice template") importer_type_actions = [duplicate_importertype] if settings.USE_LIBREOFFICE: importer_type_actions.append(generate_libreoffice_template) serialize_importer_action = serialize_action("common_imports", IMPORT_MODEL_LIST) serialize_importer_action.short_description = SERIALIZE_DESC @admin.register(models.ImporterType, site=admin_site) class ImporterTypeAdmin(ImportJSONActionAdmin): list_display = ("name", "associated_models", "available", "importer_groups_label") actions = importer_type_actions + [ serialize_importer_action, change_value("available", True, _("Make available")), change_value("available", False, _("Make unavailable")), ] list_filter = ["available"] search_fields = ["name"] autocomplete_fields = ["users"] prepopulated_fields = {"slug": ("name",)} class ImporterGroupImporterInline(admin.TabularInline): model = models.ImporterGroupImporter extra = 3 @admin.register(models.ImporterGroup, site=admin_site) class ImporterGroupAdmin(ImportJSONActionAdmin): list_display = ("name", "importer_types_label", "available") actions = [ serialize_importer_action, change_value("available", True, _("Make available")), change_value("available", False, _("Make unavailable")), ] list_filter = ["available"] search_fields = ["name"] prepopulated_fields = {"slug": ("name",)} inlines = [ImporterGroupImporterInline] class RegexpAdmin(admin.ModelAdmin): list_display = ("name", "regexp", "description") admin_site.register(models.Regexp, RegexpAdmin) class ValueFormaterAdmin(admin.ModelAdmin): list_display = ("name", "format_string", "description") prepopulated_fields = {"slug": ("name",)} admin_site.register(models.ValueFormater, ValueFormaterAdmin) def duplicate_importercolumn(modeladmin, request, queryset): res = [] for col in queryset.order_by("col_number"): old_pk = col.pk col.pk = None col.label = (col.label or "") + " - duplicate" col.label = col.label[:200] # get the next available col number col_nb = col.col_number + 1 while modeladmin.model.objects.filter( col_number=col_nb, importer_type=col.importer_type ).count(): col_nb += 1 col.col_number = col_nb col.save() # create new old_col = modeladmin.model.objects.get(pk=old_pk) for df in old_col.duplicate_fields.all(): df.pk = None # create new df.column = col df.save() for tg in old_col.targets.all(): tg.pk = None # create new tg.column = col tg.save() res.append(str(col)) messages.add_message( request, messages.INFO, str(_("{} importer column(s) duplicated: {}.")).format( queryset.count(), " ; ".join(res) ), ) c_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) return HttpResponseRedirect(c_url) duplicate_importercolumn.short_description = _("Duplicate") def shift_right(modeladmin, request, queryset): for col in queryset.order_by("-col_number"): # get the next available col number col_nb = col.col_number + 1 while modeladmin.model.objects.filter( col_number=col_nb, importer_type=col.importer_type ).count(): col_nb += 1 col.col_number = col_nb col.save() messages.add_message( request, messages.INFO, str(_("{} importer column(s) right-shifted.")).format(queryset.count()), ) c_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) return HttpResponseRedirect(c_url) shift_right.short_description = _("Shift right") def shift_left(modeladmin, request, queryset): errors, oks = 0, 0 for col in queryset.order_by("col_number"): # get the next available col number if col.col_number == 1: errors += 1 continue col_nb = col.col_number - 1 error = False while modeladmin.model.objects.filter( col_number=col_nb, importer_type=col.importer_type ).count(): col_nb -= 1 if col_nb <= 0: errors += 1 continue col.col_number = col_nb col.save() oks += 1 if oks: messages.add_message( request, messages.INFO, str(_("{} importer column(s) left-shifted.")).format(oks), ) if errors: messages.add_message( request, messages.ERROR, str( _("{} importer column(s) not left-shifted: no " "place available.") ).format(errors), ) c_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) return HttpResponseRedirect(c_url) shift_left.short_description = _("Shift left") class ImporterDuplicateFieldInline(admin.TabularInline): model = models.ImporterDuplicateField class ImportTargetForm(forms.ModelForm): class Meta: model = models.ImportTarget exclude = [] widgets = {"comment": forms.TextInput} class ImportTargetInline(admin.TabularInline): model = models.ImportTarget extra = 1 form = ImportTargetForm class ImporterColumnAdmin(PreviousNextAdmin, admin.ModelAdmin): list_display = ( "label", "importer_type", "required", "col_number", "col_string", "description", "formater_type_lbl", "targets_lbl", "duplicate_fields_lbl", ) list_filter = ( ( "importer_type__available", custom_titled_filter(_("Importer available"), admin.BooleanFieldListFilter), ), "importer_type", ) search_fields = ("label",) inlines = (ImportTargetInline, ImporterDuplicateFieldInline) actions = [duplicate_importercolumn, shift_left, shift_right] def get_changeform_initial_data(self, request): """ Get current importer type and the first available column """ initial = super().get_changeform_initial_data(request) base_query = self.get_changelist_queryset(request) importer_type_id = None if base_query.exists(): importer_type_id = base_query.values_list( "importer_type_id", flat=True).all()[0] if not importer_type_id: importer_type_id = self._current_query.get("importer_type__id__exact", None) if importer_type_id and base_query.exclude( importer_type_id=importer_type_id).exists(): # not only one importer_type_id in current queryset return initial try: initial["importer_type"] = models.ImporterType.objects.get( id=importer_type_id) except models.ImporterType.DoesNotExist: return initial with connection.cursor() as cursor: query = """ SELECT min(col_number) + 1 FROM ishtar_common_importercolumn ic_a WHERE importer_type_id = %s AND NOT EXISTS ( SELECT col_number FROM ishtar_common_importercolumn ic_b WHERE ic_b.col_number = ic_a.col_number + 1 AND importer_type_id = %s )""" cursor.execute(query, [importer_type_id, importer_type_id]) row = cursor.fetchone() if row: initial["col_number"] = row[0] return initial admin_site.register(models.ImporterColumn, ImporterColumnAdmin) class ImporterModelAdmin(ImportActionAdmin, ImportJSONActionAdmin): list_display = ("name", "klass") import_keys = ["klass"] model = models.ImporterModel actions = [ export_as_csv_action(), serialize_type_action, ] CSV_FIELD_ORDER = [ "id", "klass", "name", ] admin_site.register(models.ImporterModel, ImporterModelAdmin) class FormaterTypeAdmin(admin.ModelAdmin): list_display = ("formater_type", "options") admin_site.register(models.FormaterType, FormaterTypeAdmin) class ImportAdmin(admin.ModelAdmin): list_display = ( "name", "importer_type", "imported_file", "user", "state", "creation_date", ) autocomplete_fields = ["user"] search_fields = ("name", "importer_type__name") admin_site.register(models.Import, ImportAdmin) @admin.register(models.ImportGroup, site=admin_site) class ImportGroupAdmin(admin.ModelAdmin): list_display = ( "name", "importer_type", "imported_file", "user", "state", "creation_date", ) autocomplete_fields = ["user"] class MediaExporterForm(forms.ModelForm): class Meta: model = models.MediaExporter exclude = [] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["associated_model"].choices = [ ('', '-' * 9)] + [ (model.pk, str(model)) for model in models.ImporterModel.objects.all() if hasattr(model.get_class(), "documents") ] @admin.register(models.MediaExporter, site=admin_site) class MediaExporterAdmin(admin.ModelAdmin): list_display = ("name", "associated_model", "available") actions = [ change_value("available", True, _("Make available")), change_value("available", False, _("Make unavailable")), ] list_filter = ["available"] search_fields = ["name"] autocomplete_fields = ["users"] prepopulated_fields = {"slug": ("name",)} form = MediaExporterForm class OperationTypeAdmin(GeneralTypeAdmin): extra_list_display = ["order", "preventive"] model = models.OperationType admin_site.register(models.OperationType, OperationTypeAdmin) class SpatialReferenceSystemAdmin(GeneralTypeAdmin): extra_list_display = ["order", "srid"] model = models.SpatialReferenceSystem admin_site.register(models.SpatialReferenceSystem, SpatialReferenceSystemAdmin) @admin.register(models.ItemKey, site=admin_site) class ItemKeyAdmin(ImportJSONActionAdmin): list_display = ( "content_type", "content_type_model", "key", "content_object", "linked_to_all", "linked_to_importer_type", "linked_to_importer", "linked_to_user" ) search_fields = ("key", "content_type__model") list_filter = (("content_type", admin.RelatedOnlyFieldListFilter),) autocomplete_fields = ["user", "ishtar_import"] actions = [ serialize_type_action ] serialize_filter_queryset = { "user__isnull": True, "group__isnull": True, "ishtar_import__isnull": True } @admin.register(models.TargetKey, site=admin_site) class TargetKeyAdmin(admin.ModelAdmin): list_display = ("target", "importer_type", "column_nb", "key", "value", "is_set") list_filter = ("is_set", "target__column__importer_type") search_fields = ("target__target", "value", "key") @admin.register(models.ImportColumnValue, site=admin_site) class ImportColumnValue(admin.ModelAdmin): list_display = ("import_item", "column") class JsonContentTypeFormMixin(object): class Meta: model = models.JsonDataSection exclude = [] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) choices = [] for choice, label in self.fields["content_type"].choices: if not choice: choices.append(('', label)) continue try: pk = int(choice.value) except ValueError: choices.append(('', label)) continue ct = ContentType.objects.get(pk=pk) model_class = ct.model_class() if hasattr(model_class, "data") and not hasattr( model_class, "history_type" ): choices.append((pk, label)) self.fields["content_type"].choices = sorted(choices, key=lambda x: x[1]) class JsonDataSectionForm(JsonContentTypeFormMixin, forms.ModelForm): class Meta: model = models.JsonDataSection exclude = [] class JsonDataSectionAdmin(admin.ModelAdmin): list_display = ["name", "content_type", "order"] form = JsonDataSectionForm admin_site.register(models.JsonDataSection, JsonDataSectionAdmin) class JsonDataFieldForm(JsonContentTypeFormMixin, forms.ModelForm): class Meta: model = models.JsonDataField exclude = [] serialize_json_action = serialize_action( "types", [models.JsonDataSection, models.JsonDataField] ) serialize_json_action.short_description = SERIALIZE_DESC class JsonDataFieldAdmin(ImportActionAdmin, ImportJSONActionAdmin): import_keys = ["key"] list_display = [ "name", "content_type", "key", "display", "value_type", "search_index", "order", "section", ] actions = [ export_as_csv_action(), serialize_json_action, change_value("display", True, _("Display selected")), change_value("display", False, _("Hide selected")), ] list_filter = ["value_type", "search_index"] form = JsonDataFieldForm model = models.JsonDataField admin_site.register(models.JsonDataField, JsonDataFieldAdmin) def get_choices_form(): cache_key, value = get_cache(models.CustomForm, ["associated-forms"]) if value: return value register, register_fields = models.CustomForm.register() forms = [ (slug, models.CustomForm._register[slug].form_admin_name) for slug in register.keys() ] forms = sorted(forms, key=lambda x: x[1]) cache.set(cache_key, forms, settings.CACHE_TIMEOUT) return forms class CustomFormForm(forms.ModelForm): class Meta: model = models.CustomForm exclude = [] form = forms.ChoiceField(label=_("Form"), choices=get_choices_form) class ExcludeFieldFormset(BaseInlineFormSet): def get_form_kwargs(self, index): kwargs = super(ExcludeFieldFormset, self).get_form_kwargs(index) if not self.instance or not self.instance.pk: return kwargs form = self.instance.get_form_class() if not form: kwargs["choices"] = [] return kwargs kwargs["choices"] = [("", "--")] + form.get_custom_fields() return kwargs class ExcludeFieldForm(forms.ModelForm): class Meta: model = models.ExcludedField exclude = [] field = forms.ChoiceField(label=_("Field")) def __init__(self, *args, **kwargs): choices = kwargs.pop("choices") if "choices" in kwargs else [] super(ExcludeFieldForm, self).__init__(*args, **kwargs) self.fields["field"].choices = choices class ExcludeFieldInline(admin.TabularInline): model = models.ExcludedField extra = 2 form = ExcludeFieldForm formset = ExcludeFieldFormset class JsonFieldFormset(BaseInlineFormSet): def get_form_kwargs(self, index): kwargs = super(JsonFieldFormset, self).get_form_kwargs(index) if not self.instance or not self.instance.pk: return kwargs kwargs["choices"] = [("", "--")] + self.instance.get_available_json_fields() return kwargs class JsonFieldForm(forms.ModelForm): class Meta: model = models.CustomFormJsonField exclude = [] def __init__(self, *args, **kwargs): choices = kwargs.pop("choices") if "choices" in kwargs else [] super(JsonFieldForm, self).__init__(*args, **kwargs) self.fields["json_field"].choices = choices class JsonFieldInline(admin.TabularInline): model = models.CustomFormJsonField extra = 2 form = JsonFieldForm formset = JsonFieldFormset class CustomFormAdmin(admin.ModelAdmin): list_display = [ "name", "form", "available", "enabled", "apply_to_all", "users_lbl", "user_types_lbl", ] fields = ( "name", "form", "available", "enabled", "header", "apply_to_all", "users", "user_types", "profile_types", ) autocomplete_fields = ["users"] form = CustomFormForm inlines = [ExcludeFieldInline, JsonFieldInline] def get_inline_instances(self, request, obj=None): # no inline on creation if not obj: return [] return super(CustomFormAdmin, self).get_inline_instances(request, obj=obj) def get_readonly_fields(self, request, obj=None): if obj: return ("form", "user_types") return ("user_types",) admin_site.register(models.CustomForm, CustomFormAdmin) class AdministrationScriptAdmin(admin.ModelAdmin): list_display = ["name", "path"] def get_readonly_fields(self, request, obj=None): if obj: return ("path",) return [] admin_site.register(models.AdministrationScript, AdministrationScriptAdmin) class AdministrationTaskAdmin(admin.ModelAdmin): readonly_fields = ( "state", "creation_date", "launch_date", "finished_date", "result", ) list_display = [ "script", "state", "creation_date", "launch_date", "finished_date", "result", ] list_filter = ["script", "state"] def get_readonly_fields(self, request, obj=None): if obj: return ("script",) + self.readonly_fields return self.readonly_fields admin_site.register(models.AdministrationTask, AdministrationTaskAdmin) def launch_export_action(modeladmin, request, queryset): model = modeladmin.model back_url = ( reverse( "admin:%s_%s_changelist" % (model._meta.app_label, model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) if queryset.count() != 1: messages.add_message(request, messages.ERROR, str(_("Select only one task."))) return HttpResponseRedirect(back_url) export_task = queryset.all()[0] if export_task.state != "C": messages.add_message( request, messages.ERROR, str(_("Export already exported/scheduled.")) ) return HttpResponseRedirect(back_url) export_task.state = "S" export_task.save() if not settings.USE_BACKGROUND_TASK: return launch_export(export_task.pk) return launch_export.delay(export_task.pk) launch_export_action.short_description = _("Launch export") class ExportTaskAdmin(admin.ModelAdmin): readonly_fields = ("result", "result_info") exclude = ("creation_date", "launch_date", "finished_date") list_display = [ "label", "state", "result_info", "result", "creation_date", "launch_date", "finished_date", ] list_filter = ["state"] actions = [launch_export_action] autocomplete_fields = ["lock_user"] admin_site.register(models.ExportTask, ExportTaskAdmin) def launch_import_action(modeladmin, request, queryset): model = modeladmin.model back_url = ( reverse( "admin:%s_%s_changelist" % (model._meta.app_label, model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) if queryset.count() != 1: messages.add_message(request, messages.ERROR, str(_("Select only one task."))) return HttpResponseRedirect(back_url) import_task = queryset.all()[0] if import_task.state != "C": messages.add_message( request, messages.ERROR, str(_("Import already imported/scheduled.")) ) return HttpResponseRedirect(back_url) import_task.state = "S" import_task.save() if not settings.USE_BACKGROUND_TASK: return launch_import(import_task.pk) return launch_import.delay(import_task.pk) launch_import_action.short_description = _("Launch import") class ImportTaskAdmin(admin.ModelAdmin): exclude = ("creation_date", "launch_date", "finished_date") list_display = [ "creation_date", "source", "state", "import_user", "launch_date", "finished_date", ] list_filter = ["state"] autocomplete_fields = ["import_user"] actions = [launch_import_action] class Media: js = ("js/admin/archive_import.js",) admin_site.register(models.ImportTask, ImportTaskAdmin) class UserProfileAdmin(admin.ModelAdmin): list_display = ["person", "profile_type", "area_labels"] list_filter = ["profile_type"] search_fields = ["person__raw_name"] model = models.UserProfile autocomplete_fields = ["areas"] admin_site.register(models.UserProfile, UserProfileAdmin) class DocumentTemplateAdmin(admin.ModelAdmin): list_display = ["name", "associated_model", "available", "for_labels"] list_filter = ["available", "associated_model"] prepopulated_fields = {"slug": ("name",)} admin_site.register(models.DocumentTemplate, DocumentTemplateAdmin) @admin.register(models_rest.UserRequestToken, site=admin_site) class UserRequestTokenAdmin(admin.ModelAdmin): list_display = ("user", "access_type", "created") readonly_fields = ("key",) @admin.register(models_rest.UserToken, site=admin_site) class UserTokenAdmin(admin.ModelAdmin): list_display = ("user", "access_type", "name", "last_ip", "last_access") readonly_fields = ("key", "last_ip", "last_access") class ApiUserAdmin(admin.ModelAdmin): list_display = ("user_ptr", "ip") admin_site.register(models_rest.ApiUser, ApiUserAdmin) def get_api_choices(): pks = [] for app_label, model_name in API_MAIN_CONTENT_TYPES: try: ct = ContentType.objects.get(app_label=app_label, model=model_name) pks.append(ct.pk) except ContentType.DoesNotExist: pass return {"pk__in": pks} class ApiSearchModelAdminForm(forms.ModelForm): class Meta: model = models_rest.ApiUser exclude = ["table_format"] content_type = forms.ModelChoiceField( label=_("Content type"), queryset=ContentType.objects, limit_choices_to=get_api_choices ) def __init__(self, *args, **kwargs): super(ApiSearchModelAdminForm, self).__init__(*args, **kwargs) if self.instance.id: model_name = self.instance.content_type.model.replace("_", "") models_name = [] if model_name == "find": models_name.append( f"{self.instance.content_type.app_label}.models_finds.{model_name}" ) models_name.append( f"{self.instance.content_type.app_label}.models.{model_name}" ) q = None for model_name in models_name: q1 = Q(associated_models__klass__iexact=model_name) if not q: q = q1 else: q |= q1 q = models.ImporterType.objects.filter(q) # self.fields['table_format'].queryset = q self.fields['export'].queryset = q class ApiSearchModelAdmin(admin.ModelAdmin): form = ApiSearchModelAdminForm list_display = ("user", "content_type") def get_readonly_fields(self, request, obj=None): fields = tuple(super().get_readonly_fields(request, obj) or []) if obj: # editing an existing object return fields + tuple(["content_type"]) return fields admin_site.register(models_rest.ApiSearchModel, ApiSearchModelAdmin) def send_error_message(request, msg, message_type=messages.ERROR): messages.add_message( request, message_type, msg, ) def update_types_from_source(modeladmin, request, queryset): return_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) if queryset.count() != 1: send_error_message( request, str(_("Select only one source.")), message_type=messages.WARNING, ) return HttpResponseRedirect(return_url) source = queryset.all()[0] created, updated, deleted = 0, 0, 0 missing_models, missing_types = [], [] config = {} for item_type in ("operation", "contextrecord", "file", "find", "warehouse"): curl = source.url if not curl.endswith("/"): curl += "/" curl += f"api/facets/{item_type}/" try: response = requests.get( curl, timeout=20, headers={"Authorization": f"Token {source.key}"}, ) except requests.exceptions.Timeout: send_error_message( request, str(_("Timeout: failed to join {}.")).format(curl), ) continue if response.status_code != 200: send_error_message( request, str( _( "Bad response for {} - {}. Response status code {} != 200. " "No access to {}. If it is not expected, check your key." ) ).format(source.name, curl, response.status_code, item_type), message_type=messages.WARNING ) continue try: content = response.json() except ValueError: send_error_message( request, str(_("Response of {} is not a valid JSON message.")).format(curl), ) continue if "config" in content: for k in content["config"]: if not content["config"][k]: continue if k not in config: config[k] = "" elif config[k]: config[k] += "||" config[k] += content["config"][k] result = source.update_matches(content) if result.get("created", None): created += result['created'] if result.get("updated", None): updated += result["updated"] if result.get("deleted", None): deleted += result["deleted"] if result.get("search_model do not exist", None): missing_models += result["search_model do not exist"] if result.get("type do not exist", None): missing_types += result["type do not exist"] modified = False for k in config: if getattr(source, k) != config[k]: modified = True setattr(source, k, config[k]) if modified: source.save() messages.add_message( request, messages.INFO, str(_("Table/exports configuration updated")), ) if created: messages.add_message( request, messages.INFO, str(_(f"{created} matches created")), ) if updated: messages.add_message( request, messages.INFO, str(_(f"{updated} matches updated")), ) if deleted: messages.add_message( request, messages.INFO, str(_(f"{deleted} matches deleted")), ) if missing_models: missing_models = ", ".join(set(missing_models)) messages.add_message( request, messages.INFO, str( _( f"Theses search models have not been found: {missing_models}. Not the same Ishtar version?" ) ), ) if missing_types: missing_types = ", ".join(set(missing_types)) messages.add_message( request, messages.INFO, str( _( f"Theses types have not been found: {missing_types}. Not the same Ishtar version?" ) ), ) return HttpResponseRedirect(return_url) update_types_from_source.short_description = _( "Update table, export format and types from source" ) def generate_match_document(modeladmin, request, queryset): return_url = ( reverse( f"admin:{modeladmin.model._meta.app_label}_{modeladmin.model._meta.model_name}_changelist" ) + "?" + urllib.parse.urlencode(request.GET) ) if queryset.count() != 1: send_error_message( request, str(_("Select only one source.")), message_type=messages.WARNING, ) return HttpResponseRedirect(return_url) src_doc = queryset.all()[0].generate_match_document() in_memory = BytesIO() if not src_doc: send_error_message( request, str(_("Document not generated: is the LibreOffice daemon configured and running?")), message_type=messages.ERROR, ) return HttpResponseRedirect(return_url) with open(src_doc, "rb") as fle: in_memory.write(fle.read()) filename = src_doc.split(os.sep)[-1] response = HttpResponse( content_type="application/vnd.oasis.opendocument.spreadsheet" ) response["Content-Disposition"] = "attachment; filename=%s" % filename.replace( " ", "_" ) in_memory.seek(0) response.write(in_memory.read()) return response generate_match_document.short_description = _("Generate match document") def update_association_from_match_document(modeladmin, request, queryset): return_url = ( reverse( "admin:%s_%s_changelist" % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + "?" + urllib.parse.urlencode(request.GET) ) if queryset.count() != 1: return send_error_message( request, str(_("Select only one source.")), return_url, message_type=messages.WARNING, ) result = queryset.all()[0].update_from_match_document() if not result: messages.add_message( request, messages.ERROR, str(_("Error on update. Cannot open match document.")), ) else: if result["errors"]: errors = ( "

" + str(_("Error on type update from match document:")) + "

" ) errors += "" messages.add_message(request, messages.ERROR, mark_safe(errors)) messages.add_message( request, messages.WARNING, str(_(f"{result['updated']} match key(s) updated.")), ) return HttpResponseRedirect(return_url) update_association_from_match_document.short_description = _( "Update association from match document" ) class ApiExternalSourceAdmin(admin.ModelAdmin): model = models_rest.ApiExternalSource actions = [ update_types_from_source, generate_match_document, update_association_from_match_document, ] exclude = ("search_columns", "search_columns_label", "exports", "exports_label") autocomplete_fields = ["users"] list_display = ("name", "url", "key") admin_site.register(models_rest.ApiExternalSource, ApiExternalSourceAdmin) class ApiKeyMatchAdmin(admin.ModelAdmin): model = models_rest.ApiKeyMatch list_display = [ "source", "search_model", "associated_type", "local_slug", "local_label", "distant_slug", "distant_label", "do_not_match", ] list_filter = ["source", "do_not_match"] search_fields = ["associated_type__model", "distant_slug", "distant_label"] actions = [ change_value("do_not_match", False, _("Enable match")), change_value("do_not_match", True, _("Disable match")), ] admin_site.register(models_rest.ApiKeyMatch, ApiKeyMatchAdmin) class ApiSheetFilterForm(BaseSheetFilterForm): api_search_model = forms.ModelChoiceField( models_rest.ApiSearchModel.objects, label=_("API - Remote access - Search model"), ) class Meta: model = models_rest.ApiSheetFilter fields = ["api_search_model", "key"] content_type_field = "api_search_model" def _get_content_type_model(self, instance): return instance.api_search_model.content_type.model_class() class ApiSheetFilterAdmin(admin.ModelAdmin): form = ApiSheetFilterForm model = models_rest.ApiSheetFilter list_display = ["api_search_model", "key"] list_filter = ["api_search_model"] admin_site.register(models_rest.ApiSheetFilter, ApiSheetFilterAdmin) class GeoVectorDataForm(forms.ModelForm): class Meta: model = models_common.GeoVectorData exclude = [] def clean(self): fields = [ "x", "point_2d", "point_3d", "multi_points", "multi_line", "multi_polygon", ] non_empty = [f for f in fields if self.cleaned_data.get(f, None)] if len(non_empty) > 1: raise forms.ValidationError( _( "Only one type of geographic data is accepted. " "Create different objects if you have many." ) ) if bool(self.cleaned_data.get("x", None)) != bool( self.cleaned_data.get("y", None) ): raise forms.ValidationError( _("You cannot set only x or only y coordinate.") ) return self.cleaned_data class GeoVectorDataAdmin(admin.ModelAdmin): model = models_common.GeoVectorData search_fields = ["name"] exclude = ["imports", "imports_updated", "timestamp_geo", "timestamp_label"] list_display = ["name", "origin", "data_type", "provider", "source_content_type"] list_filter = ["origin", "data_type", "provider"] admin_site.register(models_common.GeoVectorData, GeoVectorDataAdmin) class GeoDataItem(admin.ModelAdmin): autocomplete_fields = ["main_geodata", "geodata"] class MainGeoDataItem(GeoDataItem): def get_exclude(self, request, obj=None): exclude = super().get_exclude(request, obj) return tuple(exclude or []) + tuple([ "x", "y", "z", "estimated_error_x", "estimated_error_y", "estimated_error_z", "spatial_reference_system", "point", "point_2d", "point_source", "point_source_item", "multi_polygon", "multi_polygon_source", "multi_polygon_source_item", ])