#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2016-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import datetime import lxml.etree import lxml.builder import os import shutil import tempfile import zipfile from django.conf import settings from django.contrib.gis.db import models from django.contrib.postgres.indexes import GinIndex from django.db.models import Max, Q from django.db.models.signals import post_save, post_delete, pre_delete, m2m_changed from django.template.defaultfilters import slugify from django.utils import timezone from django.urls import reverse from ishtar_common.utils import gettext_lazy as _, pgettext_lazy from archaeological_finds.models_finds import Find, FindBasket, TreatmentType, \ FindTreatment from archaeological_operations.models import ClosedItem, Operation from archaeological_context_records.models import Dating from archaeological_warehouse.models import Warehouse, Container from ishtar_common.model_managers import ExternalIdManager from ishtar_common.models import ( BaseHistorizedItem, DashboardFormItem, Document, document_attached_changed, DocumentItem, GeneralType, HistoryModel, ImageModel, MainItem, OrderedType, Organization, OwnPerms, Person, post_save_cache, SearchVectorConfig, ValueGetter, ) from ishtar_common.models_common import CompleteIdentifierItem, HistoricalRecords from ishtar_common.utils import ( cached_label_changed, get_current_year, m2m_historization_changed, SearchAltName, update_data, ) class TreatmentState(GeneralType): executed = models.BooleanField(_("Treatment is executed"), default=False) order = models.IntegerField(verbose_name=_("Order"), default=10) class Meta: verbose_name = _("Treatment state type - deprecated") verbose_name_plural = _("Treatment state types - deprecated") ordering = ( "order", "label", ) ADMIN_SECTION = _("Treatments") post_save.connect(post_save_cache, sender=TreatmentState) post_delete.connect(post_save_cache, sender=TreatmentState) class TreatmentInputStatus(OrderedType): executed = models.BooleanField(_("Treatment is executed"), default=False) class Meta: verbose_name = _("Treatment - Input status type") verbose_name_plural = _("Treatment - Input status types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Treatments") @classmethod def get_default(cls): q = cls.objects.filter(executed=False) if not q.count(): return None return q.all()[0].pk @classmethod def get_validated_state(cls): treat_state, __ = cls.objects.get_or_create( txt_idx='validated', defaults={ 'label': _("Validated"), 'executed': True, 'available': True}) return treat_state post_save.connect(post_save_cache, sender=TreatmentInputStatus) post_delete.connect(post_save_cache, sender=TreatmentInputStatus) class TreatmentStatus(OrderedType): class Meta: verbose_name = _("Treatment - Treatment status type") verbose_name_plural = _("Treatment - Treatment status types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Treatments") @classmethod def get_completed_state(cls): treat_state, __ = cls.objects.get_or_create( txt_idx='completed', defaults={ 'label': _("Completed"), 'available': True}) return treat_state post_save.connect(post_save_cache, sender=TreatmentStatus) post_delete.connect(post_save_cache, sender=TreatmentStatus) class AssociatedFindBasket: @property def associated_basket(self): if not self.associated_basket_id: return try: return FindBasket.objects.get(pk=self.associated_basket_id) except FindBasket.DoesNotExist: return def pre_save_basket(self): associated_basket = self.associated_basket if self.associated_basket_id and not associated_basket: self.associated_basket_id = None elif associated_basket and associated_basket.pk != self.associated_basket_id: self.associated_basket_id = associated_basket.pk @classmethod def has_q_associated_basket_item_ids(cls): # used by permission queries return cls.objects.filter( associated_basket_id__in=FindBasket.objects.filter( items__pk__isnull=False).values_list("pk", flat=True) ) @classmethod def q_associated_basket_item_ids(cls, ids): # used by permission queries return cls.objects.filter( associated_basket_id__in=FindBasket.objects.filter( items__pk__in=ids ) ) def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = {} basket_key = prefix + "associated_basket_items" if filtr and basket_key not in filtr: return values basket = self.associated_basket if not basket: return values values[basket_key] = [ item.get_values(no_values=True, filtr=filtr, **kwargs) for item in basket.items.distinct().all() ] return values class Treatment( DashboardFormItem, AssociatedFindBasket, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, ImageModel, OwnPerms, ValueGetter, MainItem, ): SLUG = "treatment" APP = "archaeological-finds" MODEL = SLUG SHOW_URL = "show-treatment" TABLE_COLS = ( "year", "index", "treatment_types__label", "treatment_status__label", "label", "scientific_monitoring_manager__cached_label", "person__cached_label", "start_date", "downstream_cached_label", "upstream_cached_label", ) REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] EXTRA_REQUEST_KEYS = { "downstream_cached_label": "downstream__cached_label", "upstream_cached_label": "upstream__cached_label", "person__cached_label": "person__cached_label", "scientific_monitoring_manager__cached_label": "scientific_monitoring_manager__cached_label", "person__pk": "person__pk", # used by dynamic_table_documents } COL_LABELS = { "downstream_cached_label": _("Downstream find"), "upstream_cached_label": _("Upstream find"), "treatment_types__label": _("Type"), "treatment_status__label": _("State"), "person__cached_label": _("Responsible of the treatment"), "scientific_monitoring_manager__cached_label": _( "Scientific monitoring manager" ), } # extra keys than can be passed to save method EXTRA_SAVED_KEYS = ( "items", "user", "resulting_find", "upstream_items", "resulting_finds", "upstream_item", "treatment_type_list", ) # alternative names of fields for searches ALT_NAMES = { "label": SearchAltName( pgettext_lazy("key for text search", "label"), "label__iexact" ), "reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "reference__iexact", ), "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "index": SearchAltName(pgettext_lazy("key for text search", "index"), "index"), "treatment_types": SearchAltName( pgettext_lazy("key for text search", "type"), "treatment_types__label__iexact", ), "scientific_monitoring_manager": SearchAltName( pgettext_lazy("key for text search", "scientific-manager"), "scientific_monitoring_manager__cached_label__iexact", related_name="scientific_monitoring_manager" ), "input_status": SearchAltName( pgettext_lazy("key for text search", "input-status"), "input_status__label__iexact", ), "treatment_status": SearchAltName( pgettext_lazy("key for text search", "treatment-status"), "treatment_status__label__iexact", ), "location": SearchAltName( pgettext_lazy("key for text search", "location"), "location__name__iexact", ), "person": SearchAltName( pgettext_lazy("key for text search", "responsible-of-treatment"), "person__cached_label__iexact", ), "organization": SearchAltName( pgettext_lazy("key for text search", "organization"), "organization__name__iexact", ), "start_date": SearchAltName( pgettext_lazy("key for text search", "start"), "start_date", ), "end_date": SearchAltName( pgettext_lazy("key for text search", "end"), "end_date", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + [ "start_date", "end_date", "creation_date" ] DATETIME_FIELDS = BaseHistorizedItem.DATETIME_FIELDS + ["creation_date"] DEFAULT_SEARCH_FORM = ("archaeological_finds.forms_treatments", "TreatmentSelect") HISTORICAL_M2M = [ "treatment_types", ] NUMBER_FIELDS = ["year", "index"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("treatment_types__label"), SearchVectorConfig("treatment_status__label"), SearchVectorConfig("label"), SearchVectorConfig("goal", "local"), SearchVectorConfig("external_id"), SearchVectorConfig("comment", "local"), SearchVectorConfig("description", "local"), SearchVectorConfig("reference"), ] PROPERTY_SEARCH_VECTORS = [ SearchVectorConfig("year_index"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("year"), SearchVectorConfig("index"), ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("downstream__cached_label"), SearchVectorConfig("upstream__cached_label"), ] PARENT_SEARCH_VECTORS = ["person", "organization"] GET_VALUES_M2M = ["treatment_types"] UPPER_PERMISSIONS = [ (("archaeological_finds", "treatmentfile"), "file_id"), (("archaeological_finds", "find"), "downstream__pk"), (("archaeological_finds", "find"), "upstream__pk"), (("archaeological_finds", "find"), "finds__pk"), ] objects = ExternalIdManager() label = models.CharField(_("Label"), blank=True, null=True, max_length=200) reference = models.CharField( _("Reference"), blank=True, null=True, max_length=200 ) year = models.IntegerField(_("Year"), default=get_current_year) index = models.IntegerField(_("Index"), default=1) file = models.ForeignKey( "TreatmentFile", related_name="treatments", blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Associated request"), ) treatment_types = models.ManyToManyField( TreatmentType, verbose_name=_("Treatment type") ) treatment_state = models.ForeignKey( TreatmentState, verbose_name=_("State - deprecated use input_status and treatment_status"), blank=True, null=True, on_delete=models.PROTECT, ) input_status = models.ForeignKey( TreatmentInputStatus, verbose_name=_("Input status"), default=TreatmentInputStatus.get_default, blank=True, null=True, on_delete=models.SET_NULL, ) treatment_status = models.ForeignKey( TreatmentStatus, verbose_name=_("Treatment status"), blank=True, null=True, on_delete=models.SET_NULL, ) executed = models.BooleanField(_("Treatment have been executed"), default=False) location = models.ForeignKey( Warehouse, verbose_name=_("Location"), blank=True, null=True, on_delete=models.SET_NULL, help_text=_( "Location where the treatment is done. Target warehouse for " "a move." ), ) person = models.ForeignKey( Person, verbose_name=_("Responsible of the treatment"), blank=True, null=True, on_delete=models.SET_NULL, related_name="treatments", ) scientific_monitoring_manager = models.ForeignKey( Person, verbose_name=_("Scientific monitoring manager"), blank=True, null=True, on_delete=models.SET_NULL, related_name="manage_treatments", ) organization = models.ForeignKey( Organization, verbose_name=_("Organization"), blank=True, null=True, on_delete=models.SET_NULL, related_name="treatments", ) external_id = models.CharField( _("External ID"), blank=True, null=True, max_length=200 ) comment = models.TextField(_("Comment"), blank=True, default="") description = models.TextField(_("Description"), blank=True, default="") goal = models.TextField(_("Goal"), blank=True, default="") start_date = models.DateField(_("Start date"), blank=True, null=True) end_date = models.DateField(_("End date"), blank=True, null=True) creation_date = models.DateTimeField(default=timezone.now) container = models.ForeignKey( Container, verbose_name=_("Container"), on_delete=models.SET_NULL, blank=True, null=True, ) estimated_cost = models.FloatField(_("Estimated cost"), blank=True, null=True) quoted_cost = models.FloatField(_("Quoted cost"), blank=True, null=True) realized_cost = models.FloatField(_("Realized cost"), blank=True, null=True) insurance_cost = models.FloatField(_("Insurance cost"), blank=True, null=True) documents = models.ManyToManyField( Document, related_name="treatments", verbose_name=_("Documents"), blank=True ) main_image = models.ForeignKey( Document, related_name="main_image_treatments", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) # prevent circular imports... associated_basket_id = models.PositiveIntegerField( verbose_name=_("Basket ID"), blank=True, null=True ) # associated_basket = models.ForeignKey(FindBasket) history = HistoricalRecords(bases=[HistoryModel]) class Meta: verbose_name = _("Treatment") verbose_name_plural = _("Treatments") unique_together = ("year", "index") permissions = ( ("view_own_treatment", "Can view own Treatment"), ("change_own_treatment", "Can change own Treatment"), ("delete_own_treatment", "Can delete own Treatment"), ) ordering = ("-year", "-index", "-start_date") indexes = [ GinIndex(fields=["data"]), ] ADMIN_SECTION = _("Treatments") def __str__(self): return self.cached_label or "" @property def short_class_name(self): return _("TREATMENT") @property def limited_finds(self): return self.finds.all()[:15] def natural_key(self): return (self.external_id,) @property def year_index(self): return "{}-{}".format(self.year, self.index) @classmethod def get_query_owns(cls, ishtaruser): return ( Q(history_creator=ishtaruser.user_ptr) | Q(person__ishtaruser=ishtaruser) ) & Q(end_date__isnull=True) @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None, no_auth_check=False, query=False ): replace_query = None if menu_filtr: if "treatmentfile" in menu_filtr: replace_query = Q(file=menu_filtr["treatmentfile"]) if "find" in menu_filtr and "basket" not in str(menu_filtr["find"]): q = Q(upstream=menu_filtr["find"]) | Q(downstream=menu_filtr["find"]) replace_query = replace_query | q if replace_query else q owns = super(Treatment, cls).get_owns( user, replace_query=replace_query, limit=limit, values=values, get_short_menu_class=get_short_menu_class, no_auth_check=no_auth_check, query=query ) if query: return owns return cls._return_get_owns(owns, values, get_short_menu_class) def get_query_operations(self): return Operation.objects.filter( context_record__base_finds__find__downstream_treatment=self ) def _generate_cached_label(self): label = self._profile_generate_cached_label() if label: return label items = [ str(getattr(self, k)) for k in ["year", "index", "reference", "label"] if getattr(self, k) ] return "{} | {}".format("-".join(items), self.treatment_types_lbl()) def _get_base_image_path( self, ): return "{}/{}/{}".format(self.SLUG, self.year, self.index) def treatment_types_lbl(self): """ Treatment types label :return: string """ return " ; ".join(str(t) for t in self.treatment_types.all()) treatment_types_lbl.short_description = _("Treatment types") treatment_types_lbl.admin_order_field = "treatment_types__label" def downstream_lbl(self): """ Downstream finds label :return: string """ return " ; ".join(f.cached_label for f in self.downstream.all()) downstream_lbl.short_description = _("Downstream finds") downstream_lbl.admin_order_field = "downstream__cached_label" def upstream_lbl(self): """ Upstream finds label :return: string """ return " ; ".join(f.cached_label for f in self.upstream.all()) upstream_lbl.short_description = _("Upstream finds") upstream_lbl.admin_order_field = "upstream__cached_label" def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(Treatment, self).get_extra_actions(request) if self.can_do(request, "archaeological_operations.add_administrativeact"): actions += [ ( reverse("treatment-add-adminact", args=[self.pk]), _("Add associated administrative act"), "fa fa-plus", _("admin. act"), "", False, ), ] return actions def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super().get_extra_values( prefix=prefix, no_values=False, filtr=None, **kwargs ) if not filtr or prefix + "upstream_finds" in filtr: values[prefix + "upstream_finds"] = " ; ".join( str(up) for up in self.upstream.all() ) if not filtr or prefix + "downstream_finds" in filtr: values[prefix + "downstream_finds"] = " ; ".join( str(up) for up in self.downstream.all() ) if not filtr or prefix + "operations" in filtr: values[prefix + "operations"] = " ; ".join( str(ope) for ope in self.get_query_operations().all() ) if "associatedfind_" not in prefix and self.upstream.count(): find = self.upstream.all()[0] new_prefix = prefix + "associatedfind_" values.update( find.get_values( prefix=new_prefix, no_values=True, filtr=filtr, **kwargs ) ) return values def pre_save(self): super(Treatment, self).pre_save() # is not new if self.pk is not None: return self.index = 1 q = Treatment.objects.filter(year=self.year) if q.count(): self.index = q.all().aggregate(Max("index"))["index__max"] + 1 def _create_n_1_resulting_find( self, resulting_find, upstream_items, treatment_types ): """ Manage creation of n<->1 treatment """ m2m = {} base_fields = [f.name for f in Find._meta.get_fields()] for k in list(resulting_find.keys()): # if not in base fields should be a m2m if k not in base_fields: values = resulting_find.pop(k) if values: m2m[k + "s"] = values resulting_find["history_modifier"] = self.history_modifier new_find = Find.objects.create(**resulting_find) for k, v in m2m.items(): m2m_field = getattr(new_find, k) try: for value in m2m[k]: m2m_field.add(value) except TypeError: m2m_field.add(v) create_new_find = bool([tp for tp in treatment_types if tp.create_new_find]) current_base_finds = [] current_documents = [] for upstream_item in upstream_items: # datings are not explicitly part of the resulting_find # need to reassociate with no duplicate for dating in upstream_item.datings.all(): is_present = any( Dating.is_identical(current_dating, dating) for current_dating in new_find.datings.all() ) if is_present: continue dating.pk = None # duplicate dating.save() new_find.datings.add(dating) # associate base finds for base_find in upstream_item.base_finds.all(): if base_find.pk in current_base_finds: continue current_base_finds.append(base_find.pk) new_find.base_finds.add(base_find) # documents for document in upstream_item.documents.all(): if document.pk in current_documents: continue current_documents.append(document.pk) new_find.documents.add(document) # data new_find.data = update_data(new_find.data, upstream_item.data, merge=True) if create_new_find: upstream_item.downstream_treatment = self upstream_item.history_modifier = self.history_modifier upstream_item.save() else: self.finds.add(upstream_item) new_find.upstream_treatment = self new_find.skip_history_when_saving = True new_find.save() def _create_1_n_resulting_find( self, resulting_finds, upstream_item, user, treatment_types ): """ Manage creation of 1<->n treatment """ new_items = [] start_number = resulting_finds["start_number"] for idx in range(resulting_finds["number"]): label = resulting_finds["label"] + str(start_number + idx) new_find = Find.objects.get(pk=upstream_item.pk).duplicate(user) new_find.upstream_treatment = self new_find.label = label new_find.skip_history_when_saving = True new_find.save() new_items.append(new_find) create_new_find = bool([tp for tp in treatment_types if tp.create_new_find]) if create_new_find: upstream_item.downstream_treatment = self upstream_item.skip_history_when_saving = True upstream_item.save() else: self.finds.add(upstream_item) if getattr(user, "ishtaruser", None): b = FindBasket.objects.create( label=resulting_finds["basket_name"], user=user.ishtaruser ) for item in new_items: b.items.add(item) @property def associated_filename(self): return "-".join( str(slugify(getattr(self, attr))) for attr in ("year", "index", "label") ) def get_find_treatment_list(self): return FindTreatment.objects.filter(treatment=self).all() def clean_cache(self): self._saved_container_attributes, self._is_loan_return, self._is_new_find_creator = None, None, None self._is_current_container_changer, self._is_reference_container_changer = None, None @property def is_loan(self): if getattr(self, "_is_loan", None) is None: self._is_loan = any( 1 for tp in self.treatment_types.all() if not tp.change_reference_location and tp.change_current_location ) return self._is_loan @property def is_loan_return(self): if getattr(self, "_is_loan_return", None) is None: self._is_loan_return = any( 1 for tp in self.treatment_types.all() if tp.restore_reference_location ) return self._is_loan_return @property def is_new_find_creator(self): if getattr(self, "_is_new_find_creator", None) is None: self._is_new_find_creator = any( 1 for tp in self.treatment_types.all() if tp.create_new_find ) return self._is_new_find_creator @property def is_current_container_changer(self): if getattr(self, "_is_current_container_changer", None) is None: self._is_current_container_changer = self.treatment_types.filter( change_current_location=True ).exists() return self._is_current_container_changer @property def is_reference_container_changer(self): if getattr(self, "_is_reference_container_changer", None) is None: self._is_reference_container_changer = self.treatment_types.filter( change_reference_location=True).exists() return self._is_reference_container_changer def save(self, *args, **kwargs): self.pre_save_basket() items, user, extra_args_for_new, resulting_find = [], None, [], None upstream_items, upstream_item, resulting_finds = [], None, None treatment_types, return_new = [], False if "items" in kwargs: items = kwargs.pop("items") if "resulting_find" in kwargs: resulting_find = kwargs.pop("resulting_find") if "resulting_finds" in kwargs: resulting_finds = kwargs.pop("resulting_finds") if "upstream_items" in kwargs: upstream_items = kwargs.pop("upstream_items") if "upstream_item" in kwargs: upstream_item = kwargs.pop("upstream_item") if "user" in kwargs: user = kwargs.pop("user") if "extra_args_for_new" in kwargs: extra_args_for_new = kwargs.pop("extra_args_for_new") if "treatment_type_list" in kwargs: treatment_types = kwargs.pop("treatment_type_list") if "return_new" in kwargs: return_new = kwargs.pop("return_new") self.pre_save() super().save(*args, **kwargs) # reinit cached values self.clean_cache() to_be_executed = not self.executed and self.input_status.executed updated = [] # baskets if hasattr(items, "items"): items = items.items.all() if hasattr(upstream_items, "items"): upstream_items = upstream_items.items.all() if not items and self.finds.count(): items = list(self.finds.all()) if not treatment_types and self.treatment_types.count(): treatment_types = list(self.treatment_types.all()) # execute the treatment if upstream_items and resulting_find: if not to_be_executed: # should not happen but bad validation check... return self._create_n_1_resulting_find( resulting_find, upstream_items, treatment_types ) self.executed = True self.save() return if upstream_item and resulting_finds: if not to_be_executed: # should not happen but bad validation check... return self._create_1_n_resulting_find( resulting_finds, upstream_item, self.history_modifier, treatment_types ) self.executed = True self.save() return create_new_find = self.is_new_find_creator new_items = [] for item in items: if not create_new_find or not to_be_executed: self.finds.add(item) else: self.finds.clear() new = item.duplicate(user) item.downstream_treatment = self item.save() new.upstream_treatment = self for k in extra_args_for_new: setattr(new, k, extra_args_for_new[k]) new.save() updated.append(new.pk) new_items.append(new) # update baskets for basket in FindBasket.objects.filter(items__pk=item.pk).all(): basket.items.remove(item) basket.items.add(new) if to_be_executed or self.executed: self.verify_find_container_history() if not to_be_executed: if return_new: return new_items return # manage loan return if self.loan_return(updated=updated): self.executed = True self.save() # manage containers elif self.container and self.move_finds_to_new_container(updated=updated): self.executed = True self.save() if return_new: return new_items return @property def saved_container_attributes(self): """ Return container attribute to change depending on treatment types """ if getattr(self, "_saved_container_attributes", None) is None: container_attrs = [] if self.is_current_container_changer: container_attrs.append("container") if self.is_reference_container_changer: container_attrs.append("container_ref") self._saved_container_attributes = container_attrs return self._saved_container_attributes def verify_find_container_history(self): container_attrs = self.saved_container_attributes if (not self.container or not container_attrs) and not self.is_loan_return: return False q = self._get_finds_query_for_treatments() if not q.exists(): return False for find in q.all(): q2 = FindTreatment.objects.filter(find=find, treatment=self) for find_treatment in q2.all(): find_treatment.generate_full_location() def _get_finds_query_for_treatments(self): if self.is_new_find_creator: return Find.objects.filter(upstream_treatment=self) return Find.objects.filter(treatments=self) def loan_return(self, updated=None): """ Manage loan return - change find location using treatment info :param treatment_types: if not provided re-fetched from database :param updated: list of already updated finds :return: True if container changed """ if not updated: updated = [] if not self.is_loan_return: return False q = self._get_finds_query_for_treatments() for find in q.all(): if find.container_ref and find.container != find.container_ref: find.container = find.container_ref if find.pk in updated: # don't record twice history find.skip_history_when_saving = True else: updated.append(find.pk) find.save() return True def move_finds_to_new_container(self, container_attrs=None, updated=None): """ Change find location using treatment info :param updated: list of already updated finds to update :return: True if container changed """ if not updated: updated = [] if not container_attrs: container_attrs = self.saved_container_attributes if not container_attrs: return False q = self._get_finds_query_for_treatments() for find in q.all(): for container_attr in container_attrs: setattr(find, container_attr, self.container) if find.pk in updated: # don't record twice history find.skip_history_when_saving = True else: updated.append(find.pk) find.save() return True post_save.connect(cached_label_changed, sender=Treatment) def pre_delete_treatment(sender, **kwargs): treatment = kwargs.get("instance") for find in Find.objects.filter(upstream_treatment=treatment).all(): if find.downstream_treatment: # a new treatment have be done since the deleted treatment # TODO ! # raise NotImplemented() pass find.delete() for find in Find.objects.filter(downstream_treatment=treatment).all(): find.downstream_treatment = None find.save() pre_delete.connect(pre_delete_treatment, sender=Treatment) m2m_changed.connect(document_attached_changed, sender=Treatment.documents.through) for attr in Treatment.HISTORICAL_M2M: m2m_changed.connect( m2m_historization_changed, sender=getattr(Treatment, attr).through ) class AbsFindTreatments(models.Model): find = models.ForeignKey( Find, verbose_name=_("Find"), related_name="%(class)s_related", on_delete=models.DO_NOTHING, ) treatment = models.OneToOneField( Treatment, verbose_name=_("Treatment"), primary_key=True, on_delete=models.DO_NOTHING, ) # primary_key is set to prevent django to ask for an id column # treatment is not a real primary key treatment_nb = models.IntegerField(_("Order")) TABLE_COLS = ["treatment__" + col for col in Treatment.TABLE_COLS] + [ "treatment_nb" ] COL_LABELS = { "treatment__treatment_type": _("Treatment type"), "treatment__start_date": _("Start date"), "treatment__end_date": _("End date"), "treatment__location": _("Location"), "treatment__container": _("Container"), "treatment__person": _("Doer"), "treatment__upstream": _("Related finds"), "treatment__downstream": _("Related finds"), } class Meta: abstract = True def __str__(self): return "{} - {} [{}]".format(self.find, self.treatment, self.treatment_nb) class FindNonModifTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_nonmodif_treatments_tree AS WITH RECURSIVE rel_tree AS ( SELECT f.id AS find_id, of.id AS old_find_id, f.downstream_treatment_id, f.upstream_treatment_id, 1 AS level, ARRAY[]::integer[] AS path_info FROM archaeological_finds_find f INNER JOIN archaeological_finds_find of ON of.downstream_treatment_id = f.upstream_treatment_id WHERE f.downstream_treatment_id is NULL AND f.upstream_treatment_id is NOT NULL UNION ALL SELECT c.id AS find_id, p.old_find_id, c.downstream_treatment_id, c.upstream_treatment_id, p.level + 1, p.path_info||c.downstream_treatment_id FROM archaeological_finds_find c JOIN rel_tree p ON c.downstream_treatment_id = p.upstream_treatment_id AND (p.path_info = ARRAY[]::integer[] OR NOT (c.downstream_treatment_id = ANY(p.path_info[0:array_upper(p.path_info, 1)-1])) ) ) SELECT DISTINCT find_id, old_find_id, path_info, level FROM rel_tree UNION ALL SELECT id AS find_id, id AS old_find_id, ARRAY[]::integer[] AS path_info, 0 AS level FROM archaeological_finds_find f WHERE f.downstream_treatment_id is NULL ORDER BY find_id; CREATE VIEW find_nonmodif_treatments AS SELECT DISTINCT y.find_id, y.old_find_id, ft.treatment_id as treatment_id, 1 AS treatment_nb FROM (SELECT * FROM find_nonmodif_treatments_tree) y INNER JOIN archaeological_finds_find_treatments ft ON ft.find_id = y.old_find_id ORDER BY y.find_id, ft.treatment_id; -- deactivate deletion CREATE RULE find_nonmodif_treatments_del AS ON DELETE TO find_nonmodif_treatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_nonmodif_treatments; DROP VIEW IF EXISTS find_nonmodif_treatments_tree; """ TABLE_COLS = [ "treatment__treatment_type", "treatment__upstream", "treatment__start_date", "treatment__end_date", "treatment__location", "treatment__container", "treatment__person", "treatment_nb", ] # search parameters EXTRA_REQUEST_KEYS = {"find_id": "find_id"} class Meta: managed = False db_table = "find_nonmodif_treatments" unique_together = ("find", "treatment") ordering = ("find", "-treatment_nb") class FindUpstreamTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_uptreatments_tree AS WITH RECURSIVE rel_tree AS ( SELECT id AS find_id, upstream_treatment_id, downstream_treatment_id, 1 AS level, ARRAY[]::integer[] AS path_info FROM archaeological_finds_find WHERE upstream_treatment_id is NULL AND downstream_treatment_id is NOT NULL UNION ALL SELECT c.id AS find_id, c.upstream_treatment_id, c.downstream_treatment_id, p.level + 1, p.path_info||c.upstream_treatment_id FROM archaeological_finds_find c JOIN rel_tree p ON c.upstream_treatment_id = p.downstream_treatment_id AND (p.path_info = ARRAY[]::integer[] OR NOT (c.upstream_treatment_id = ANY(p.path_info[0:array_upper(p.path_info, 1)-1])) ) ) SELECT DISTINCT find_id, path_info, level FROM rel_tree ORDER BY find_id; CREATE VIEW find_uptreatments AS SELECT DISTINCT find_id, path_info[nb] AS treatment_id, level - nb + 1 AS treatment_nb FROM (SELECT *, generate_subscripts(path_info, 1) AS nb FROM find_uptreatments_tree) y WHERE path_info[nb] is not NULL ORDER BY find_id, treatment_id; -- deactivate deletion CREATE RULE find_uptreatments_del AS ON DELETE TO find_uptreatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_uptreatments; DROP VIEW IF EXISTS find_uptreatments_tree; """ TABLE_COLS = [ "treatment__treatment_type", "treatment__upstream", "treatment__start_date", "treatment__end_date", "treatment__location", "treatment__container", "treatment__person", "treatment_nb", ] # search parameters EXTRA_REQUEST_KEYS = {"find_id": "find_id"} class Meta: managed = False db_table = "find_uptreatments" unique_together = ("find", "treatment") ordering = ("find", "-treatment_nb") class FindDownstreamTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_downtreatments_tree AS WITH RECURSIVE rel_tree AS ( SELECT id AS find_id, downstream_treatment_id, upstream_treatment_id, 1 AS level, ARRAY[]::integer[] AS path_info FROM archaeological_finds_find WHERE downstream_treatment_id is NULL AND upstream_treatment_id is NOT NULL UNION ALL SELECT c.id AS find_id, c.downstream_treatment_id, c.upstream_treatment_id, p.level + 1, p.path_info||c.downstream_treatment_id FROM archaeological_finds_find c JOIN rel_tree p ON c.downstream_treatment_id = p.upstream_treatment_id AND (p.path_info = ARRAY[]::integer[] OR NOT (c.downstream_treatment_id = ANY(p.path_info[0:array_upper(p.path_info, 1)-1])) ) ) SELECT DISTINCT find_id, path_info, level FROM rel_tree ORDER BY find_id; CREATE VIEW find_downtreatments AS SELECT DISTINCT find_id, path_info[nb] AS treatment_id, level - nb + 1 AS treatment_nb FROM (SELECT *, generate_subscripts(path_info, 1) AS nb FROM find_downtreatments_tree) y WHERE path_info[nb] is not NULL ORDER BY find_id, treatment_id; -- deactivate deletion CREATE RULE find_downtreatments_del AS ON DELETE TO find_downtreatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_downtreatments; DROP VIEW IF EXISTS find_downtreatments_tree; """ TABLE_COLS = [ "treatment__treatment_type", "treatment__downstream", "treatment__start_date", "treatment__end_date", "treatment__location", "treatment__container", "treatment__person", "treatment_nb", ] # search parameters EXTRA_REQUEST_KEYS = {"find_id": "find_id"} class Meta: managed = False db_table = "find_downtreatments" unique_together = ("find", "treatment") ordering = ("find", "-treatment_nb") class FindTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_treatments AS SELECT find_id, treatment_id, treatment_nb, TRUE as upstream FROM find_uptreatments UNION SELECT find_id, treatment_id, treatment_nb, FALSE as upstream FROM find_downtreatments ORDER BY find_id, treatment_id, upstream; -- deactivate deletion CREATE RULE find_treatments_del AS ON DELETE TO find_treatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_treatments; """ upstream = models.BooleanField(_("Is upstream")) class Meta: managed = False db_table = "find_treatments" unique_together = ("find", "treatment") ordering = ("find", "upstream", "-treatment_nb") class TreatmentFileType(GeneralType): treatment_type = models.ForeignKey( TreatmentType, blank=True, null=True, on_delete=models.SET_NULL ) is_exhibition = models.BooleanField(_("Is an exhibition"), default=False) class Meta: verbose_name = _("Treatment request type") verbose_name_plural = _("Treatment request types") ordering = ("label",) ADMIN_SECTION = _("Treatments") post_save.connect(post_save_cache, sender=TreatmentFileType) post_delete.connect(post_save_cache, sender=TreatmentFileType) class TreatmentFile( DashboardFormItem, AssociatedFindBasket, ClosedItem, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, OwnPerms, ValueGetter, MainItem, ): SLUG = "treatmentfile" APP = "archaeological-finds" MODEL = SLUG SHOW_URL = "show-treatmentfile" DELETE_URL = "delete-treatmentfile" TABLE_COLS = ["type", "year", "index", "internal_reference", "name"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("type__label"), SearchVectorConfig("internal_reference"), SearchVectorConfig("name"), SearchVectorConfig("comment", "local"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("year"), SearchVectorConfig("index"), ] PARENT_SEARCH_VECTORS = ["in_charge", "applicant", "applicant_organisation"] NUMBER_FIELDS = ["year", "index"] DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + [ "end_date", "exhibition_start_date", "exhibition_end_date", ] EXTRA_REQUEST_KEYS = { "in_charge__pk": "in_charge__pk", # used by dynamic_table_documents "applicant__pk": "applicant__pk", # used by dynamic_table_documents } REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] UPPER_PERMISSIONS = [ (("archaeological_finds", "find"), "q_associated_basket_item_ids"), ] # alternative names of fields for searches ALT_NAMES = { "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "internal_reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "internal_reference__iexact", ), "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "index": SearchAltName(pgettext_lazy("key for text search", "index"), "index"), "type": SearchAltName( pgettext_lazy("key for text search", "type"), "type__label__iexact" ), "in_charge": SearchAltName( pgettext_lazy("key for text search", "in-charge"), "in_charge__cached_label__iexact", ), "applicant": SearchAltName( pgettext_lazy("key for text search", "applicant"), "applicant__cached_label__iexact", related_name="applicant" ), "applicant_organisation": SearchAltName( pgettext_lazy("key for text search", "applicant-organisation"), "applicant_organisation__cached_label__iexact", related_name="applicant_organisation" ), "end_date": SearchAltName( pgettext_lazy("key for text search", "end-date"), "end_date", ), "exhibition_start": SearchAltName( pgettext_lazy("key for text search", "exhibition-start"), "exhibition_start_date", ), "exhibition_end": SearchAltName( pgettext_lazy("key for text search", "exhibition-end"), "exhibition_end_date", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) SHEET_EMPTY_KEYS = ["name"] DEFAULT_SEARCH_FORM = ("archaeological_finds.forms_treatments", "TreatmentFileSelect") # fields year = models.IntegerField(_("Year"), default=get_current_year) index = models.IntegerField(_("Index"), default=1) internal_reference = models.CharField( _("Internal reference"), blank=True, null=True, max_length=200 ) external_id = models.CharField( _("Reference"), blank=True, null=True, max_length=200 ) name = models.TextField(_("Name"), blank=True, default="") type = models.ForeignKey( TreatmentFileType, verbose_name=_("Treatment request type"), on_delete=models.PROTECT, ) in_charge = models.ForeignKey( Person, related_name="treatmentfile_responsability", verbose_name=_("Person in charge"), on_delete=models.SET_NULL, blank=True, null=True, ) applicant = models.ForeignKey( Person, related_name="treatmentfile_applicant", verbose_name=_("Applicant"), on_delete=models.SET_NULL, blank=True, null=True, ) applicant_organisation = models.ForeignKey( Organization, related_name="treatmentfile_applicant", verbose_name=_("Applicant organisation"), on_delete=models.SET_NULL, blank=True, null=True, ) end_date = models.DateField(_("End date"), null=True, blank=True) creation_date = models.DateField( _("Creation date"), default=datetime.date.today, blank=True, null=True ) reception_date = models.DateField(_("Reception date"), blank=True, null=True) # exhibition exhibition_name = models.TextField(_("Exhibition name"), blank=True, default="") exhibition_start_date = models.DateField( _("Exhibition start date"), blank=True, null=True ) exhibition_end_date = models.DateField( _("Exhibition end date"), blank=True, null=True ) exhibition_location = models.ForeignKey( Warehouse, verbose_name=_("Exhibition location"), blank=True, null=True, on_delete=models.SET_NULL, ) insurance_provider = models.ForeignKey( Organization, related_name="insurance_provider_of", verbose_name=_("Insurance provider"), on_delete=models.SET_NULL, blank=True, null=True, ) comment = models.TextField(_("Comment"), blank=True, default="") documents = models.ManyToManyField( Document, related_name="treatment_files", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( Document, related_name="main_image_treatment_files", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) # prevent circular imports... associated_basket_id = models.PositiveIntegerField( verbose_name=_("Basket ID"), blank=True, null=True ) # associated_basket = models.ForeignKey(FindBasket) history = HistoricalRecords() class Meta: verbose_name = _("Treatment request") verbose_name_plural = _("Treatment requests") unique_together = ("year", "index") permissions = ( ("view_own_treatmentfile", "Can view own Treatment request"), ("change_own_treatmentfile", "Can change own Treatment request"), ("delete_own_treatmentfile", "Can delete own Treatment request"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] ADMIN_SECTION = _("Treatments") def __str__(self): return self.cached_label or "" def natural_key(self): return (self.year, self.index) def is_gam_exportable(self): self.gam_errors = [] is_ok = True return is_ok def gam_export(self): filename = f"export_gam-{self.exhibition_start_date.year}-{slugify(self.name).replace('-', '_')}.zip" maker = lxml.builder.ElementMaker() document = maker.proposition( maker.action("CREATION"), maker.categorie("PRET"), ) content = b'\n' content += lxml.etree.tostring(document, pretty_print=True) gam_dir = os.path.join(settings.MEDIA_ROOT, "GAM") if not os.path.exists(gam_dir): os.mkdir(gam_dir) media_dir = os.path.join("GAM", str(datetime.date.today().year)) full_media_dir = os.path.join(settings.MEDIA_ROOT, media_dir) if not os.path.exists(full_media_dir): os.mkdir(full_media_dir) final_name = os.path.join(full_media_dir, filename) with tempfile.TemporaryDirectory() as tmp_dir_name: new_file = os.path.join(tmp_dir_name, filename) with zipfile.ZipFile( new_file, mode="a", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("pret.xml", content) shutil.move(new_file, final_name) return f"{settings.MEDIA_URL}{media_dir}/{filename}" @property def short_class_name(self): return _("Treatment request") @classmethod def class_verbose_name(cls): # TODO: should be unecessary if MainItem return cls._meta.verbose_name @classmethod def get_query_owns(cls, ishtaruser): return ( Q(history_creator=ishtaruser.user_ptr) | Q(in_charge__ishtaruser=ishtaruser) ) & Q(end_date__isnull=True) @property def associated_filename(self): return "-".join( str(slugify(getattr(self, attr))) for attr in ("year", "index", "internal_reference", "name") if getattr(self, attr) ) def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(TreatmentFile, self).get_extra_actions(request) if self.can_do(request, "archaeological_operations.add_administrativeact"): actions += [ ( reverse("treatmentfile-add-adminact", args=[self.pk]), _("Add associated administrative act"), "fa fa-plus", _("admin. act"), "", False, ), ] if not self.associated_basket: return actions if ( self.type.treatment_type and self.treatments.filter( treatment_types__pk=self.type.treatment_type.pk ).count() ): # a treatment of this type already exists return actions can_edit_find = self.can_do(request, "archaeological_finds.change_find") if can_edit_find: actions += [ ( reverse("treatmentfile-add-treatment", args=[self.pk]), _("Add associated treatment"), "fa fa-flask", "", "", False, ), ] return actions @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None, no_auth_check=False, query=False ): owns = super(TreatmentFile, cls).get_owns( user, limit=limit, values=values, get_short_menu_class=get_short_menu_class, no_auth_check=no_auth_check, query=query ) if query: return owns return cls._return_get_owns(owns, values, get_short_menu_class) def _generate_cached_label(self): label = self._profile_generate_cached_label() if label: return label items = [ str(getattr(self, k)) for k in ["year", "index", "internal_reference", "name"] if getattr(self, k) ] return settings.JOINT.join(items) def _get_base_image_path( self, ): return "{}/{}/{}".format(self.SLUG, self.year, self.index) def pre_save(self): # is not new if self.pk is not None: return self.index = 1 q = TreatmentFile.objects.filter(year=self.year) if q.count(): self.index = q.all().aggregate(Max("index"))["index__max"] + 1 def save(self, *args, **kwargs): self.pre_save() self.pre_save_basket() super().save(*args, **kwargs) m2m_changed.connect(document_attached_changed, sender=TreatmentFile.documents.through) post_save.connect(cached_label_changed, sender=TreatmentFile) class ExhibitionType(GeneralType): treatment_file_type = models.ForeignKey( TreatmentFileType, verbose_name=_("Treatment request type"), on_delete=models.PROTECT, ) class Meta: verbose_name = _("Exhibition type") verbose_name_plural = _("Exhibition types") ordering = ("label",) ADMIN_SECTION = _("Treatments") class Exhibition( DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, OwnPerms, ValueGetter, MainItem, AssociatedFindBasket, ): SLUG = "exhibition" APP = "archaeological-finds" MODEL = SLUG SHOW_URL = "show-exhibition" # DELETE_URL = "delete-exhibition" TABLE_COLS = ["year", "name", "reference"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("exhibition_type__label"), SearchVectorConfig("reference"), SearchVectorConfig("name"), SearchVectorConfig("comment", "local"), ] name = models.TextField(_("Name")) exhibition_type = models.ForeignKey( ExhibitionType, verbose_name=_("Exhibition type"), on_delete=models.PROTECT, ) year = models.IntegerField(_("Year"), default=get_current_year) reference = models.TextField( _("Reference"), blank=True, null=True, default="-" ) in_charge = models.ForeignKey( Person, related_name="exhibitions", verbose_name=_("Person in charge"), on_delete=models.SET_NULL, blank=True, null=True, ) # prevent circular imports... associated_basket_id = models.PositiveIntegerField( verbose_name=_("Basket ID"), blank=True, null=True, help_text=_("Reference basket") ) description = models.TextField(_("Description"), blank=True, default="") comment = models.TextField(_("Comment"), blank=True, default="") treatment_files = models.ManyToManyField( TreatmentFile, related_name="exhibitions", verbose_name=_("Loans"), blank=True, ) documents = models.ManyToManyField( Document, related_name="exhibitions", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( Document, related_name="main_image_exhibitions", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) timestamp_geo = None timestamp_label = None complete_identifier = None custom_index = None need_update = None cached_label = None history = HistoricalRecords() class Meta: verbose_name = _("Exhibition") verbose_name_plural = _("Exhibitions") unique_together = ("year", "name") permissions = ( ("view_own_exhibition", "Can view own Exhibition"), ("change_own_exhibition", "Can change own Exhibition"), ("delete_own_exhibition", "Can delete own Exhibition"), ) ordering = ("year", "name") indexes = [ GinIndex(fields=["data"]), ] ADMIN_SECTION = _("Treatments") def __str__(self): return f"{self.name or ''} [{self.year}]" def get_extra_actions(self, request): """ For sheet template: """ actions = super().get_extra_actions(request) can_add_tf = self.can_do( request, "archaeological_finds.add_treatmentfile" ) if can_add_tf: actions += [ ( reverse("exhibition-qa-add-loan", args=[self.pk]), _("Add exhibition loan"), "fa fa-plus", _("exhibition loan"), "", True, ), ] return actions def save(self, *args, **kwargs): self.pre_save_basket() super().save(*args, **kwargs)