#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2016-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import datetime from django.conf import settings from django.contrib.gis.db import models from django.contrib.postgres.indexes import GinIndex from django.db import transaction from django.db.models import Max, Q from django.db.models.signals import post_save, post_delete, pre_delete, m2m_changed from django.template.defaultfilters import slugify from django.urls import reverse from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, SheetItem from archaeological_finds.models_finds import Find, FindBasket, TreatmentType, FindTreatment from archaeological_operations.models import ClosedItem, Operation from archaeological_context_records.models import Dating from archaeological_warehouse.models import Warehouse, Container from ishtar_common.model_managers import ExternalIdManager from ishtar_common.models import ( Document, GeneralType, ImageModel, BaseHistorizedItem, OwnPerms, Person, Organization, ValueGetter, post_save_cache, ShortMenuItem, DashboardFormItem, document_attached_changed, MainItem, HistoryModel, SearchAltName, SearchVectorConfig, DocumentItem, ) from ishtar_common.models_common import CompleteIdentifierItem, HistoricalRecords from ishtar_common.utils import ( cached_label_changed, get_current_year, update_data, m2m_historization_changed, ) class TreatmentState(GeneralType): executed = models.BooleanField(_("Treatment is executed"), default=False) order = models.IntegerField(verbose_name=_("Order"), default=10) class Meta: verbose_name = _("Treatment state type") verbose_name_plural = _("Treatment state types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Treatments") @classmethod def get_default(cls): q = cls.objects.filter(executed=True) if not q.count(): return None return q.all()[0].pk @classmethod def get_completed_state(cls): treat_state, __ = cls.objects.get_or_create( txt_idx='completed', defaults={ 'label': _("Completed"), 'executed': True, 'available': True}) return treat_state post_save.connect(post_save_cache, sender=TreatmentState) post_delete.connect(post_save_cache, sender=TreatmentState) class Treatment( DashboardFormItem, ValueGetter, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, ImageModel, OwnPerms, ShortMenuItem, SheetItem, ): SLUG = "treatment" APP = "archaeological-finds" MODEL = SLUG SHOW_URL = "show-treatment" TABLE_COLS = ( "year", "index", "treatment_types__label", "treatment_state__label", "label", "scientific_monitoring_manager__cached_label", "person__cached_label", "start_date", "downstream_cached_label", "upstream_cached_label", ) REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] EXTRA_REQUEST_KEYS = { "downstream_cached_label": "downstream__cached_label", "upstream_cached_label": "upstream__cached_label", "person__cached_label": "person__cached_label", "scientific_monitoring_manager__cached_label": "scientific_monitoring_manager__cached_label", "person__pk": "person__pk", # used by dynamic_table_documents } COL_LABELS = { "downstream_cached_label": _("Downstream find"), "upstream_cached_label": _("Upstream find"), "treatment_types__label": _("Type"), "treatment_state__label": _("State"), "person__cached_label": _("Responsible"), "scientific_monitoring_manager__cached_label": _( "Scientific monitoring manager" ), } # extra keys than can be passed to save method EXTRA_SAVED_KEYS = ( "items", "user", "resulting_find", "upstream_items", "resulting_finds", "upstream_item", "treatment_type_list", ) # alternative names of fields for searches ALT_NAMES = { "label": SearchAltName( pgettext_lazy("key for text search", "label"), "label__iexact" ), "other_reference": SearchAltName( pgettext_lazy("key for text search", "other-reference"), "other_reference__iexact", ), "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "index": SearchAltName(pgettext_lazy("key for text search", "index"), "index"), "treatment_types": SearchAltName( pgettext_lazy("key for text search", "type"), "treatment_types__label__iexact", ), "scientific_monitoring_manager": SearchAltName( pgettext_lazy("key for text search", "scientific-manager"), "scientific_monitoring_manager__cached_label__iexact", related_name="scientific_monitoring_manager" ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + [ "start_date", "end_date", "creation_date" ] DATETIME_FIELDS = BaseHistorizedItem.DATETIME_FIELDS + ["creation_date"] DEFAULT_SEARCH_FORM = ("archaeological_finds.forms_treatments", "TreatmentSelect") HISTORICAL_M2M = [ "treatment_types", ] NUMBER_FIELDS = ["year", "index"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("treatment_types__label"), SearchVectorConfig("treatment_state__label"), SearchVectorConfig("label"), SearchVectorConfig("goal", "local"), SearchVectorConfig("external_id"), SearchVectorConfig("comment", "local"), SearchVectorConfig("description", "local"), SearchVectorConfig("other_reference"), ] PROPERTY_SEARCH_VECTORS = [ SearchVectorConfig("year_index"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("year"), SearchVectorConfig("index"), ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("downstream__cached_label"), SearchVectorConfig("upstream__cached_label"), ] PARENT_SEARCH_VECTORS = ["person", "organization"] GET_VALUES_M2M = ["treatment_types"] objects = ExternalIdManager() label = models.CharField(_("Label"), blank=True, null=True, max_length=200) other_reference = models.CharField( _("Other ref."), blank=True, null=True, max_length=200 ) year = models.IntegerField(_("Year"), default=get_current_year) index = models.IntegerField(_("Index"), default=1) file = models.ForeignKey( "TreatmentFile", related_name="treatments", blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Associated request"), ) treatment_types = models.ManyToManyField( TreatmentType, verbose_name=_("Treatment type") ) treatment_state = models.ForeignKey( TreatmentState, verbose_name=_("State"), default=TreatmentState.get_default, on_delete=models.PROTECT, ) executed = models.BooleanField(_("Treatment have been executed"), default=False) location = models.ForeignKey( Warehouse, verbose_name=_("Location"), blank=True, null=True, on_delete=models.SET_NULL, help_text=_( "Location where the treatment is done. Target warehouse for " "a move." ), ) person = models.ForeignKey( Person, verbose_name=_("Responsible"), blank=True, null=True, on_delete=models.SET_NULL, related_name="treatments", ) scientific_monitoring_manager = models.ForeignKey( Person, verbose_name=_("Scientific monitoring manager"), blank=True, null=True, on_delete=models.SET_NULL, related_name="manage_treatments", ) organization = models.ForeignKey( Organization, verbose_name=_("Organization"), blank=True, null=True, on_delete=models.SET_NULL, related_name="treatments", ) external_id = models.CharField( _("External ID"), blank=True, null=True, max_length=200 ) comment = models.TextField(_("Comment"), blank=True, default="") description = models.TextField(_("Description"), blank=True, default="") goal = models.TextField(_("Goal"), blank=True, default="") start_date = models.DateField(_("Start date"), blank=True, null=True) end_date = models.DateField(_("Closing date"), blank=True, null=True) creation_date = models.DateTimeField(default=datetime.datetime.now) container = models.ForeignKey( Container, verbose_name=_("Container"), on_delete=models.SET_NULL, blank=True, null=True, ) estimated_cost = models.FloatField(_("Estimated cost"), blank=True, null=True) quoted_cost = models.FloatField(_("Quoted cost"), blank=True, null=True) realized_cost = models.FloatField(_("Realized cost"), blank=True, null=True) insurance_cost = models.FloatField(_("Insurance cost"), blank=True, null=True) documents = models.ManyToManyField( Document, related_name="treatments", verbose_name=_("Documents"), blank=True ) main_image = models.ForeignKey( Document, related_name="main_image_treatments", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) history = HistoricalRecords(bases=[HistoryModel]) class Meta: verbose_name = _("Treatment") verbose_name_plural = _("Treatments") unique_together = ("year", "index") permissions = ( ("view_own_treatment", "Can view own Treatment"), ("add_own_treatment", "Can add own Treatment"), ("change_own_treatment", "Can change own Treatment"), ("delete_own_treatment", "Can delete own Treatment"), ) ordering = ("-year", "-index", "-start_date") indexes = [ GinIndex(fields=["data"]), ] ADMIN_SECTION = _("Treatments") def __str__(self): return self.cached_label or "" @property def short_class_name(self): return _("TREATMENT") @property def limited_finds(self): return self.finds.all()[:15] def natural_key(self): return (self.external_id,) @property def year_index(self): return "{}-{}".format(self.year, self.index) @classmethod def get_query_owns(cls, ishtaruser): return ( Q(history_creator=ishtaruser.user_ptr) | Q(person__ishtaruser=ishtaruser) ) & Q(end_date__isnull=True) @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None ): replace_query = None if menu_filtr: if "treatmentfile" in menu_filtr: replace_query = Q(file=menu_filtr["treatmentfile"]) if "find" in menu_filtr and "basket" not in str(menu_filtr["find"]): q = Q(upstream=menu_filtr["find"]) | Q(downstream=menu_filtr["find"]) replace_query = replace_query | q if replace_query else q owns = super(Treatment, cls).get_owns( user, replace_query=replace_query, limit=limit, values=values, get_short_menu_class=get_short_menu_class, ) return cls._return_get_owns(owns, values, get_short_menu_class) def get_query_operations(self): return Operation.objects.filter( context_record__base_finds__find__downstream_treatment=self ) def _generate_cached_label(self): label = self._profile_generate_cached_label() if label: return label items = [ str(getattr(self, k)) for k in ["year", "index", "other_reference", "label"] if getattr(self, k) ] return "{} | {}".format("-".join(items), self.treatment_types_lbl()) def _get_base_image_path( self, ): return "{}/{}/{}".format(self.SLUG, self.year, self.index) def treatment_types_lbl(self): """ Treatment types label :return: string """ return " ; ".join(str(t) for t in self.treatment_types.all()) treatment_types_lbl.short_description = _("Treatment types") treatment_types_lbl.admin_order_field = "treatment_types__label" def downstream_lbl(self): """ Downstream finds label :return: string """ return " ; ".join(f.cached_label for f in self.downstream.all()) downstream_lbl.short_description = _("Downstream finds") downstream_lbl.admin_order_field = "downstream__cached_label" def upstream_lbl(self): """ Upstream finds label :return: string """ return " ; ".join(f.cached_label for f in self.upstream.all()) upstream_lbl.short_description = _("Upstream finds") upstream_lbl.admin_order_field = "upstream__cached_label" def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(Treatment, self).get_extra_actions(request) if self.can_do(request, "add_administrativeact"): actions += [ ( reverse("treatment-add-adminact", args=[self.pk]), _("Add associated administrative act"), "fa fa-plus", _("admin. act"), "", False, ), ] return actions def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = {} if not filtr or prefix + "upstream_finds" in filtr: values[prefix + "upstream_finds"] = " ; ".join( str(up) for up in self.upstream.all() ) if not filtr or prefix + "downstream_finds" in filtr: values[prefix + "downstream_finds"] = " ; ".join( str(up) for up in self.downstream.all() ) if not filtr or prefix + "operations" in filtr: values[prefix + "operations"] = " ; ".join( str(ope) for ope in self.get_query_operations().all() ) if "associatedfind_" not in prefix and self.upstream.count(): find = self.upstream.all()[0] new_prefix = prefix + "associatedfind_" values.update( find.get_values( prefix=new_prefix, no_values=True, filtr=filtr, **kwargs ) ) return values def pre_save(self): super(Treatment, self).pre_save() # is not new if self.pk is not None: return self.index = 1 q = Treatment.objects.filter(year=self.year) if q.count(): self.index = q.all().aggregate(Max("index"))["index__max"] + 1 def _create_n_1_resulting_find( self, resulting_find, upstream_items, treatment_types ): """ Manage creation of n<->1 treatment """ m2m = {} base_fields = [f.name for f in Find._meta.get_fields()] for k in list(resulting_find.keys()): # if not in base fields should be a m2m if k not in base_fields: values = resulting_find.pop(k) if values: m2m[k + "s"] = values resulting_find["history_modifier"] = self.history_modifier new_find = Find.objects.create(**resulting_find) for k, v in m2m.items(): m2m_field = getattr(new_find, k) try: for value in m2m[k]: m2m_field.add(value) except TypeError: m2m_field.add(v) create_new_find = bool([tp for tp in treatment_types if tp.create_new_find]) current_base_finds = [] current_documents = [] for upstream_item in upstream_items: # datings are not explicitly part of the resulting_find # need to reassociate with no duplicate for dating in upstream_item.datings.all(): is_present = any( Dating.is_identical(current_dating, dating) for current_dating in new_find.datings.all() ) if is_present: continue dating.pk = None # duplicate dating.save() new_find.datings.add(dating) # associate base finds for base_find in upstream_item.base_finds.all(): if base_find.pk in current_base_finds: continue current_base_finds.append(base_find.pk) new_find.base_finds.add(base_find) # documents for document in upstream_item.documents.all(): if document.pk in current_documents: continue current_documents.append(document.pk) new_find.documents.add(document) # data new_find.data = update_data(new_find.data, upstream_item.data, merge=True) if create_new_find: upstream_item.downstream_treatment = self upstream_item.history_modifier = self.history_modifier upstream_item.save() else: self.finds.add(upstream_item) new_find.upstream_treatment = self new_find.skip_history_when_saving = True new_find.save() def _create_1_n_resulting_find( self, resulting_finds, upstream_item, user, treatment_types ): """ Manage creation of 1<->n treatment """ new_items = [] start_number = resulting_finds["start_number"] for idx in range(resulting_finds["number"]): label = resulting_finds["label"] + str(start_number + idx) new_find = Find.objects.get(pk=upstream_item.pk).duplicate(user) new_find.upstream_treatment = self new_find.label = label new_find.skip_history_when_saving = True new_find.save() new_items.append(new_find) create_new_find = bool([tp for tp in treatment_types if tp.create_new_find]) if create_new_find: upstream_item.downstream_treatment = self upstream_item.skip_history_when_saving = True upstream_item.save() else: self.finds.add(upstream_item) if getattr(user, "ishtaruser", None): b = FindBasket.objects.create( label=resulting_finds["basket_name"], user=user.ishtaruser ) for item in new_items: b.items.add(item) @property def associated_filename(self): return "-".join( str(slugify(getattr(self, attr))) for attr in ("year", "index", "label") ) def get_find_treatment_list(self): return FindTreatment.objects.filter(treatment=self).all() def clean_cache(self): self._saved_container_attributes, self._is_loan_return, self._is_new_find_creator = None, None, None self._is_current_container_changer, self._is_reference_container_changer = None, None @property def is_loan(self): if getattr(self, "_is_loan", None) is None: self._is_loan = any( 1 for tp in self.treatment_types.all() if not tp.change_reference_location and tp.change_current_location ) return self._is_loan @property def is_loan_return(self): if getattr(self, "_is_loan_return", None) is None: self._is_loan_return = any( 1 for tp in self.treatment_types.all() if tp.restore_reference_location ) return self._is_loan_return @property def is_new_find_creator(self): if getattr(self, "_is_new_find_creator", None) is None: self._is_new_find_creator = any( 1 for tp in self.treatment_types.all() if tp.create_new_find ) return self._is_new_find_creator @property def is_current_container_changer(self): if getattr(self, "_is_current_container_changer", None) is None: self._is_current_container_changer = self.treatment_types.filter( change_current_location=True ).exists() return self._is_current_container_changer @property def is_reference_container_changer(self): if getattr(self, "_is_reference_container_changer", None) is None: self._is_reference_container_changer = self.treatment_types.filter( change_reference_location=True).exists() return self._is_reference_container_changer def save(self, *args, **kwargs): items, user, extra_args_for_new, resulting_find = [], None, [], None upstream_items, upstream_item, resulting_finds = [], None, None treatment_types, return_new = [], False if "items" in kwargs: items = kwargs.pop("items") if "resulting_find" in kwargs: resulting_find = kwargs.pop("resulting_find") if "resulting_finds" in kwargs: resulting_finds = kwargs.pop("resulting_finds") if "upstream_items" in kwargs: upstream_items = kwargs.pop("upstream_items") if "upstream_item" in kwargs: upstream_item = kwargs.pop("upstream_item") if "user" in kwargs: user = kwargs.pop("user") if "extra_args_for_new" in kwargs: extra_args_for_new = kwargs.pop("extra_args_for_new") if "treatment_type_list" in kwargs: treatment_types = kwargs.pop("treatment_type_list") if "return_new" in kwargs: return_new = kwargs.pop("return_new") self.pre_save() super().save(*args, **kwargs) # reinit cached values self.clean_cache() to_be_executed = not self.executed and self.treatment_state.executed updated = [] # baskets if hasattr(items, "items"): items = items.items.all() if hasattr(upstream_items, "items"): upstream_items = upstream_items.items.all() if not items and self.finds.count(): items = list(self.finds.all()) if not treatment_types and self.treatment_types.count(): treatment_types = list(self.treatment_types.all()) # execute the treatment if upstream_items and resulting_find: if not to_be_executed: # should not happen but bad validation check... return self._create_n_1_resulting_find( resulting_find, upstream_items, treatment_types ) self.executed = True self.save() return if upstream_item and resulting_finds: if not to_be_executed: # should not happen but bad validation check... return self._create_1_n_resulting_find( resulting_finds, upstream_item, self.history_modifier, treatment_types ) self.executed = True self.save() return create_new_find = self.is_new_find_creator new_items = [] for item in items: if not create_new_find or not to_be_executed: self.finds.add(item) else: self.finds.clear() new = item.duplicate(user) item.downstream_treatment = self item.save() new.upstream_treatment = self for k in extra_args_for_new: setattr(new, k, extra_args_for_new[k]) new.save() updated.append(new.pk) new_items.append(new) # update baskets for basket in FindBasket.objects.filter(items__pk=item.pk).all(): basket.items.remove(item) basket.items.add(new) if to_be_executed or self.executed: self.verify_find_container_history() if not to_be_executed: if return_new: return new_items return # manage loan return if self.loan_return(updated=updated): self.executed = True self.save() # manage containers elif self.container and self.move_finds_to_new_container(updated=updated): self.executed = True self.save() if return_new: return new_items return @property def saved_container_attributes(self): """ Return container attribute to change depending on treatment types """ if getattr(self, "_saved_container_attributes", None) is None: container_attrs = [] if self.is_current_container_changer: container_attrs.append("container") if self.is_reference_container_changer: container_attrs.append("container_ref") self._saved_container_attributes = container_attrs return self._saved_container_attributes def verify_find_container_history(self): container_attrs = self.saved_container_attributes if (not self.container or not container_attrs) and not self.is_loan_return: return False q = self._get_finds_query_for_treatments() if not q.exists(): return False for find in q.all(): q2 = FindTreatment.objects.filter(find=find, treatment=self) for find_treatment in q2.all(): find_treatment.generate_full_location() def _get_finds_query_for_treatments(self): if self.is_new_find_creator: return Find.objects.filter(upstream_treatment=self) return Find.objects.filter(treatments=self) def loan_return(self, updated=None): """ Manage loan return - change find location using treatment info :param treatment_types: if not provided re-fetched from database :param updated: list of already updated finds :return: True if container changed """ if not updated: updated = [] if not self.is_loan_return: return False q = self._get_finds_query_for_treatments() for find in q.all(): if find.container_ref and find.container != find.container_ref: find.container = find.container_ref if find.pk in updated: # don't record twice history find.skip_history_when_saving = True else: updated.append(find.pk) find.save() return True def move_finds_to_new_container(self, container_attrs=None, updated=None): """ Change find location using treatment info :param updated: list of already updated finds to update :return: True if container changed """ if not updated: updated = [] if not container_attrs: container_attrs = self.saved_container_attributes if not container_attrs: return False q = self._get_finds_query_for_treatments() for find in q.all(): for container_attr in container_attrs: setattr(find, container_attr, self.container) if find.pk in updated: # don't record twice history find.skip_history_when_saving = True else: updated.append(find.pk) find.save() return True post_save.connect(cached_label_changed, sender=Treatment) def pre_delete_treatment(sender, **kwargs): treatment = kwargs.get("instance") for find in Find.objects.filter(upstream_treatment=treatment).all(): if find.downstream_treatment: # a new treatment have be done since the deleted treatment # TODO ! # raise NotImplemented() pass find.delete() for find in Find.objects.filter(downstream_treatment=treatment).all(): find.downstream_treatment = None find.save() pre_delete.connect(pre_delete_treatment, sender=Treatment) m2m_changed.connect(document_attached_changed, sender=Treatment.documents.through) for attr in Treatment.HISTORICAL_M2M: m2m_changed.connect( m2m_historization_changed, sender=getattr(Treatment, attr).through ) class AbsFindTreatments(models.Model): find = models.ForeignKey( Find, verbose_name=_("Find"), related_name="%(class)s_related", on_delete=models.DO_NOTHING, ) treatment = models.OneToOneField( Treatment, verbose_name=_("Treatment"), primary_key=True, on_delete=models.DO_NOTHING, ) # primary_key is set to prevent django to ask for an id column # treatment is not a real primary key treatment_nb = models.IntegerField(_("Order")) TABLE_COLS = ["treatment__" + col for col in Treatment.TABLE_COLS] + [ "treatment_nb" ] COL_LABELS = { "treatment__treatment_type": _("Treatment type"), "treatment__start_date": _("Start date"), "treatment__end_date": _("End date"), "treatment__location": _("Location"), "treatment__container": _("Container"), "treatment__person": _("Doer"), "treatment__upstream": _("Related finds"), "treatment__downstream": _("Related finds"), } class Meta: abstract = True def __str__(self): return "{} - {} [{}]".format(self.find, self.treatment, self.treatment_nb) class FindNonModifTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_nonmodif_treatments_tree AS WITH RECURSIVE rel_tree AS ( SELECT f.id AS find_id, of.id AS old_find_id, f.downstream_treatment_id, f.upstream_treatment_id, 1 AS level, ARRAY[]::integer[] AS path_info FROM archaeological_finds_find f INNER JOIN archaeological_finds_find of ON of.downstream_treatment_id = f.upstream_treatment_id WHERE f.downstream_treatment_id is NULL AND f.upstream_treatment_id is NOT NULL UNION ALL SELECT c.id AS find_id, p.old_find_id, c.downstream_treatment_id, c.upstream_treatment_id, p.level + 1, p.path_info||c.downstream_treatment_id FROM archaeological_finds_find c JOIN rel_tree p ON c.downstream_treatment_id = p.upstream_treatment_id AND (p.path_info = ARRAY[]::integer[] OR NOT (c.downstream_treatment_id = ANY(p.path_info[0:array_upper(p.path_info, 1)-1])) ) ) SELECT DISTINCT find_id, old_find_id, path_info, level FROM rel_tree UNION ALL SELECT id AS find_id, id AS old_find_id, ARRAY[]::integer[] AS path_info, 0 AS level FROM archaeological_finds_find f WHERE f.downstream_treatment_id is NULL ORDER BY find_id; CREATE VIEW find_nonmodif_treatments AS SELECT DISTINCT y.find_id, y.old_find_id, ft.treatment_id as treatment_id, 1 AS treatment_nb FROM (SELECT * FROM find_nonmodif_treatments_tree) y INNER JOIN archaeological_finds_find_treatments ft ON ft.find_id = y.old_find_id ORDER BY y.find_id, ft.treatment_id; -- deactivate deletion CREATE RULE find_nonmodif_treatments_del AS ON DELETE TO find_nonmodif_treatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_nonmodif_treatments; DROP VIEW IF EXISTS find_nonmodif_treatments_tree; """ TABLE_COLS = [ "treatment__treatment_type", "treatment__upstream", "treatment__start_date", "treatment__end_date", "treatment__location", "treatment__container", "treatment__person", "treatment_nb", ] # search parameters EXTRA_REQUEST_KEYS = {"find_id": "find_id"} class Meta: managed = False db_table = "find_nonmodif_treatments" unique_together = ("find", "treatment") ordering = ("find", "-treatment_nb") class FindUpstreamTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_uptreatments_tree AS WITH RECURSIVE rel_tree AS ( SELECT id AS find_id, upstream_treatment_id, downstream_treatment_id, 1 AS level, ARRAY[]::integer[] AS path_info FROM archaeological_finds_find WHERE upstream_treatment_id is NULL AND downstream_treatment_id is NOT NULL UNION ALL SELECT c.id AS find_id, c.upstream_treatment_id, c.downstream_treatment_id, p.level + 1, p.path_info||c.upstream_treatment_id FROM archaeological_finds_find c JOIN rel_tree p ON c.upstream_treatment_id = p.downstream_treatment_id AND (p.path_info = ARRAY[]::integer[] OR NOT (c.upstream_treatment_id = ANY(p.path_info[0:array_upper(p.path_info, 1)-1])) ) ) SELECT DISTINCT find_id, path_info, level FROM rel_tree ORDER BY find_id; CREATE VIEW find_uptreatments AS SELECT DISTINCT find_id, path_info[nb] AS treatment_id, level - nb + 1 AS treatment_nb FROM (SELECT *, generate_subscripts(path_info, 1) AS nb FROM find_uptreatments_tree) y WHERE path_info[nb] is not NULL ORDER BY find_id, treatment_id; -- deactivate deletion CREATE RULE find_uptreatments_del AS ON DELETE TO find_uptreatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_uptreatments; DROP VIEW IF EXISTS find_uptreatments_tree; """ TABLE_COLS = [ "treatment__treatment_type", "treatment__upstream", "treatment__start_date", "treatment__end_date", "treatment__location", "treatment__container", "treatment__person", "treatment_nb", ] # search parameters EXTRA_REQUEST_KEYS = {"find_id": "find_id"} class Meta: managed = False db_table = "find_uptreatments" unique_together = ("find", "treatment") ordering = ("find", "-treatment_nb") class FindDownstreamTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_downtreatments_tree AS WITH RECURSIVE rel_tree AS ( SELECT id AS find_id, downstream_treatment_id, upstream_treatment_id, 1 AS level, ARRAY[]::integer[] AS path_info FROM archaeological_finds_find WHERE downstream_treatment_id is NULL AND upstream_treatment_id is NOT NULL UNION ALL SELECT c.id AS find_id, c.downstream_treatment_id, c.upstream_treatment_id, p.level + 1, p.path_info||c.downstream_treatment_id FROM archaeological_finds_find c JOIN rel_tree p ON c.downstream_treatment_id = p.upstream_treatment_id AND (p.path_info = ARRAY[]::integer[] OR NOT (c.downstream_treatment_id = ANY(p.path_info[0:array_upper(p.path_info, 1)-1])) ) ) SELECT DISTINCT find_id, path_info, level FROM rel_tree ORDER BY find_id; CREATE VIEW find_downtreatments AS SELECT DISTINCT find_id, path_info[nb] AS treatment_id, level - nb + 1 AS treatment_nb FROM (SELECT *, generate_subscripts(path_info, 1) AS nb FROM find_downtreatments_tree) y WHERE path_info[nb] is not NULL ORDER BY find_id, treatment_id; -- deactivate deletion CREATE RULE find_downtreatments_del AS ON DELETE TO find_downtreatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_downtreatments; DROP VIEW IF EXISTS find_downtreatments_tree; """ TABLE_COLS = [ "treatment__treatment_type", "treatment__downstream", "treatment__start_date", "treatment__end_date", "treatment__location", "treatment__container", "treatment__person", "treatment_nb", ] # search parameters EXTRA_REQUEST_KEYS = {"find_id": "find_id"} class Meta: managed = False db_table = "find_downtreatments" unique_together = ("find", "treatment") ordering = ("find", "-treatment_nb") class FindTreatments(AbsFindTreatments): CREATE_SQL = """ CREATE VIEW find_treatments AS SELECT find_id, treatment_id, treatment_nb, TRUE as upstream FROM find_uptreatments UNION SELECT find_id, treatment_id, treatment_nb, FALSE as upstream FROM find_downtreatments ORDER BY find_id, treatment_id, upstream; -- deactivate deletion CREATE RULE find_treatments_del AS ON DELETE TO find_treatments DO INSTEAD DELETE FROM archaeological_finds_find where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS find_treatments; """ upstream = models.BooleanField(_("Is upstream")) class Meta: managed = False db_table = "find_treatments" unique_together = ("find", "treatment") ordering = ("find", "upstream", "-treatment_nb") class TreatmentFileType(GeneralType): treatment_type = models.ForeignKey( TreatmentType, blank=True, null=True, on_delete=models.SET_NULL ) class Meta: verbose_name = _("Treatment request type") verbose_name_plural = _("Treatment request types") ordering = ("label",) ADMIN_SECTION = _("Treatments") post_save.connect(post_save_cache, sender=TreatmentFileType) post_delete.connect(post_save_cache, sender=TreatmentFileType) class TreatmentFile( DashboardFormItem, ClosedItem, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, OwnPerms, ValueGetter, MainItem, ): SLUG = "treatmentfile" APP = "archaeological-finds" MODEL = SLUG SHOW_URL = "show-treatmentfile" DELETE_URL = "delete-treatmentfile" TABLE_COLS = ["type", "year", "index", "internal_reference", "name"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("type__label"), SearchVectorConfig("internal_reference"), SearchVectorConfig("name"), SearchVectorConfig("comment", "local"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("year"), SearchVectorConfig("index"), ] PARENT_SEARCH_VECTORS = ["in_charge", "applicant", "applicant_organisation"] NUMBER_FIELDS = ["year", "index"] DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + [ "end_date", "exhibition_start_date", "exhibition_end_date", ] EXTRA_REQUEST_KEYS = { "in_charge__pk": "in_charge__pk", # used by dynamic_table_documents "applicant__pk": "applicant__pk", # used by dynamic_table_documents } REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] # alternative names of fields for searches ALT_NAMES = { "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "internal_reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "internal_reference__iexact", ), "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "index": SearchAltName(pgettext_lazy("key for text search", "index"), "index"), "type": SearchAltName( pgettext_lazy("key for text search", "type"), "type__label__iexact" ), "in_charge": SearchAltName( pgettext_lazy("key for text search", "in-charge"), "in_charge__cached_label__iexact", ), "applicant": SearchAltName( pgettext_lazy("key for text search", "applicant"), "applicant__cached_label__iexact", related_name="applicant" ), "applicant_organisation": SearchAltName( pgettext_lazy("key for text search", "applicant-organisation"), "applicant_organisation__cached_label__iexact", related_name="applicant_organisation" ), "end_date": SearchAltName( pgettext_lazy("key for text search", "end-date"), "end_date", ), "exhibition_start": SearchAltName( pgettext_lazy("key for text search", "exhibition-start"), "exhibition_start_date", ), "exhibition_end": SearchAltName( pgettext_lazy("key for text search", "exhibition-end"), "exhibition_end_date", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) DEFAULT_SEARCH_FORM = ("archaeological_finds.forms_treatments", "TreatmentFileSelect") # fields year = models.IntegerField(_("Year"), default=get_current_year) index = models.IntegerField(_("Index"), default=1) internal_reference = models.CharField( _("Internal reference"), blank=True, null=True, max_length=200 ) external_id = models.CharField( _("Reference"), blank=True, null=True, max_length=200 ) name = models.TextField(_("Name"), blank=True, default="") type = models.ForeignKey( TreatmentFileType, verbose_name=_("Treatment request type"), on_delete=models.PROTECT, ) in_charge = models.ForeignKey( Person, related_name="treatmentfile_responsability", verbose_name=_("Person in charge"), on_delete=models.SET_NULL, blank=True, null=True, ) applicant = models.ForeignKey( Person, related_name="treatmentfile_applicant", verbose_name=_("Applicant"), on_delete=models.SET_NULL, blank=True, null=True, ) applicant_organisation = models.ForeignKey( Organization, related_name="treatmentfile_applicant", verbose_name=_("Applicant organisation"), on_delete=models.SET_NULL, blank=True, null=True, ) end_date = models.DateField(_("Closing date"), null=True, blank=True) creation_date = models.DateField( _("Creation date"), default=datetime.date.today, blank=True, null=True ) reception_date = models.DateField(_("Reception date"), blank=True, null=True) # exhibition exhibition_name = models.TextField(_("Exhibition name"), blank=True, default="") exhibition_start_date = models.DateField( _("Exhibition start date"), blank=True, null=True ) exhibition_end_date = models.DateField( _("Exhibition end date"), blank=True, null=True ) comment = models.TextField(_("Comment"), blank=True, default="") documents = models.ManyToManyField( Document, related_name="treatment_files", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( Document, related_name="main_image_treatment_files", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) associated_basket = models.ForeignKey( FindBasket, null=True, blank=True, on_delete=models.SET_NULL, related_name="treatment_files", ) history = HistoricalRecords() class Meta: verbose_name = _("Treatment request") verbose_name_plural = _("Treatment requests") unique_together = ("year", "index") permissions = ( ("view_own_treatmentfile", "Can view own Treatment request"), ("add_own_treatmentfile", "Can add own Treatment request"), ("change_own_treatmentfile", "Can change own Treatment request"), ("delete_own_treatmentfile", "Can delete own Treatment request"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] ADMIN_SECTION = _("Treatments") def __str__(self): return self.cached_label or "" @property def short_class_name(self): return _("Treatment request") @classmethod def class_verbose_name(cls): # TODO: should be unecessary if MainItem return cls._meta.verbose_name @classmethod def get_query_owns(cls, ishtaruser): return ( Q(history_creator=ishtaruser.user_ptr) | Q(in_charge__ishtaruser=ishtaruser) ) & Q(end_date__isnull=True) @property def associated_filename(self): return "-".join( str(slugify(getattr(self, attr))) for attr in ("year", "index", "internal_reference", "name") if getattr(self, attr) ) def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(TreatmentFile, self).get_extra_actions(request) if self.can_do(request, "add_administrativeact"): actions += [ ( reverse("treatmentfile-add-adminact", args=[self.pk]), _("Add associated administrative act"), "fa fa-plus", _("admin. act"), "", False, ), ] if not self.associated_basket: return actions if ( self.type.treatment_type and self.treatments.filter( treatment_types__pk=self.type.treatment_type.pk ).count() ): # a treatment of this type already exists return actions can_edit_find = self.can_do(request, "change_find") if can_edit_find: actions += [ ( reverse("treatmentfile-add-treatment", args=[self.pk]), _("Add associated treatment"), "fa fa-flask", "", "", False, ), ] return actions @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None ): owns = super(TreatmentFile, cls).get_owns( user, limit=limit, values=values, get_short_menu_class=get_short_menu_class ) return cls._return_get_owns(owns, values, get_short_menu_class) def _generate_cached_label(self): label = self._profile_generate_cached_label() if label: return label items = [ str(getattr(self, k)) for k in ["year", "index", "internal_reference", "name"] if getattr(self, k) ] return settings.JOINT.join(items) def _get_base_image_path( self, ): return "{}/{}/{}".format(self.SLUG, self.year, self.index) def pre_save(self): # is not new if self.pk is not None: return self.index = 1 q = TreatmentFile.objects.filter(year=self.year) if q.count(): self.index = q.all().aggregate(Max("index"))["index__max"] + 1 def save(self, *args, **kwargs): self.pre_save() super(TreatmentFile, self).save(*args, **kwargs) m2m_changed.connect(document_attached_changed, sender=TreatmentFile.documents.through) post_save.connect(cached_label_changed, sender=TreatmentFile)