#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2012-2026 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

# See the file COPYING for details.

"""Django admin configuration for the models of this application.

Registers the file, type, job, equipment cost and price agreement models
on the shared ``admin_site`` and provides two price agreement tools: a
view copying prices from one agreement to another and an admin action
exporting the prices of an agreement as a spreadsheet.
"""

from io import BytesIO
import os
import tempfile

from django import forms
from django.conf import settings
from django.contrib import messages
from django.db import transaction
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import re_path, reverse
from django.utils.translation import gettext_lazy as _

from ishtar_common.apps import admin_site
from ishtar_common.admin import (
    HistorizedObjectAdmin,
    GeneralTypeAdmin,
    export_as_csv_action,
    serialize_type_action,
    ImportActionAdmin,
)

from . import models


class FileAdmin(HistorizedObjectAdmin):
    """Admin for ``models.File``."""

    list_display = ["year", "numeric_reference", "file_type", "name"]
    # extra columns and filters are only added when settings.COUNTRY is "fr"
    if settings.COUNTRY == "fr":
        list_display += ["saisine_type", "permit_reference"]
    list_filter = ["file_type", "year"]
    if settings.COUNTRY == "fr":
        list_filter += ["saisine_type"]
    search_fields = ("cached_label", "name", "towns__name", "permit_reference")
    autocomplete_fields = HistorizedObjectAdmin.autocomplete_fields + [
        "in_charge",
        "general_contractor",
        "corporation_general_contractor",
        "responsible_town_planning_service",
        "planning_service",
        "organization",
        "scientist",
        "main_town",
        "towns",
        "related_file",
    ]
    readonly_fields = HistorizedObjectAdmin.readonly_fields + [
        "raw_general_contractor",
        "raw_town_planning_service",
        "cached_label",
        "imported_line",
    ]
    exclude = ["documents", "main_image"]
    model = models.File


admin_site.register(models.File, FileAdmin)


# type models registered with the stock GeneralTypeAdmin
general_models = [
    models.FileType,
    models.PermitType,
    models.GenericEquipmentServiceType,
    models.OperationTypeForRoyalties,
    models.AgreementType,
    models.DevelopmentType,
    models.MonitoringJustificationType,
]
if settings.COUNTRY == "fr":
    general_models.append(models.SaisineType)

for model in general_models:
    admin_site.register(model, GeneralTypeAdmin)


class JobAdmin(GeneralTypeAdmin):
    """Admin for ``models.Job``."""

    list_filter = ("available", "price_agreement")
    # NOTE(review): upper-case name kept as is; presumably read by
    # GeneralTypeAdmin to build the list columns - confirm against the parent.
    LIST_DISPLAY = [
        "label",
        "price_agreement",
        "permanent_contract",
        "order",
        "ground_daily_cost",
        "daily_cost",
        "default_daily_need_on_ground",
        "default_daily_need",
        "child",
        "available",
    ]


admin_site.register(models.Job, JobAdmin)


class EquipmentServiceTypeAdmin(GeneralTypeAdmin):
    """Admin for ``models.EquipmentServiceType``."""

    list_filter = ("available", "generic_equipment_type")
    extra_list_display = ["generic_equipment_type", "order"]


admin_site.register(models.EquipmentServiceType, EquipmentServiceTypeAdmin)


class EquipmentServiceCostAdmin(ImportActionAdmin):
    """Admin for ``models.EquipmentServiceCost``."""

    search_fields = (
        "equipment_service_type__label",
        "service_provider",
    )
    list_filter = ("available", "price_agreement")
    list_display = [
        "equipment_service_type",
        "price_agreement",
        "specificity",
        "parent",
        "unitary_cost",
        "unit",
        "flat_rate",
        "default_quantity_by_day",
        "available",
    ]
    actions = [export_as_csv_action(), serialize_type_action]
    model = models.EquipmentServiceCost


admin_site.register(models.EquipmentServiceCost, EquipmentServiceCostAdmin)


class CopyPriceForm(forms.Form):
    """Form selecting the source and the destination price agreements."""

    source = forms.ChoiceField(
        label=_("Copy from"),
        choices=[],
    )
    destination = forms.ChoiceField(
        label=_("To"),
        choices=[],
    )

    def __init__(self, *args, **kwargs):
        """Fill both choice lists with the price agreements in database."""
        super().__init__(*args, **kwargs)
        # evaluated at instantiation so that each form sees current agreements
        choices = list(models.PriceAgreement.objects.values_list("id", "label"))
        self.fields["source"].choices = choices
        self.fields["destination"].choices = choices

    def clean(self):
        """Reject a copy where source and destination are the same agreement.

        The comparison is skipped when either field failed its own
        validation: both values are then missing from ``cleaned_data`` and
        comparing them would add a misleading "must be different" error on
        top of the field errors.

        :raises forms.ValidationError: when both fields hold the same value.
        :return: the cleaned data.
        """
        cleaned_data = super().clean()
        if cleaned_data is None:
            cleaned_data = self.cleaned_data
        source = cleaned_data.get("source")
        destination = cleaned_data.get("destination")
        if source is not None and source == destination:
            raise forms.ValidationError(
                _("Source and destination must be different.")
            )
        return cleaned_data


class CopyPriceAgreementAdmin(GeneralTypeAdmin):
    """Type admin extended with a "copy price agreement" view."""

    change_list_template = "admin/copy_price_change_list.html"

    def get_urls(self):
        """Return the admin URLs preceded by the copy view URL."""
        # NOTE(review): the view is not wrapped in
        # ``self.admin_site.admin_view()``; access control relies on the
        # check done at the top of ``copy_price_agreement`` itself.
        my_urls = [
            re_path(r"^copy-price-agreement/$", self.copy_price_agreement),
        ]
        return my_urls + super().get_urls()

    def _copy_items(self, model, slug_name, has_child, source, destination):
        """Duplicate the ``model`` items of ``source`` into ``destination``.

        The slug of a copy is the part of the original slug located before
        the first "--", followed by "--" and the destination primary key. An
        item whose target slug already exists is not copied.

        :param model: model class filtered on ``price_agreement_id``.
        :param slug_name: name of the slug field of ``model``.
        :param has_child: True when ``model`` has a ``child`` foreign key
            that must be redirected to the copied child.
        :param source: price agreement the items are read from.
        :param destination: price agreement the copies are attached to.
        :return: tuple ``(created, already_here)`` of item counts.
        """
        base_query = model.objects.filter(price_agreement_id=source.pk)
        if has_child:
            # items without child first: their copies must be known before
            # the items referencing them are handled
            item_queries = [
                base_query.filter(child__isnull=True),
                base_query.filter(child__isnull=False),
            ]
        else:
            item_queries = [base_query]

        suffix = f"--{destination.pk}"
        # primary key of a childless source item -> primary key of its copy
        children = {}
        created, already_here = 0, 0
        for items in item_queries:
            for item in items:
                slug = getattr(item, slug_name).split("--")[0] + suffix
                existing_pk = (
                    model.objects.filter(**{slug_name: slug})
                    .values_list("pk", flat=True)
                    .first()
                )
                if existing_pk is not None:
                    already_here += 1
                    if has_child and not item.child_id:
                        # remember the copy made by a previous run, so items
                        # created now are attached to it and not to the
                        # child of the source agreement
                        children[item.pk] = existing_pk
                    continue
                source_pk = item.pk
                # with an empty primary key, save() inserts a new row
                item.pk = None
                setattr(item, slug_name, slug)
                item.price_agreement_id = destination.pk
                if has_child and item.child_id:
                    # an unknown child means the original child is attached
                    # to another agreement: this cannot be corrected
                    # automatically, the reference is kept unchanged
                    item.child_id = children.get(item.child_id, item.child_id)
                item.save()
                created += 1
                if has_child and not item.child_id:
                    children[source_pk] = item.pk
        return created, already_here

    def copy_price_agreement(self, request):
        """Display and process the price agreement copy form.

        Access is granted to superusers and to users whose ``ishtaruser``
        has the "archaeological_files.change_priceagreement" permission;
        other users are redirected to the admin login page. When a valid
        form is posted with "apply", jobs and equipment service costs are
        copied inside a single transaction, then the user is redirected to
        the change list with a summary message.

        :param request: current HTTP request.
        :return: the rendered form or a redirect response.
        """
        user = request.user
        if not user.is_superuser and (
            not hasattr(user, "ishtaruser")
            or not user.ishtaruser.has_permission(
                "archaeological_files.change_priceagreement"
            )
        ):
            self.message_user(
                request,
                str(_("Cannot change price agreement.")),
                level=messages.ERROR,
            )
            return HttpResponseRedirect(reverse("admin:login"))

        form = None
        if "apply" in request.POST:
            form = CopyPriceForm(request.POST)
            if form.is_valid():
                source = get_object_or_404(
                    models.PriceAgreement, pk=form.cleaned_data["source"]
                )
                destination = get_object_or_404(
                    models.PriceAgreement, pk=form.cleaned_data["destination"]
                )
                created, already_here = 0, 0
                # all or nothing: a failure must not leave a partial copy
                with transaction.atomic():
                    for model, slug_name, has_child in (
                        (models.Job, "txt_idx", True),
                        (models.EquipmentServiceCost, "slug", False),
                    ):
                        new, known = self._copy_items(
                            model, slug_name, has_child, source, destination
                        )
                        created += new
                        already_here += known
                if created:
                    self.message_user(
                        request, str(_("{} item(s) created.")).format(created)
                    )
                if already_here:
                    self.message_user(
                        request,
                        str(_("{} item(s) already in database.")).format(
                            already_here
                        ),
                    )
                c_url = reverse(
                    "admin:{}_{}_changelist".format(
                        self.model._meta.app_label, self.model._meta.model_name
                    )
                )
                return HttpResponseRedirect(c_url)
        if form is None:
            # an invalid bound form is kept in order to display its errors
            form = CopyPriceForm()
        return render(
            request,
            "admin/copy_price.html",
            {"file_form": form, "current_action": "import_generic"},
        )


def export_prices(modeladmin, request, queryset):
    """Admin action: download the summary document of one price agreement.

    :param modeladmin: admin instance the action is run from.
    :param request: current HTTP request.
    :param queryset: selected price agreements; exactly one is expected.
    :return: the document as an attachment response, or None after a
        warning/error message has been sent to the user.
    """
    if len(queryset) != 1:
        error = str(_("Select only one price agreement."))
        modeladmin.message_user(request, error, level=messages.WARNING)
        return
    price_agreement = queryset[0]
    # the document is generated in a directory removed when the block exits,
    # so its content is loaded in memory before the response is built
    with tempfile.TemporaryDirectory(prefix="ishtar-prices") as tmpdir:
        pa_doc = price_agreement.generate_summary_document(tmpdir)
        if not pa_doc:
            modeladmin.message_user(
                request,
                str(_("Document not generated: is the LibreOffice daemon configured and running?")),
                level=messages.ERROR,
            )
            return
        with open(pa_doc, "rb") as fle:
            in_memory = BytesIO(fle.read())
        filename = os.path.basename(pa_doc).replace(" ", "_")
        response = HttpResponse(
            content_type="application/vnd.oasis.opendocument.spreadsheet"
        )
        response["Content-Disposition"] = f"attachment; filename={filename}"
        response.write(in_memory.read())
        return response


export_prices.short_description = _("Export prices")


class PriceAgreementAdmin(CopyPriceAgreementAdmin):
    """Admin for ``models.PriceAgreement`` with copy view and price export."""

    list_filter = ("available",)
    extra_list_display = [
        "start_date",
        "end_date",
    ]
    actions = [export_prices]
    model = models.PriceAgreement


admin_site.register(models.PriceAgreement, PriceAgreementAdmin)