#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (C) 2012-2017 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

from collections import OrderedDict
import datetime
from itertools import groupby
import uuid

from django.apps import apps
from django.conf import settings
from django.contrib.gis.db import models
from django.contrib.gis.db.models.aggregates import Union
from django.contrib.gis.db.models.functions import Centroid
from django.contrib.postgres.indexes import GinIndex
from django.core.urlresolvers import reverse
from django.db import IntegrityError, transaction
from django.db.models import Q, Count, Sum, Max, Avg
from django.db.models.signals import post_save, m2m_changed, post_delete
from django.forms import ValidationError

from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy

from ishtar_common.models import (
    BaseHistorizedItem,
    Dashboard,
    DashboardFormItem,
    Document,
    DocumentTemplate,
    GeneralRecordRelations,
    GeneralRelationType,
    GeneralType,
    IshtarUser,
    LightHistorizedItem,
    OperationType,
    Organization,
    OwnPerms,
    Person,
    PersonType,
    post_delete_record_relation,
    post_save_cache,
    RelationItem,
    ShortMenuItem,
    SourceType,
    Town,
    ValueGetter,
    get_current_profile,
    document_attached_changed,
    HistoryModel,
    SearchAltName,
    GeoItem,
    CompleteIdentifierItem,
    SearchVectorConfig,
    DocumentItem,
    QuickAction,
    MainItem,
    HierarchicalType,
)
from ishtar_common.models_common import Department, HistoricalRecords
from ishtar_common.model_managers import UUIDModelManager
from ishtar_common.utils import (
    cached_label_changed,
    force_cached_label_changed,
    mode,
    m2m_historization_changed,
    post_save_geo,
)


class RemainType(GeneralType):
    """Thesaurus entry: type of archaeological remain."""

    class Meta:
        verbose_name = _("Remain type")
        verbose_name_plural = _("Remain types")
        ordering = ("label",)


# keep the GeneralType cache in sync when entries change
post_save.connect(post_save_cache, sender=RemainType)
post_delete.connect(post_save_cache, sender=RemainType)


class Period(GeneralType):
    """Thesaurus entry: chronological period.

    Periods are explicitly ordered and may be organised hierarchically
    through ``parent``.  ``start_date``/``end_date`` are plain integer
    years (may be negative for BCE).
    """

    order = models.IntegerField(_("Order"))
    start_date = models.IntegerField(_("Start date"), null=True, blank=True)
    end_date = models.IntegerField(_("End date"), null=True, blank=True)
    parent = models.ForeignKey(
        "Period",
        verbose_name=_("Parent period"),
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )

    class Meta:
        verbose_name = _("Type Period")
        verbose_name_plural = _("Types Period")
        ordering = ("order",)

    def __str__(self):
        return self.label


post_save.connect(post_save_cache, sender=Period)
post_delete.connect(post_save_cache, sender=Period)


class ReportState(GeneralType):
    """Thesaurus entry: processing state of an operation report."""

    order = models.IntegerField(_("Order"))

    class Meta:
        verbose_name = _("Type of report state")
        verbose_name_plural = _("Types of report state")
        ordering = ("order",)


post_save.connect(post_save_cache, sender=ReportState)
post_delete.connect(post_save_cache, sender=ReportState)


class SiteManager(models.GeoManager):
    """Manager resolving sites by their natural key (the unique reference)."""

    def get_by_natural_key(self, txt_idx):
        return self.get(reference=txt_idx)


class RecordQualityType(GeneralType):
    """Thesaurus entry: quality level of a record."""

    order = models.IntegerField(_("Order"))

    class Meta:
        verbose_name = _("Type of record quality")
        verbose_name_plural = _("Types of record quality")
        ordering = ("order",)


class CulturalAttributionType(HierarchicalType):
    """Hierarchical thesaurus entry: cultural attribution."""

    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Cultural attribution type")
        verbose_name_plural = _("Cultural attribution types")
        ordering = ("order",)


post_save.connect(post_save_cache, sender=RecordQualityType)
post_delete.connect(post_save_cache, sender=RecordQualityType)
class ArchaeologicalSite(
    DocumentItem,
    BaseHistorizedItem,
    CompleteIdentifierItem,
    GeoItem,
    OwnPerms,
    ValueGetter,
    MainItem,
):
    """An archaeological site/entity, identified by a unique ``reference``.

    A site aggregates towns, periods, remains and cultural attributions,
    carries documents and geographic data, and is linked to operations.
    A "top" (virtual) operation can cluster the site's real operations
    (see :meth:`create_or_update_top_operation`).
    """

    SLUG = "site"
    APP = "archaeological-operations"
    MODEL = "archaeological-site"
    SHOW_URL = "show-site"
    DELETE_URL = "delete-site"
    TABLE_COLS = [
        "reference",
        "name",
        "cached_towns_label",
        "cached_periods",
        "cached_remains",
    ]
    NEW_QUERY_ENGINE = True
    COL_LABELS = {
        "cached_towns_label": _("Towns"),
        "cached_periods": _("Periods"),
        "cached_remains": _("Remains"),
    }
    LONG_SLUG = "archaeologicalsite"

    # modalities available for the statistics module
    STATISTIC_MODALITIES_OPTIONS = OrderedDict(
        [
            ("towns__areas__label", _("Area")),
            ("towns__areas__parent__label", _("Extended area")),
            ("periods__label", _("Periods")),
            ("remains__label", _("Remains")),
            ("documents__source_type__label", _("Associated document type")),
            ("last_modified__year", _("Modification (year)")),
        ]
    )
    STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()]

    # full text search configuration
    BASE_SEARCH_VECTORS = [
        SearchVectorConfig("comment", "local"),
        SearchVectorConfig("discovery_area", "local"),
        SearchVectorConfig("locality_cadastral", "local"),
        SearchVectorConfig("locality_ngi", "local"),
        SearchVectorConfig("name"),
        SearchVectorConfig("oceanographic_service_localisation"),
        SearchVectorConfig("reference"),
        SearchVectorConfig("other_reference"),
        SearchVectorConfig("shipwreck_code"),
        SearchVectorConfig("shipwreck_name"),
        SearchVectorConfig("drassm_number"),
        SearchVectorConfig("affmar_number"),
    ]
    M2M_SEARCH_VECTORS = [
        SearchVectorConfig("periods__label", "local"),
        SearchVectorConfig("remains__label", "local"),
        SearchVectorConfig("towns__name"),
    ]
    PARENT_SEARCH_VECTORS = ["operations"]
    DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + ["sinking_date"]
    EXTRA_REQUEST_KEYS = {
        "towns_label": "towns",
        "collaborators__pk": "collaborators__pk",  # dynamic_table_documents
        "cached_towns_label": "cached_towns_label",
        "cached_periods": "cached_periods",
        # NOTE(review): maps to "remains" (not "cached_remains") unlike the
        # two entries above — looks intentional but worth confirming
        "cached_remains": "remains",
    }
    # alternative names of fields for searches
    REVERSED_BOOL_FIELDS = [
        "documents__image__isnull",
        "documents__associated_file__isnull",
        "documents__associated_url__isnull",
    ]
    ALT_NAMES = {
        "reference": SearchAltName(
            pgettext_lazy("key for text search", "reference"), "reference__iexact"
        ),
        "name": SearchAltName(
            pgettext_lazy("key for text search", "name"), "name__iexact"
        ),
        "other_reference": SearchAltName(
            pgettext_lazy("key for text search", "other-reference"),
            "other_reference__iexact",
        ),
        "periods": SearchAltName(
            pgettext_lazy("key for text search", "period"), "periods__label__iexact"
        ),
        "remains": SearchAltName(
            pgettext_lazy("key for text search", "remain"), "remains__label__iexact"
        ),
        "towns": SearchAltName(
            pgettext_lazy("key for text search", "town"), "towns__cached_label__iexact"
        ),
        "towns__areas": SearchAltName(
            pgettext_lazy("key for text search", "area"), "towns__areas__label__iexact"
        ),
        "comment": SearchAltName(
            pgettext_lazy("key for text search", "comment"), "comment__iexact"
        ),
        "locality_ngi": SearchAltName(
            pgettext_lazy("key for text search", "locality-ngi"), "locality_ngi__iexact"
        ),
        "locality_cadastral": SearchAltName(
            pgettext_lazy("key for text search", "locality-cadastral"),
            "locality_cadastral__iexact",
        ),
        "shipwreck_name": SearchAltName(
            pgettext_lazy("key for text search", "shipwreck-name"),
            "shipwreck_name__iexact",
        ),
        "oceanographic_service_localisation": SearchAltName(
            pgettext_lazy("key for text search", "oceanographic-service-localisation"),
            "oceanographic_service_localisation__iexact",
        ),
        "shipwreck_code": SearchAltName(
            pgettext_lazy("key for text search", "shipwreck-code"),
            "shipwreck_code__iexact",
        ),
        "sinking_date": SearchAltName(
            pgettext_lazy("key for text search", "sinking-date"), "sinking_date"
        ),
        "discovery_area": SearchAltName(
            pgettext_lazy("key for text search", "discovery-area"),
            "discovery_area__iexact",
        ),
        "operation": SearchAltName(
            pgettext_lazy("key for text search", "operation"),
            "operations__cached_label__icontains",
        ),
        "top_operation": SearchAltName(
            pgettext_lazy("key for text search", "top-operation"),
            "top_operations__cached_label__icontains",
        ),
        "drassm_number": SearchAltName(
            pgettext_lazy("key for text search", "numero-drassm"),
            "drassm_number__iexact",
        ),
        "affmar_number": SearchAltName(
            pgettext_lazy("key for text search", "numero-affmar"),
            "affmar_number__iexact",
        ),
        "cultural_attributions": SearchAltName(
            pgettext_lazy("key for text search", "cultural-attribution"),
            "cultural_attributions__label__iexact",
        ),
    }
    ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES)
    ALT_NAMES.update(DocumentItem.ALT_NAMES)

    UP_MODEL_QUERY = {
        "operation": (
            pgettext_lazy("key for text search", "operation"),
            "cached_label",
        ),
    }
    RELATIVE_SESSION_NAMES = [
        ("operation", "operations__pk"),
    ]
    HISTORICAL_M2M = ["periods", "remains", "towns", "cultural_attributions"]
    CACHED_LABELS = [
        "cached_label",
        "cached_towns_label",
        "cached_periods",
        "cached_remains",
    ]
    DOWN_MODEL_UPDATE = ["context_records"]

    QA_LOCK = QuickAction(
        url="site-qa-lock",
        icon_class="fa fa-lock",
        text=_("Lock/Unlock"),
        target="many",
        rights=["change_archaeologicalsite", "change_own_archaeologicalsite"],
    )
    QA_EDIT = QuickAction(
        url="site-qa-bulk-update",
        icon_class="fa fa-pencil",
        text=_("Bulk update"),
        target="many",
        rights=["change_archaeologicalsite", "change_own_archaeologicalsite"],
    )
    QUICK_ACTIONS = [
        QA_EDIT,
        QA_LOCK,
        QuickAction(
            url="site-qa-duplicate",
            icon_class="fa fa-clone",
            text=_("Duplicate"),
            target="one",
            rights=["change_archaeologicalsite", "change_own_archaeologicalsite"],
        ),
    ]

    objects = SiteManager()

    reference = models.CharField(_("Reference"), max_length=200, unique=True)
    other_reference = models.TextField(_("Other reference"), blank=True, default="")
    name = models.CharField(_("Name"), max_length=200, null=True, blank=True)
    periods = models.ManyToManyField(Period, verbose_name=_("Periods"), blank=True)
    remains = models.ManyToManyField(
        "RemainType", verbose_name=_("Remains"), blank=True
    )
    cultural_attributions = models.ManyToManyField(
        "CulturalAttributionType", verbose_name=_("Cultural attribution"), blank=True
    )
    towns = models.ManyToManyField(
        Town, verbose_name=_("Towns"), related_name="sites", blank=True
    )
    comment = models.TextField(_("Comment"), blank=True, default="")
    locality_ngi = models.TextField(
        _("National Geographic Institute locality"), blank=True, default=""
    )
    locality_cadastral = models.TextField(
        _("Cadastral locality"), blank=True, default=""
    )
    collaborators = models.ManyToManyField(
        Person,
        blank=True,
        verbose_name=_("Collaborators"),
        related_name="site_collaborator",
    )
    # underwater
    shipwreck_name = models.TextField(_("Shipwreck name"), blank=True, default="")
    oceanographic_service_localisation = models.TextField(
        _("Oceanographic service localisation"), blank=True, default=""
    )
    shipwreck_code = models.TextField(_("Shipwreck code"), blank=True, default="")
    sinking_date = models.DateField(_("Sinking date"), null=True, blank=True)
    discovery_area = models.TextField(_("Discovery area"), blank=True, default="")
    affmar_number = models.CharField(
        _("AffMar number"), max_length=100, null=True, blank=True
    )
    drassm_number = models.CharField(
        _("DRASSM number"), max_length=100, null=True, blank=True
    )
    documents = models.ManyToManyField(
        Document, related_name="sites", verbose_name=_("Documents"), blank=True
    )
    main_image = models.ForeignKey(
        Document,
        related_name="main_image_sites",
        on_delete=models.SET_NULL,
        verbose_name=_("Main image"),
        blank=True,
        null=True,
    )
    # denormalised labels, regenerated by the _generate_cached_* methods
    cached_label = models.TextField(
        _("Cached name"),
        blank=True,
        default="",
        db_index=True,
        help_text=_("Generated automatically - do not edit"),
    )
    cached_towns_label = models.TextField(
        _("Cached town label"),
        blank=True,
        default="",
        help_text=_("Generated automatically - do not edit"),
    )
    cached_periods = models.TextField(
        _("Cached periods label"),
        blank=True,
        default="",
        help_text=_("Generated automatically - do not edit"),
    )
    cached_remains = models.TextField(
        _("Cached remains label"),
        blank=True,
        default="",
        help_text=_("Generated automatically - do not edit"),
    )
    history = HistoricalRecords(bases=[HistoryModel])

    class Meta:
        verbose_name = _("Archaeological site")
        verbose_name_plural = _("Archaeological sites")
        permissions = (
            ("view_archaeologicalsite", "Can view all Archaeological sites"),
            ("view_own_archaeologicalsite", "Can view own Archaeological site"),
            ("add_own_archaeologicalsite", "Can add own Archaeological site"),
            ("change_own_archaeologicalsite", "Can change own Archaeological site"),
            ("delete_own_archaeologicalsite", "Can delete own Archaeological site"),
        )
        indexes = [
            GinIndex(fields=["data"]),
        ]

    def __str__(self):
        return self.cached_label or ""

    @property
    def short_class_name(self):
        return _("SITE")

    @property
    def top_operation(self):
        """Return the virtual "top" operation of this site, if any."""
        if self.top_operations.count():
            return self.top_operations.all()[0]
        return

    def public_representation(self):
        """Return a plain dict of public data for external publication."""
        dct = super(ArchaeologicalSite, self).public_representation()
        dct.update(
            {
                "reference": self.reference,
                "name": self.name,
                "periods": [str(p) for p in self.periods.all()],
                "remains": [str(r) for r in self.remains.all()],
                "towns": [t.label_with_areas for t in self.towns.all()],
                "comment": self.comment,
                "locality": self.locality_ngi or self.locality_cadastral,
            }
        )
        profile = get_current_profile()
        if profile.underwater:
            dct["shipwreck-name"] = self.shipwreck_name
            dct["sinking-date"] = self.sinking_date
            dct["discovery-area"] = self.discovery_area
        return dct

    @property
    def finds(self):
        """All finds attached to this site through its context records."""
        from archaeological_finds.models import Find

        return Find.objects.filter(
            base_finds__context_record__archaeological_site__pk=self.pk
        )

    def get_extra_actions(self, request):
        """
        For sheet template
        """
        # url, base_text, icon, extra_text, extra css class, is a quick action
        actions = super(ArchaeologicalSite, self).get_extra_actions(request)
        # is_locked = self.is_locked(request.user)
        can_edit_site = self.can_do(request, "change_archaeologicalsite")
        if can_edit_site:
            actions += [
                (
                    reverse("site-qa-duplicate", args=[self.pk]),
                    _("Duplicate"),
                    "fa fa-clone",
                    "",
                    "",
                    True,
                ),
            ]
        return actions

    @classmethod
    def _get_query_owns_dicts(cls, ishtaruser, no_rel=False):
        """Ownership criteria for "own" queries: creator, collaborator
        or site located in one of the profile's towns."""
        profile = ishtaruser.current_profile
        town_ids = []
        if profile:
            town_ids = [town["pk"] for town in profile.query_towns.values("pk").all()]
        query_owns = [
            {
                "collaborators__pk": ishtaruser.person.pk,
                "history_creator": ishtaruser.user_ptr,
                "towns__pk__in": town_ids,
            }
        ]
        return query_owns

    @classmethod
    def get_query_owns(cls, ishtaruser):
        """Build the full "own" query, following operations, baskets and
        warehouse (container) ownership paths."""
        from archaeological_warehouse.models import Warehouse

        q = (
            cls._construct_query_own(
                "operations__context_record__base_finds__find__container__responsible__",
                Warehouse._get_query_owns_dicts(ishtaruser),
            )
            | cls._construct_query_own(
                "operations__context_record__base_finds__find__basket__",
                [{"shared_with": ishtaruser, "shared_write_with": ishtaruser}],
            )
            | cls._construct_query_own(
                "operations__context_record__base_finds__find__container__location__",
                Warehouse._get_query_owns_dicts(ishtaruser),
            )
            | cls._construct_query_own(
                "top_operations__context_record__base_finds__find__container__responsible__",
                Warehouse._get_query_owns_dicts(ishtaruser),
            )
            | cls._construct_query_own(
                "top_operations__context_record__base_finds__find__container__location__",
                Warehouse._get_query_owns_dicts(ishtaruser),
            )
            | cls._construct_query_own(
                "operations__", Operation._get_query_owns_dicts(ishtaruser, no_rel=True)
            )
            | cls._construct_query_own(
                "top_operations__", Operation._get_query_owns_dicts(ishtaruser)
            )
            | cls._construct_query_own("", cls._get_query_owns_dicts(ishtaruser))
        )
        return q

    @classmethod
    def get_owns(
        cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None
    ):
        """Return the sites owned by ``user``, optionally filtered by the
        current operation stored in the session menu."""
        replace_query = None
        if menu_filtr and "operation" in menu_filtr:
            replace_query = Q(operations=menu_filtr["operation"])
        owns = super(ArchaeologicalSite, cls).get_owns(
            user,
            replace_query=replace_query,
            limit=limit,
            values=values,
            get_short_menu_class=get_short_menu_class,
        )
        return cls._return_get_owns(owns, values, get_short_menu_class)

    def _generate_cached_label(self):
        """Reference, optional name, then towns/remains/periods suffixes."""
        name = self.reference
        if self.name:
            name += " %s %s" % (settings.JOINT, self.name)
        keys = [("towns", " - {}"), ("remains", " - {}"), ("periods", " [{}]")]
        for k, lbl in keys:
            if getattr(self, k).count():
                name += lbl.format(", ".join([str(v) for v in getattr(self, k).all()]))
        return name

    def _generate_cached_towns_label(self):
        return self.towns_label() or "-"

    def _generate_cached_remains(self):
        return " & ".join([str(remain) for remain in self.remains.all()]) or "-"

    def _generate_cached_periods(self):
        return " & ".join([str(period) for period in self.periods.all()]) or "-"

    def natural_key(self):
        return (self.reference,)

    @property
    def external_id(self):
        return self.reference

    def towns_codes(self):
        return [town.label_with_areas for town in self.towns.all()]

    def towns_label(self):
        return " & ".join(self.towns_codes())

    def get_town_centroid(self):
        """Centroid of the union of the towns' centres (or None)."""
        q = (
            self.towns.filter(center__isnull=False)
            .annotate(centroid=Centroid(Union("center")))
            .all()
        )
        if not q.count():
            return
        return q.all()[0].centroid, self._meta.verbose_name

    def get_town_polygons(self):
        """Union of the towns' limit polygons (or None)."""
        q = self.towns.filter(limit__isnull=False).annotate(poly=Union("limit")).all()
        if not q.count():
            return
        return q.all()[0].poly, self._meta.verbose_name

    def _get_base_image_path(self):
        return "{}/{}".format(self.SLUG, self.reference)

    def create_or_update_top_operation(self, create=False):
        """
        Create a virtual operation to associate with the site. A cluster
        operation is created for site with many operation. This cluster
        operation can be used to attach context records which are only
        attached to the site.
        """
        if not self.top_operations.count():
            if not create:
                return
            operation_type, created = OperationType.objects.get_or_create(
                txt_idx="unknown",
                defaults={"label": _("Unknown"), "available": True, "order": 999},
            )
            name = str(_("Virtual operation of site: {}")).format(self.reference)
            if self.towns.count():
                name += " - " + ", ".join([town.name for town in self.towns.all()])
            operation = Operation.objects.create(
                operation_type=operation_type, common_name=name, virtual_operation=True
            )
            operation.top_sites.add(self)
        top_operation = self.top_operations.all()[0]
        # current real operations of the site (top operation excluded)
        current_operations = dict(
            [
                (ope.pk, ope)
                for ope in self.operations.exclude(pk=top_operation.pk).all()
            ]
        )
        # drop stale "has_got" relations, keep the still-valid ones
        q = RecordRelations.objects.filter(
            left_record=top_operation, relation_type__txt_idx="has_got"
        )
        for relation in q.all():
            if relation.right_record.pk not in current_operations:
                relation.delete()
            else:
                current_operations.pop(relation.right_record.pk)
        # create the missing relations
        rel_type = RelationType.get_cache("has_got")
        for missing, value in current_operations.items():
            RecordRelations.objects.create(
                left_record=top_operation,
                right_record=value,
                relation_type=rel_type,
            )


def site_post_save(sender, **kwargs):
    """Regenerate cached labels and geo data after each save."""
    cached_label_changed(sender=sender, **kwargs)
    post_save_geo(sender=sender, **kwargs)


post_save.connect(site_post_save, sender=ArchaeologicalSite)

m2m_changed.connect(
    document_attached_changed, sender=ArchaeologicalSite.documents.through
)

for attr in ArchaeologicalSite.HISTORICAL_M2M:
    m2m_changed.connect(
        m2m_historization_changed, sender=getattr(ArchaeologicalSite, attr).through
    )


def get_values_town_related(item, prefix, values, filtr=None):
    """Fill ``values`` with town-related entries for ``item``.

    Adds (each key prefixed with ``prefix``): ``parcellist``,
    ``towns_count``, ``towns``, ``departments`` and ``departments_number``
    (the department entries only for French installations, derived from
    the first two digits of the INSEE number).  ``filtr``, when given,
    restricts which keys are computed.  Returns ``values``.
    """
    if not filtr or prefix + "parcellist" in filtr:
        values[prefix + "parcellist"] = item.render_parcels()
    if not filtr or prefix + "towns_count" in filtr:
        values[prefix + "towns_count"] = str(item.towns.count())
    get_towns = not filtr or prefix + "towns" in filtr
    get_dpt = not filtr or prefix + "departments" in filtr
    get_dpt_nb = not filtr or prefix + "departments_number" in filtr
    if not get_towns and not get_dpt and not get_dpt_nb:
        return values
    if get_towns:
        values[prefix + "towns"] = ""
    if get_dpt:
        values[prefix + "departments"] = ""
    if get_dpt_nb:
        values[prefix + "departments_number"] = ""
    if item.towns.count():
        if get_towns:
            values[prefix + "towns"] = ", ".join(
                [town.name for town in item.towns.all().order_by("name")]
            )
        # fix: was "(get_dpt_nb or get_dpt_nb)" — a copy-paste bug which
        # skipped the departments label when only "departments" was asked
        if settings.COUNTRY == "fr" and (get_dpt_nb or get_dpt):
            dpts_num = set([town.numero_insee[:2] for town in item.towns.all()])
            if get_dpt_nb:
                values[prefix + "departments_number"] = ", ".join(
                    list(sorted(dpts_num))
                )
            if get_dpt:
                values[prefix + "departments"] = ", ".join(
                    [
                        Department.objects.get(number=dpt).label
                        for dpt in sorted(dpts_num)
                        if Department.objects.filter(number=dpt).count()
                    ]
                )
    return values
get_towns and not get_dpt and not get_dpt_nb: return values if get_towns: values[prefix + "towns"] = "" if get_dpt: values[prefix + "departments"] = "" if get_dpt_nb: values[prefix + "departments_number"] = "" if item.towns.count(): if get_towns: values[prefix + "towns"] = ", ".join( [town.name for town in item.towns.all().order_by("name")] ) if settings.COUNTRY == "fr" and (get_dpt_nb or get_dpt_nb): dpts_num = set([town.numero_insee[:2] for town in item.towns.all()]) if get_dpt_nb: values[prefix + "departments_number"] = ", ".join( list(sorted(dpts_num)) ) if get_dpt: values[prefix + "departments"] = ", ".join( [ Department.objects.get(number=dpt).label for dpt in sorted(dpts_num) if Department.objects.filter(number=dpt).count() ] ) return values class ClosedItem(object): def closing(self): if self.is_active(): return in_history = False date = self.end_date # last action is closing? for idx, item in enumerate(self.history.order_by("-history_date").all()): if not idx: # last action continue if not item.end_date or item.end_date != self.end_date: break in_history = True user = None if in_history: if item.history_modifier_id: q = IshtarUser.objects.filter(pk=item.history_modifier_id) if q.count(): user = q.all()[0] elif self.history_modifier_id: q = IshtarUser.objects.filter(pk=self.history_modifier_id) if q.count(): user = q.all()[0] return {"date": date, "user": user} class ParcelItem: def clean_parcel_duplicates(self): parcels = {} for p in self.parcels.order_by("pk").all(): if p.associated_file: continue key = (p.section, p.parcel_number, p.year, p.town.pk, p.public_domain) if key in parcels: parcels[key].merge(p) else: parcels[key] = p class Operation( ClosedItem, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, GeoItem, OwnPerms, ValueGetter, MainItem, DashboardFormItem, RelationItem, ParcelItem, ): SLUG = "operation" APP = "archaeological-operations" MODEL = "operation" SHOW_URL = "show-operation" DELETE_URL = "delete-operation" TABLE_COLS = [ 
"code_patriarche", "year", "cached_towns_label", "common_name", "operation_type__label", "start_date", "excavation_end_date", "cached_remains", ] NEW_QUERY_ENGINE = True # statistics STATISTIC_MODALITIES_OPTIONS = OrderedDict( [ ("operation_type__label", _("Operation type")), ("year", _("Year")), ("towns__areas__label", _("Area")), ("towns__areas__parent__label", _("Extended area")), ("remains__label", _("Remains")), ("periods__label", _("Periods")), ("record_quality_type__label", _("Record quality")), ("documentation_received", _("Documentation received")), ("finds_received", _("Finds received")), ("documents__source_type__label", _("Associated document type")), ("last_modified__year", _("Modification (year)")), ] ) STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()] # search parameters BOOL_FIELDS = [ "end_date__isnull", "virtual_operation", "documentation_received", "finds_received", ] MANY_COUNTED_FIELDS = ["context_record__base_finds"] REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + [ "start_date__lte", "start_date__gte", "excavation_end_date__lte", "excavation_end_date__gte", "documentation_deadline__lte", "documentation_deadline__gte", "finds_deadline__lte", "finds_deadline__gte", ] EXTRA_REQUEST_KEYS = { "operation_type__label": "operation_type__label", "common_name": "common_name__icontains", "cached_label": "cached_label__icontains", "comment": "comment__icontains", "scientific_documentation_comment": "scientific_documentation_comment__icontains", "abstract": "abstract__icontains", "end_date": "end_date__isnull", "start_before": "start_date__lte", "start_after": "start_date__gte", "end_before": "excavation_end_date__lte", "end_after": "excavation_end_date__gte", "towns__numero_insee__startswith": "towns__numero_insee__startswith", "parcel": "parcels__cached_label__iexact", "history_creator": 
"history_creator__ishtaruser__person__pk", "history_modifier": "history_modifier__ishtaruser__person__pk", "documentation_deadline_before": "documentation_deadline__lte", "documentation_deadline_after": "documentation_deadline__gte", "finds_deadline_before": "finds_deadline__lte", "finds_deadline_after": "finds_deadline__gte", "related_treatment": "context_record__base_finds__find__upstream_treatment__id", "towns_label": "towns", "scientist__pk": "scientist__pk", # dynamic_table_documents "in_charge__pk": "in_charge__pk", # dynamic_table_documents "collaborators__pk": "collaborators__pk", # dynamic_table_documents "cira_rapporteur__pk": "cira_rapporteur__pk", # dynamic_table_documents } COL_LABELS = { "code_patriarche": "Code patriarche", "associated_file_short_label": _("Associated file (label)"), "operator__name": _("Operator name"), "scientist__raw_name": _("Scientist (full name)"), "associated_file__external_id": _("Associated file (external ID)"), "scientist__title": _("Scientist (title)"), "scientist__surname": _("Scientist (surname)"), "scientist__name": _("Scientist (name)"), "scientist__attached_to__name": _("Scientist - Organization (name)"), "in_charge__title": _("Scientific monitor (title)"), "in_charge__surname": _("Scientific monitor (surname)"), "in_charge__name": _("Scientific monitor (name)"), "in_charge__attached_to__name": _("Scientific monitor - Organization (name)"), "cira_rapporteur__surname": "Rapporteur CTRA/CIRA (prénom)", "cira_rapporteur__name": "Rapporteur CTRA/CIRA (nom)", "cira_rapporteur__attached_to__name": "Rapporteur CTRA/CIRA - " "Organisation (nom)", "archaeological_sites__reference": _("Archaeological sites (reference)"), "towns_label": _("Towns"), "operation_type__label": _("Operation type"), "cached_towns_label": _("Towns"), "cached_periods": _("Periods"), "cached_remains": _("Remains"), } BASE_SEARCH_VECTORS = [ SearchVectorConfig("abstract", "local"), SearchVectorConfig("address", "local"), 
SearchVectorConfig("code_patriarche"), SearchVectorConfig("comment", "local"), SearchVectorConfig("common_name"), SearchVectorConfig("common_name", "local"), SearchVectorConfig("in_charge__cached_label"), SearchVectorConfig("protagonist__cached_label"), SearchVectorConfig("official_report_number"), SearchVectorConfig("old_code"), SearchVectorConfig("operation_type__label"), SearchVectorConfig("operator_reference"), SearchVectorConfig("operator__cached_label"), SearchVectorConfig("scientist__cached_label"), SearchVectorConfig("scientific_documentation_comment", "local"), SearchVectorConfig("seizure_name"), SearchVectorConfig("drassm_code"), ] PROPERTY_SEARCH_VECTORS = [ SearchVectorConfig("full_reference"), SearchVectorConfig("short_code_patriarche"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("year"), ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("periods__label", "local"), SearchVectorConfig("remains__label", "local"), SearchVectorConfig("towns__name"), SearchVectorConfig("towns__numero_insee"), ] PARENT_SEARCH_VECTORS = ["associated_file"] PARENT_ONLY_SEARCH_VECTORS = ["archaeological_sites"] ASSOCIATED = { "scientist": {("person_types", PersonType): ("head_scientist", "sra_agent")}, } CACHED_LABELS = [ "cached_label", "cached_towns_label", "cached_periods", "cached_remains", ] objects = UUIDModelManager() # alternative names of fields for searches ALT_NAMES = { "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "operation_code": SearchAltName( pgettext_lazy("key for text search", "operation-code"), "operation_code" ), "code_patriarche": SearchAltName( pgettext_lazy("key for text search", "patriarche"), "code_patriarche__iexact", ), "towns": SearchAltName( pgettext_lazy("key for text search", "town"), "towns__cached_label__iexact" ), "towns__areas": SearchAltName( pgettext_lazy("key for text search", "area"), "towns__areas__label__iexact" ), "parcel": SearchAltName( pgettext_lazy("key for text search", "parcel"), 
"parcels__cached_label__iexact", ), "towns__numero_insee__startswith": SearchAltName( pgettext_lazy("key for text search", "department"), "towns__numero_insee__startswith", ), "common_name": SearchAltName( pgettext_lazy("key for text search", "name"), "common_name__iexact" ), "address": SearchAltName( pgettext_lazy("key for text search", "address"), "address__iexact" ), "operation_type": SearchAltName( pgettext_lazy("key for text search", "type"), "operation_type__label__iexact", ), "end_date": SearchAltName( pgettext_lazy("key for text search", "is-open"), "end_date__isnull" ), "in_charge": SearchAltName( pgettext_lazy("key for text search", "scientific-monitor"), "in_charge__cached_label__iexact", ), "scientist": SearchAltName( pgettext_lazy("key for text search", "scientist"), "scientist__cached_label__iexact", ), "operator": SearchAltName( pgettext_lazy("key for text search", "operator"), "operator__cached_label__iexact", ), "remains": SearchAltName( pgettext_lazy("key for text search", "remain"), "remains__label__iexact" ), "periods": SearchAltName( pgettext_lazy("key for text search", "period"), "periods__label__iexact" ), "start_before": SearchAltName( pgettext_lazy("key for text search", "start-before"), "start_date__lte" ), "start_after": SearchAltName( pgettext_lazy("key for text search", "start-after"), "start_date__gte" ), "end_before": SearchAltName( pgettext_lazy("key for text search", "end-before"), "excavation_end_date__lte", ), "end_after": SearchAltName( pgettext_lazy("key for text search", "end-after"), "excavation_end_date__gte", ), "relation_types": SearchAltName( pgettext_lazy("key for text search", "relation-types"), "relation_types" ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), "comment__iexact" ), "abstract": SearchAltName( pgettext_lazy("key for text search", "abstract"), "abstract__iexact" ), "scientific_documentation_comment": SearchAltName( pgettext_lazy("key for text search", 
"scientific-documentation-comment"), "scientific_documentation_comment__iexact", ), "record_quality_type": SearchAltName( pgettext_lazy("key for text search", "record-quality"), "record_quality_type__label__iexact", ), "report_processing": SearchAltName( pgettext_lazy("key for text search", "report-processing"), "report_processing__label__iexact", ), "virtual_operation": SearchAltName( pgettext_lazy("key for text search", "virtual-operation"), "virtual_operation", ), "archaeological_sites": SearchAltName( pgettext_lazy("key for text search", "site"), "archaeological_sites__cached_label__icontains", ), "documentation_received": SearchAltName( pgettext_lazy("key for text search", "documentation-received"), "documentation_received", ), "documentation_deadline_before": SearchAltName( pgettext_lazy("key for text search", "documentation-deadline-before"), "documentation_deadline__lte", ), "documentation_deadline_after": SearchAltName( pgettext_lazy("key for text search", "documentation-deadline-after"), "documentation_deadline__gte", ), "finds_received": SearchAltName( pgettext_lazy("key for text search", "finds-received"), "finds_received" ), "has_finds": SearchAltName( pgettext_lazy("key for text search", "has-finds"), "context_record__base_finds", ), "finds_deadline_before": SearchAltName( pgettext_lazy("key for text search", "finds-deadline-before"), "finds_deadline__lte", ), "finds_deadline_after": SearchAltName( pgettext_lazy("key for text search", "finds-deadline-after"), "finds_deadline__gte", ), "drassm_code": SearchAltName( pgettext_lazy("key for text search", "code-drassm"), "drassm_code__iexact" ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) QA_EDIT = QuickAction( url="operation-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_operation", "change_own_operation"], ) QA_LOCK = QuickAction( url="operation-qa-lock", icon_class="fa fa-lock", text=_("Lock/Unlock"), 
target="many", rights=["change_operation", "change_own_operation"], ) QUICK_ACTIONS = [ QA_EDIT, QA_LOCK, QuickAction( url="operation-qa-duplicate", icon_class="fa fa-clone", text=_("Duplicate"), target="one", rights=["change_operation", "change_own_operation"], ), ] UP_MODEL_QUERY = { "site": (pgettext_lazy("key for text search", "site"), "cached_label"), "file": (pgettext_lazy("key for text search", "file"), "cached_label"), } RELATIVE_SESSION_NAMES = [ ("file", "associated_file__pk"), ("site", "archaeological_sites__pk"), ] POST_PROCESS_REQUEST = { "towns__numero_insee__startswith": "_get_department_code", } DOWN_MODEL_UPDATE = ["context_record"] HISTORICAL_M2M = [ "remains", "towns", "periods", ] # fields definition uuid = models.UUIDField(default=uuid.uuid4) creation_date = models.DateField(_("Creation date"), default=datetime.date.today) end_date = models.DateField(_("Closing date"), null=True, blank=True) start_date = models.DateField(_("Start date"), null=True, blank=True) excavation_end_date = models.DateField( _("Excavation end date"), null=True, blank=True ) report_delivery_date = models.DateField( _("Report delivery date"), null=True, blank=True ) scientist = models.ForeignKey( Person, blank=True, null=True, verbose_name=_("In charge scientist"), on_delete=models.SET_NULL, related_name="operation_scientist_responsability", ) operator = models.ForeignKey( Organization, blank=True, null=True, related_name="operator", verbose_name=_("Operator"), on_delete=models.SET_NULL, ) in_charge = models.ForeignKey( Person, blank=True, null=True, verbose_name=_("In charge"), on_delete=models.SET_NULL, related_name="operation_responsability", ) collaborators = models.ManyToManyField( Person, blank=True, verbose_name=_("Collaborators"), related_name="operation_collaborator", ) year = models.IntegerField(_("Year"), null=True, blank=True) operation_code = models.IntegerField(_("Numeric reference"), null=True, blank=True) associated_file = models.ForeignKey( 
"archaeological_files.File", related_name="operations", verbose_name=_("File"), on_delete=models.SET_NULL, blank=True, null=True, ) operation_type = models.ForeignKey( OperationType, related_name="+", verbose_name=_("Operation type") ) surface = models.FloatField(_("Surface (m2)"), blank=True, null=True) remains = models.ManyToManyField( "RemainType", verbose_name=_("Remains"), blank=True ) towns = models.ManyToManyField( Town, verbose_name=_("Towns"), related_name="operations" ) cost = models.IntegerField(_("Cost (euros)"), blank=True, null=True) # preventive periods = models.ManyToManyField(Period, verbose_name=_("Periods"), blank=True) # preventive scheduled_man_days = models.IntegerField( _("Scheduled man-days"), blank=True, null=True ) # preventive optional_man_days = models.IntegerField( _("Optional man-days"), blank=True, null=True ) # preventive effective_man_days = models.IntegerField( _("Effective man-days"), blank=True, null=True ) report_processing = models.ForeignKey( ReportState, verbose_name=_("Report processing"), on_delete=models.SET_NULL, blank=True, null=True, ) old_code = models.CharField(_("Old code"), max_length=200, null=True, blank=True) ## fr code_patriarche = models.TextField( "Code PATRIARCHE", blank=True, default="", unique=True ) # preventive fnap_financing = models.FloatField("Financement FNAP (%)", blank=True, null=True) # preventive fnap_cost = models.IntegerField("Financement FNAP (€)", blank=True, null=True) # preventive diag zoning_prescription = models.NullBooleanField( _("Prescription on zoning"), blank=True, null=True ) # preventive diag large_area_prescription = models.NullBooleanField( _("Prescription on large area"), blank=True, null=True ) geoarchaeological_context_prescription = models.NullBooleanField( _("Prescription on geoarchaeological context"), blank=True, null=True ) # preventive diag cira_rapporteur = models.ForeignKey( Person, related_name="cira_rapporteur", null=True, blank=True, on_delete=models.SET_NULL, 
verbose_name="Rapporteur CTRA/CIRA", ) negative_result = models.NullBooleanField( "Résultat considéré comme négatif", blank=True, null=True ) cira_date = models.DateField("Date avis CTRA/CIRA", null=True, blank=True) eas_number = models.CharField( "Numéro de l'EA", max_length=20, null=True, blank=True ) ## end fr operator_reference = models.CharField( _("Operator reference"), max_length=20, null=True, blank=True ) common_name = models.TextField(_("Generic name"), blank=True, default="") address = models.TextField(_("Address / Locality"), blank=True, default="") comment = models.TextField(_("Comment"), blank=True, default="") scientific_documentation_comment = models.TextField( _("Comment about scientific documentation"), blank=True, default="" ) documents = models.ManyToManyField( Document, related_name="operations", verbose_name=_("Documents"), blank=True ) main_image = models.ForeignKey( Document, related_name="main_image_operations", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) cached_label = models.CharField( _("Cached name"), max_length=500, help_text=_("Generated automatically - do not edit"), null=True, blank=True, db_index=True, ) archaeological_sites = models.ManyToManyField( ArchaeologicalSite, verbose_name=_("Archaeological sites"), blank=True, related_name="operations", ) top_sites = models.ManyToManyField( ArchaeologicalSite, verbose_name=_("Sites for which this operation is top operation"), related_name="top_operations", blank=True, ) virtual_operation = models.BooleanField( _("Virtual operation"), default=False, help_text=_( "If checked, it means that this operation have not been " "officialy registered." 
), ) record_quality_type = models.ForeignKey( RecordQualityType, verbose_name=_("Record quality"), on_delete=models.SET_NULL, null=True, blank=True, ) abstract = models.TextField(_("Abstract"), blank=True, default="") documentation_deadline = models.DateField( _("Deadline for submission of the documentation"), blank=True, null=True ) documentation_received = models.NullBooleanField( _("Documentation received"), blank=True, null=True ) finds_deadline = models.DateField( _("Deadline for submission of the finds"), blank=True, null=True ) finds_received = models.NullBooleanField(_("Finds received"), blank=True, null=True) # underwater drassm_code = models.CharField( _("DRASSM code"), max_length=100, null=True, blank=True ) # judiciary seizure_name = models.TextField(_("Seizure name"), blank=True, default="") official_report_number = models.TextField( _("Official report number"), blank=True, default="" ) protagonist = models.ForeignKey( Person, verbose_name=_("Name of the protagonist"), blank=True, null=True, related_name="operation_protagonist", ) applicant_authority = models.ForeignKey( Organization, verbose_name=_("Applicant authority"), blank=True, null=True, related_name="operation_applicant_authority", ) minutes_writer = models.ForeignKey( Person, verbose_name=_("Writer of the minutes"), blank=True, null=True, related_name="minutes_writer", ) cached_towns_label = models.TextField( _("Cached town label"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) cached_periods = models.TextField( _("Cached periods label"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) cached_remains = models.TextField( _("Cached remains label"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) history = HistoricalRecords(bases=[HistoryModel]) class Meta: verbose_name = _("Operation") verbose_name_plural = _("Operations") permissions = ( ("view_operation", "Can view all Operations"), 
("view_own_operation", "Can view own Operation"), ("add_own_operation", "Can add own Operation"), ("change_own_operation", "Can change own Operation"), ("delete_own_operation", "Can delete own Operation"), ("close_operation", "Can close Operation"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] def natural_key(self): return (self.uuid,) @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None ): replace_query = None if menu_filtr and "file" in menu_filtr: replace_query = Q(associated_file=menu_filtr["file"]) owns = super(Operation, cls).get_owns( user, replace_query=replace_query, limit=limit, values=values, get_short_menu_class=get_short_menu_class, ) return cls._return_get_owns(owns, values, get_short_menu_class) def __str__(self): return self.cached_label or "" DOC_VALUES = [ ("context_records", _("List of associated context records")), ("containers", _("List of associated containers")), ] def get_containers_values(self, filtr, exclude) -> list: # Container value # list Container = apps.get_model("archaeological_warehouse", "Container") containers = [] q = Container.objects.filter( finds__base_finds__context_record__operation=self ).distinct("index") exclude += ["operation", "context_record"] for c in q.order_by("index").all(): containers.append(c.get_values(filtr=filtr, exclude=exclude)) return containers def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super(Operation, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs ) values = get_values_town_related(self, prefix, values, filtr=filtr) exclude = kwargs.get("exclude", []) if prefix: return values if ( not filtr or "context_records" in filtr ) and "context_records" not in exclude: kwargs["no_base_finds"] = False values["context_records"] = [ cr.get_values(prefix=prefix, no_values=True, filtr=None, **kwargs) for cr in self.context_record.all() ] if (not filtr or "containers" in 
filtr) and "context_records" not in exclude: values["containers"] = self.get_containers_values(filtr, exclude) return values def public_representation(self): dct = super(Operation, self).public_representation() year = ( self.year if self.year and self.year != settings.ISHTAR_DEFAULT_YEAR else None ) dct.update( { "year": year, "common-name": self.common_name, "operation-type": self.operation_type and str(self.operation_type), "remains": [str(r) for r in self.remains.all()], "periods": [str(p) for p in self.periods.all()], "excavation-start-date": self.start_date, "excavation-end-date": self.excavation_end_date, "address": self.address, "comment": self.comment, } ) return dct @classmethod def _get_department_code(cls, value): if not settings.ISHTAR_DPTS: return "" for k, v in settings.ISHTAR_DPTS: if v.lower() == value: return k return "" @property def short_class_name(self): return _("OPE") @property def external_id(self): return self.code_patriarche @property def short_label(self): if settings.COUNTRY == "fr": return self.reference return str(self) @property def relation_label(self): return self.short_label @property def name(self): return self.common_name @property def show_url(self): return reverse("show-operation", args=[self.pk, ""]) def towns_codes(self): return [town.label_with_areas for town in self.towns.all()] def towns_label(self): return " - ".join(self.towns_codes()) def has_finds(self): from archaeological_finds.models import BaseFind return BaseFind.objects.filter(context_record__operation=self).count() def finds(self): from archaeological_finds.models import BaseFind return BaseFind.objects.filter(context_record__operation=self) def get_reference(self, full=False): profile = get_current_profile() ref = "" if self.code_patriarche: ref = (profile.operation_prefix or "") + str(self.code_patriarche) if not full: return ref if self.year and self.operation_code: if ref: ref += " - " ref += profile.default_operation_prefix or "" ref += 
"-".join((str(self.year), str(self.operation_code))) return ref or "00" @property def short_code_patriarche(self): if not self.code_patriarche: return "" if isinstance(self.code_patriarche, int): self.code_patriarche = str(self.code_patriarche) profile = get_current_profile() if not profile.operation_region_code or not self.code_patriarche.startswith( profile.operation_region_code ): return self.code_patriarche return self.code_patriarche[len(profile.operation_region_code) :] @property def reference(self): return self.get_reference() @property def full_reference(self): return self.get_reference(full=True) @property def report_delivery_delay(self): return None # q = self.source.filter(source_type__txt_idx__endswith='_report') # if not self.report_delivery_date or not q.count(): # return None def _generate_cached_label(self): items = [self.get_town_label(), self.get_reference(full=True)] if self.common_name: items.append(self.common_name) cached_label = settings.JOINT.join(items) return cached_label def _generate_cached_towns_label(self): return self.towns_label() or "-" def _generate_cached_remains(self): return " & ".join([str(remain) for remain in self.remains.all()]) or "-" def _generate_cached_periods(self): return " & ".join([str(period) for period in self.periods.all()]) or "-" def _get_associated_cached_labels(self): return list(self.context_record.all()) def _cached_labels_bulk_update(self): self.context_record.model.cached_label_bulk_update(operation_id=self.pk) return True def _get_base_image_path(self): return "{}/{}/{}".format(self.SLUG, self.year, self.reference) def get_town_label(self): lbl = str(_("Intercommunal")) if self.towns.count() == 1: lbl = self.towns.values("name").all()[0]["name"] return lbl def get_department(self): if not self.towns.count(): return "00" return self.towns.values("numero_insee").all()[0]["numero_insee"][:2] def grouped_parcels(self): return Parcel.grouped_parcels(list(self.parcels.distinct().all())) def 
render_parcels(self): return Parcel.render_parcels(list(self.parcels.distinct().all())) def get_town_centroid(self): q = ( self.towns.filter(center__isnull=False) .annotate(centroid=Centroid(Union("center"))) .all() ) if not q.count(): return return q.all()[0].centroid, self._meta.verbose_name def get_town_polygons(self): q = self.towns.filter(limit__isnull=False).annotate(poly=Union("limit")).all() if not q.count(): return None return q.all()[0].poly, self._meta.verbose_name def context_record_relations_q(self): from archaeological_context_records.models import RecordRelations as CRRL return CRRL.objects.filter(left_record__operation=self) def context_record_docs_q(self): return Document.objects.filter(context_records__operation=self) def find_docs_q(self): return Document.objects.filter( finds__base_finds__context_record__operation=self ) def containers_q(self): from archaeological_warehouse.models import Container return Container.objects.filter( finds__base_finds__context_record__operation=self ) def get_extra_actions(self, request): """ For sheet template """ # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(Operation, self).get_extra_actions(request) is_locked = self.is_locked(request.user) can_edit_operation = self.can_do(request, "change_operation") if can_edit_operation: actions += [ ( reverse("operation-qa-duplicate", args=[self.pk]), _("Duplicate"), "fa fa-clone", "", "", True, ), ] can_add_cr = self.can_do(request, "add_contextrecord") if can_add_cr and not is_locked: actions += [ ( reverse("operation-qa-contextrecord", args=[self.pk]), _("Add context record"), "fa fa-plus", _("context record"), "", True, ), ] return actions associated_file_short_label_lbl = _("Archaeological file") full_code_patriarche_lbl = _("Code patriarche") @property def associated_file_short_label(self): if not self.associated_file: return "" return self.associated_file.short_label @classmethod def get_available_operation_code(cls, year=None): 
max_val = cls.objects.filter(year=year).aggregate(Max("operation_code"))[ "operation_code__max" ] return (max_val + 1) if max_val else 1 year_index_lbl = _("Operation code") @property def year_index(self): if not self.operation_code: return "" lbl = str(self.operation_code) year = self.year or 0 profile = get_current_profile() lbl = (profile.default_operation_prefix or "") + "%d-%s%s" % ( year, (3 - len(lbl)) * "0", lbl, ) return lbl @property def full_code_patriarche(self): if not self.code_patriarche: return "" profile = get_current_profile() return (profile.operation_prefix or "") + self.code_patriarche def clean(self): if not self.operation_code: return objs = self.__class__.objects.filter( year=self.year, operation_code=self.operation_code ) if self.pk: objs = objs.exclude(pk=self.pk) if objs.count(): raise ValidationError( _("This operation code already exists for " "this year") ) @property def surface_ha(self): if self.surface: return self.surface / 10000.0 @property def cost_by_m2(self): if not self.surface or not self.cost: return return round(float(self.cost) / self.surface, 2) @classmethod def _get_query_owns_dicts(cls, ishtaruser, no_rel=False): profile = ishtaruser.current_profile town_ids = [] if profile: town_ids = [town["pk"] for town in profile.query_towns.values("pk").all()] query_owns = [ { "in_charge": ishtaruser.person, "scientist": ishtaruser.person, "collaborators__pk": ishtaruser.person.pk, "history_creator": ishtaruser.user_ptr, "towns__pk__in": town_ids, }, ] if not no_rel: query_owns[0][ "archaeological_sites__collaborators__pk" ] = ishtaruser.person.pk return query_owns @classmethod def get_query_owns(cls, ishtaruser): from archaeological_warehouse.models import Warehouse q = ( cls._construct_query_own( "context_record__base_finds__find__container__responsible__", Warehouse._get_query_owns_dicts(ishtaruser), ) | cls._construct_query_own( "context_record__base_finds__find__container__location__", 
                Warehouse._get_query_owns_dicts(ishtaruser),
            )
            | cls._construct_query_own(
                "context_record__base_finds__find__basket__",
                [{"shared_with": ishtaruser, "shared_write_with": ishtaruser}],
            )
            | cls._construct_query_own("", cls._get_query_owns_dicts(ishtaruser))
        )
        return q

    def is_active(self):
        # an operation is active until its closing date is set
        return not bool(self.end_date)

    # --- dashboard statistics -------------------------------------------
    # The nb_* properties return values cached by _get_or_set_stats; the
    # matching _nb_* methods compute them. The bare _("...") calls
    # presumably register the display labels for translation — they have
    # no runtime effect here.

    @property
    def nb_parcels(self):
        _("Number of parcels")
        # prefer the parcels of the associated file, fall back on own parcels
        nb = 0
        if self.associated_file:
            nb = self.associated_file.parcels.count()
        if not nb:
            nb = self.parcels.count()
        return nb

    @property
    def nb_acts(self, update=False):
        _("Number of administrative acts")
        return self._get_or_set_stats("_nb_acts", update)

    def _nb_acts(self):
        return self.administrative_act.count()

    @property
    def nb_indexed_acts(self, update=False):
        _("Number of indexed administrative acts")
        return self._get_or_set_stats("_nb_indexed_acts", update)

    def _nb_indexed_acts(self):
        return self.administrative_act.filter(act_type__indexed=True).count()

    @property
    def nb_context_records(self, update=False):
        _("Number of context records")
        return self._get_or_set_stats("_nb_context_records", update)

    def _nb_context_records(self):
        return self.context_record.count()

    @property
    def nb_context_records_by_type(self, update=False):
        return self._get_or_set_stats(
            "_nb_context_records_by_type", update, expected_type=list
        )

    def _nb_context_records_by_type(self):
        # Return [(unit label, count), ...] for context records of this
        # operation, one entry per distinct unit.
        nbs = []
        q = (
            self.context_record.values("unit", "unit__label")
            .distinct()
            # NOTE(review): orders by "label" while the sibling method
            # below orders by the grouped field — looks inconsistent, and
            # the ordering is discarded by list(set(...)) anyway; confirm
            # intent before touching.
            .order_by("label")
        )
        for res in q.all():
            nbs.append(
                (
                    str(res["unit__label"] or "-"),
                    self.context_record.filter(unit=res["unit"]).count(),
                )
            )
        return list(set(nbs))

    @property
    def nb_context_records_by_periods(self, update=False):
        return self._get_or_set_stats(
            "_nb_context_records_by_periods", update, expected_type=list
        )

    def _nb_context_records_by_periods(self):
        # Return [(period label, count), ...] ordered by period order.
        nbs = []
        q = (
            self.context_record.values("datings__period", "datings__period__label")
            .distinct()
            .order_by("datings__period__order")
        )
        for res in q.all():
            nbs.append(
                (
                    str(res["datings__period__label"] or "-"),
self.context_record.filter( datings__period=res["datings__period"] ).count(), ) ) return nbs @property def nb_finds(self, update=False): _("Number of finds") return self._get_or_set_stats("_nb_finds", update) def _nb_finds(self): from archaeological_finds.models import Find q = Find.objects.filter( base_finds__context_record__operation=self, upstream_treatment_id__isnull=True, ).distinct() return q.count() @property def nb_finds_by_material_type(self, update=False): return self._get_or_set_stats( "_nb_finds_by_material_type", update, expected_type=list ) def _nb_finds_by_material_type(self): from archaeological_finds.models import Find nbs = [] q = ( Find.objects.filter( upstream_treatment_id__isnull=True, base_finds__context_record__operation=self, ) .distinct() .values("material_types__pk", "material_types__label") .distinct() .order_by("material_types__label") ) for res in q.all(): nbs.append( ( str(res["material_types__label"] or "-"), Find.objects.filter( base_finds__context_record__operation=self, upstream_treatment_id__isnull=True, material_types__pk=res["material_types__pk"], ).count(), ) ) return nbs @property def nb_finds_by_types(self, update=False): return self._get_or_set_stats("_nb_finds_by_types", update, expected_type=list) def _nb_finds_by_types(self): from archaeological_finds.models import Find nbs = [] q = ( Find.objects.filter(base_finds__context_record__operation=self) .values("object_types", "object_types__label") .distinct() .order_by("object_types__label") ) for res in q.all(): label = str(res["object_types__label"]) if label == "None": label = str(_("No type")) nbs.append( ( label, Find.objects.filter( base_finds__context_record__operation=self, upstream_treatment_id__isnull=True, object_types=res["object_types"], ).count(), ) ) return nbs @property def nb_finds_by_periods(self, update=False): return self._get_or_set_stats( "_nb_finds_by_periods", update, expected_type=list ) def _nb_finds_by_periods(self): from archaeological_finds.models 
import Find nbs = [] q = ( Find.objects.filter(base_finds__context_record__operation=self) .values("datings__period", "datings__period__label") .distinct() .order_by("datings__period__order") ) for res in q.all(): nbs.append( ( str(res["datings__period__label"] or "-"), Find.objects.filter( base_finds__context_record__operation=self, upstream_treatment_id__isnull=True, datings__period=res["datings__period"], ).count(), ) ) return nbs @property def nb_documents(self, update=False): _("Number of sources") return self._get_or_set_stats("_nb_documents", update) def _nb_documents(self): return ( self.documents.count() + Document.objects.filter(context_records__operation=self).count() + Document.objects.filter( finds__base_finds__context_record__operation=self ).count() ) @property def nb_documents_by_types(self, update=False): return self._get_or_set_stats( "_nb_documents_by_types", update, expected_type=list ) def _nb_documents_by_types(self): docs = {} qs = [ self.documents, Document.objects.filter(context_records__operation=self), Document.objects.filter( finds__upstream_treatment_id__isnull=True, finds__base_finds__context_record__operation=self, ), ] for q in qs: for st in set(q.values_list("source_type_id", flat=True).distinct()): if st not in docs: docs[st] = 0 docs[st] += q.filter(source_type_id=st).count() docs = [ (str(SourceType.objects.get(pk=k)) if k else str(_("No type")), docs[k]) for k in docs ] return list(sorted(docs, key=lambda x: x[0])) @property def nb_stats_finds_by_ue(self, update=False): return self._get_or_set_stats("_nb_stats_finds_by_ue", update) def _nb_stats_finds_by_ue(self): _("Mean") res, finds = {}, [] for cr in self.context_record.all(): finds.append(cr.base_finds.count()) if not finds: return res res["mean"] = float(sum(finds)) / max(len(finds), 1) res["min"] = min(finds) res["max"] = max(finds) res["mode"] = " ; ".join([str(m) for m in mode(finds)]) return res def save(self, *args, **kwargs): # put a default year if start_date is 
        if self.start_date and not self.year:
            self.year = self.start_date.year
        # allocate the next free numeric code for that year when missing
        if self.operation_code is None:
            self.operation_code = self.get_available_operation_code(self.year)
        # code_patriarche is a non-null text field: normalise None to ""
        if hasattr(self, "code_patriarche"):
            self.code_patriarche = self.code_patriarche or ""
        item = super(Operation, self).save(*args, **kwargs)
        self.clean_parcel_duplicates()
        return item


m2m_changed.connect(force_cached_label_changed, sender=Operation.towns.through)
m2m_changed.connect(document_attached_changed, sender=Operation.documents.through)

# keep an historization trail for each m2m listed in HISTORICAL_M2M
for attr in Operation.HISTORICAL_M2M:
    m2m_changed.connect(
        m2m_historization_changed, sender=getattr(Operation, attr).through
    )


def operation_post_save(sender, **kwargs):
    """post_save handler: sync FNAP cost/percentage, refresh cached labels
    and propagate external ids to parcels and context records."""
    if not kwargs["instance"]:
        return
    post_save_geo(sender=sender, **kwargs)
    operation = kwargs["instance"]
    # the re-saves below must not create new history entries
    operation.skip_history_when_saving = True
    # keep fnap_cost and fnap_financing consistent, whichever was set;
    # only re-save when the derived value actually changes (guards
    # against infinite post_save recursion)
    if operation.fnap_financing and operation.cost:
        fnap_cost = int(float(operation.cost) / 100 * operation.fnap_financing)
        if not operation.fnap_cost or operation.fnap_cost != fnap_cost:
            operation.fnap_cost = fnap_cost
            operation.save()
    elif operation.fnap_cost and operation.cost:
        fnap_percent = float(operation.fnap_cost) * 100 / operation.cost
        if operation.fnap_financing != fnap_percent:
            operation.fnap_financing = fnap_percent
            operation.save()
    cached_label_changed(sender, **kwargs)
    if operation.associated_file:
        operation.associated_file.update_short_menu_class()
    # manage parcel association
    for parcel in operation.parcels.all():
        parcel.copy_to_file()
    # external id update
    for parcel in operation.parcels.all():
        parcel.update_external_id(save=True)
    for cr in operation.context_record.all():
        cr.update_external_id(save=True)


post_save.connect(operation_post_save, sender=Operation)


def operation_town_m2m_changed(sender, **kwargs):
    """m2m_changed handler: towns changed → regenerate ids and labels."""
    operation = kwargs.get("instance", None)
    if not operation:
        return
    operation._prevent_loop = False
    operation.regenerate_all_ids()
    cached_label_changed(sender, **kwargs)


m2m_changed.connect(
    operation_town_m2m_changed,
sender=Operation.towns.through ) class RelationType(GeneralRelationType): class Meta: verbose_name = _("Operation relation type") verbose_name_plural = _("Operation relation types") ordering = ("order", "label") class OperationRecordRelationManager(models.Manager): def get_by_natural_key(self, left_record, right_record, relation_type): return self.get( left_record__uuid=left_record, right_record__uuid=right_record, relation_type__txt_idx=relation_type, ) class RecordRelations(GeneralRecordRelations, models.Model): MAIN_ATTR = "left_record" left_record = models.ForeignKey(Operation, related_name="right_relations") right_record = models.ForeignKey(Operation, related_name="left_relations") relation_type = models.ForeignKey(RelationType) objects = OperationRecordRelationManager() class Meta: verbose_name = _("Operation record relation") verbose_name_plural = _("Operation record relations") ordering = ( "left_record__cached_label", "relation_type", "right_record__cached_label", ) permissions = [ ("view_operationrelation", "Can view all Operation relations"), ] def natural_key(self): return ( self.left_record.uuid, self.right_record.uuid, self.relation_type.txt_idx, ) post_delete.connect(post_delete_record_relation, sender=RecordRelations) class OperationByDepartment(models.Model): """ Database view for dashboard """ CREATE_SQL = """ CREATE VIEW operation_department (id, department_id, operation_id) as select town."id", town."departement_id", operation_towns."operation_id" from ishtar_common_town town inner join archaeological_operations_operation_towns operation_towns on operation_towns."town_id"=town."id" order by town."departement_id"; CREATE RULE operation_department_delete AS ON DELETE TO operation_department DO INSTEAD(); """ DELETE_SQL = """ DROP VIEW IF EXISTS operation_department; """ operation = models.ForeignKey(Operation, verbose_name=_("Operation")) department = models.ForeignKey( Department, verbose_name=_("Department"), on_delete=models.DO_NOTHING, 
blank=True, null=True, ) class Meta: managed = False db_table = "operation_department" class ActType(GeneralType): TYPE = ( ("F", _("Archaeological file")), ("O", _("Operation")), ("TF", _("Treatment request")), ("T", _("Treatment")), ) SERIALIZATION_EXCLUDE = ["associated_template"] intented_to = models.CharField(_("Intended to"), max_length=2, choices=TYPE) code = models.CharField(_("Code"), max_length=10, blank=True, null=True) associated_template = models.ManyToManyField( DocumentTemplate, blank=True, verbose_name=_("Associated template"), related_name="acttypes", ) indexed = models.BooleanField(_("Indexed"), default=False) class Meta: verbose_name = _("Act type") verbose_name_plural = _("Act types") ordering = ("label",) post_save.connect(post_save_cache, sender=ActType) post_delete.connect(post_save_cache, sender=ActType) class AdministrativeAct(DocumentItem, BaseHistorizedItem, OwnPerms, ValueGetter): TABLE_COLS = [ "full_ref", "signature_date__year", "index", "act_type", "act_object", "signature_date", "associated_file__cached_label", "operation__cached_label", "towns_label", ] SLUG = "administrativeact" TABLE_COLS_FILE = [ "full_ref", "year", "index", "act_type", "act_object", "associated_file", "towns_label", ] TABLE_COLS_OPE = [ "full_ref", "year", "index", "act_type", "operation", "act_object", "towns_label", ] if settings.COUNTRY == "fr": TABLE_COLS.append("departments_label") TABLE_COLS_FILE.append("departments_label") TABLE_COLS_OPE.append("departments_label") # search parameters DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + ["signature_date__lte", "signature_date__gte"] ASSOCIATED_MODELS = [ ("File", "associated_file"), (Person, "associated_file__general_contractor"), ] EXTRA_REQUEST_KEYS = { "act_object": "act_object__icontains", "act_type__intented_to": "act_type__intented_to", "associated_file__general_contractor__attached_to": "associated_file__general_contractor__attached_to__pk", "associated_file__name": "associated_file__name__icontains", 
"associated_file__operations__code_patriarche": "associated_file__operations__code_patriarche", "associated_file__permit_reference": "associated_file__permit_reference__icontains", "associated_file__towns": "associated_file__towns__pk", "associated_file__towns__numero_insee__startswith": "associated_file__towns__numero_insee__startswith", "indexed": "index__isnull", "history_creator": "history_creator__ishtaruser__person__pk", "history_modifier": "history_modifier__ishtaruser__person__pk", "operation__code_patriarche": "operation__code_patriarche", "operation__towns": "operation__towns__pk", "operation__towns__numero_insee__startswith": "operation__towns__numero_insee__startswith", "parcel_0": ( "associated_file__parcels__section", "operation__parcels__section", "operation__associated_file__parcels__section", ), "parcel_1": ( "associated_file__parcels__parcel_number" "operation__parcels__parcel_number", "operation__associated_file__parcels__parcel_number", ), "parcel_2": ( "associated_file__parcels__public_domain", "operation__parcels__public_domain", "operation__associated_file__parcels__public_domain", ), "signature_date_before": "signature_date__lte", "signature_date_after": "signature_date__gte", "year": "signature_date__year", } REVERSED_BOOL_FIELDS = [ "index__isnull", "documents__image__isnull", "documents__associated_url__isnull", "documents__associated_file__isnull", ] RELATIVE_SESSION_NAMES = [ ("operation", "operation__pk"), ("file", "associated_file__pk"), ] COL_LABELS = { "full_ref": _("Ref."), "signature_date__year": _("Year"), "associated_file__cached_label": _("Archaeological file"), "operation__cached_label": _("Operation"), } BASE_SEARCH_VECTORS = [ SearchVectorConfig("act_type__label"), SearchVectorConfig("act_object", "local"), SearchVectorConfig("towns_label"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("year"), SearchVectorConfig("index"), ] PARENT_SEARCH_VECTORS = [ "operator", "scientist", "signatory", "associated_file", "operation", 
"treatment_file", "treatment", ] # alternative names of fields for searches ALT_NAMES = { "year": SearchAltName( pgettext_lazy("key for text search", "year"), "signature_date__year" ), "index": SearchAltName(pgettext_lazy("key for text search", "index"), "index"), "ref_sra": SearchAltName( pgettext_lazy("key for text search", "other-ref"), "ref_sra__iexact" ), "operation__code_patriarche": SearchAltName( pgettext_lazy("key for text search", "patriarche"), "operation__code_patriarche", ), "act_type": SearchAltName( pgettext_lazy("key for text search", "type"), "act_type__label__iexact" ), "indexed": SearchAltName( pgettext_lazy("key for text search", "indexed"), "index__isnull" ), "operation__towns": SearchAltName( pgettext_lazy("key for text search", "operation-town"), "operation__towns__cached_label__iexact", ), "associated_file__towns": SearchAltName( pgettext_lazy("key for text search", "file-town"), "associated_file__towns__cached_label__iexact", ), "parcel": SearchAltName( pgettext_lazy("key for text search", "parcel"), ( "associated_file__parcels__cached_label__iexact", "operation__parcels__cached_label__iexact", "operation__associated_file__parcels__cached_label__iexact", ), ), "operation__towns__numero_insee__startswith": SearchAltName( pgettext_lazy("key for text search", "operation-department"), "operation__towns__numero_insee__startswith", ), "associated_file__towns__numero_insee__startswith": SearchAltName( pgettext_lazy("key for text search", "file-department"), "associated_file__towns__numero_insee__startswith", ), "act_object": SearchAltName( pgettext_lazy("key for text search", "object"), "act_object__icontains" ), "signature_date_before": SearchAltName( pgettext_lazy("key for text search", "signature-before"), "signature_date__lte", ), "signature_date_after": SearchAltName( pgettext_lazy("key for text search", "signature-after"), "signature_date__gte", ), "associated_file__name": SearchAltName( pgettext_lazy("key for text search", "file-name"), 
"associated_file__name__icontains", ), "associated_file__general_contractor": SearchAltName( pgettext_lazy("key for text search", "general-contractor"), "associated_file__general_contractor__cached_label__iexact", ), "associated_file__general_contractor__attached_to": SearchAltName( pgettext_lazy("key for text search", "general-contractor-organization"), "associated_file__general_contractor__attached_to" "__cached_label__iexact", ), "associated_file__numeric_reference": SearchAltName( pgettext_lazy("key for text search", "file-reference"), "associated_file__numeric_reference", ), "associated_file__year": SearchAltName( pgettext_lazy("key for text search", "file-year"), "associated_file__year" ), "associated_file__internal_reference": SearchAltName( pgettext_lazy("key for text search", "file-other-reference"), "associated_file__internal_reference__iexact", ), "associated_file__in_charge": SearchAltName( pgettext_lazy("key for text search", "file-in-charge"), "associated_file__in_charge__cached_label__iexact", ), "associated_file__permit_reference": SearchAltName( pgettext_lazy("key for text search", "file-permit-reference"), "associated_file__permit_reference__iexact", ), "treatment__name": SearchAltName( pgettext_lazy("key for text search", "treatment-name"), "treatment__label__icontains", ), "treatment__other_reference": SearchAltName( pgettext_lazy("key for text search", "treatment-reference"), "treatment__other_reference__icontains", ), "treatment__year": SearchAltName( pgettext_lazy("key for text search", "treatment-year"), "treatment__year" ), "treatment__index": SearchAltName( pgettext_lazy("key for text search", "treatment-index"), "treatment__index" ), "treatment__treatment_types": SearchAltName( pgettext_lazy("key for text search", "treatment-type"), "treatment__treatment_types__label__iexact", ), "treatment_file__name": SearchAltName( pgettext_lazy("key for text search", "treatment-file-name"), "treatment_file__name__icontains", ), 
"treatment_file__internal_reference": SearchAltName( pgettext_lazy("key for text search", "treatment-file-reference"), "treatment_file__internal_reference__icontains", ), "treatment_file__year": SearchAltName( pgettext_lazy("key for text search", "treatment-file-year"), "treatment_file__year", ), "treatment_file__index": SearchAltName( pgettext_lazy("key for text search", "treatment-file-index"), "treatment_file__index", ), "treatment_file__type": SearchAltName( pgettext_lazy("key for text search", "treatment-file-type"), "treatment_file__type__label__iexact", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) UP_MODEL_QUERY = {} POST_PROCESS_REQUEST = { "operation__towns__numero_insee__startswith": "_get_department_code", "associated_file__towns__numero_insee__startswith": "_get_department_code", } # fields act_type = models.ForeignKey(ActType, verbose_name=_("Act type")) in_charge = models.ForeignKey( Person, blank=True, null=True, related_name="adminact_operation_in_charge", verbose_name=_("Scientific monitor"), on_delete=models.SET_NULL, ) index = models.IntegerField(verbose_name=_("Index"), blank=True, null=True) operator = models.ForeignKey( Organization, blank=True, null=True, verbose_name=_("Archaeological preventive operator"), related_name="adminact_operator", on_delete=models.SET_NULL, ) scientist = models.ForeignKey( Person, blank=True, null=True, related_name="adminact_scientist", on_delete=models.SET_NULL, verbose_name=_("Scientist in charge"), ) signatory = models.ForeignKey( Person, blank=True, null=True, related_name="signatory", verbose_name=_("Signatory"), on_delete=models.SET_NULL, ) operation = models.ForeignKey( Operation, blank=True, null=True, related_name="administrative_act", verbose_name=_("Operation"), ) associated_file = models.ForeignKey( "archaeological_files.File", blank=True, null=True, related_name="administrative_act", verbose_name=_("Archaeological file"), ) treatment_file = 
models.ForeignKey( "archaeological_finds.TreatmentFile", blank=True, null=True, related_name="administrative_act", verbose_name=_("Treatment request"), ) treatment = models.ForeignKey( "archaeological_finds.Treatment", blank=True, null=True, related_name="administrative_act", verbose_name=_("Treatment"), ) signature_date = models.DateField(_("Signature date"), blank=True, null=True) year = models.IntegerField(_("Year"), blank=True, null=True) act_object = models.TextField(_("Object"), blank=True, default="") if settings.COUNTRY == "fr": ref_sra = models.CharField( "Référence SRA", max_length=15, blank=True, null=True ) departments_label = models.TextField( _("Departments"), blank=True, default="", help_text=_("Cached values get from associated departments"), ) towns_label = models.TextField( _("Towns"), blank=True, default="", help_text=_("Cached values get from associated towns"), ) documents = models.ManyToManyField( Document, related_name="administrativeacts", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( Document, related_name="main_image_administrativeacts", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) history = HistoricalRecords() _prefix = "adminact_" class Meta: ordering = ("year", "signature_date", "index", "act_type") verbose_name = _("Administrative act") verbose_name_plural = _("Administrative acts") permissions = ( ("view_administrativeact", "Can view all Administrative acts"), ("view_own_administrativeact", "Can view own Administrative act"), ("add_own_administrativeact", "Can add own Administrative act"), ("change_own_administrativeact", "Can change own Administrative act"), ("delete_own_administrativeact", "Can delete own Administrative act"), ) indexes = [ GinIndex(fields=["data"]), ] @property def DELETE_URL(self): if self.operation: return "delete-administrativeact-operation" if self.associated_file: return "delete-administrativeact-file" if self.treatment: return 
"delete-administrativeact-treatment" if self.treatment_file: return "delete-administrativeact-treatmentfile" def __str__(self): lbl = "" if self.year: lbl = str(self.year) if self.index: lbl += "-{}".format(self.index) if lbl: lbl += " - " lbl += self.act_type.label + " - " return lbl + settings.JOINT.join( [str(item) for item in [self.related_item, self.act_object] if item] ) full_ref_lbl = _("Ref.") def _get_base_image_path(self): if self.year: return str(self.year) return "" @property def full_ref(self): lbl = [] if self.year: lbl.append(str(self.year)) if self.index: lbl.append("n°%d" % self.index) if settings.COUNTRY == "fr" and self.ref_sra: lbl.append("[%s]" % self.ref_sra) return " ".join(lbl) @property def associated_filename(self): return self.get_filename() @property def towns(self): if self.associated_file: return self.associated_file.towns.all() elif self.operation: return self.operation.towns.all() return [] @property def departments(self): if settings.COUNTRY != "fr": return "" q = None if self.associated_file: q = self.associated_file.towns.all() elif self.operation: q = self.operation.towns.all() if not q: return "" dpts = [] for town in q: dpt = town.numero_insee[:2] if dpt not in dpts: dpts.append(dpt) return ", ".join(list(sorted(dpts))) @classmethod def _get_department_code(cls, value): if not settings.ISHTAR_DPTS: return "" for k, v in settings.ISHTAR_DPTS: if v.lower() == value: return k return "" @property def related_item(self): if self.operation: return self.operation if self.associated_file: return self.associated_file if self.treatment: return self.treatment if self.treatment_file: return self.treatment_file def get_extra_templates(self, request): urlname = "generatedoc-administrativeactop" return [ (template.name, reverse(urlname, args=[self.pk, template.pk])) for template in self.act_type.associated_template.all() ] def get_filename(self): filename = self.related_item.associated_filename filename = "-".join(filename.split("-")[:-1]) # 
remove date if self.act_type.code: filename += "-" + self.act_type.code if self.signature_date and self.index: filename += "-%d-%d" % (self.signature_date.year, self.index) if self.signature_date: filename += "-" + self.signature_date.strftime("%Y%m%d") return filename def publish(self, template_pk=None): if not self.act_type.associated_template.count(): return if not template_pk: template = self.act_type.associated_template.all()[0] else: q = self.act_type.associated_template.filter(pk=template_pk) if not q.count(): return template = q.all()[0] return template.publish(self) def _get_index(self): if not self.index: c_index = 1 q = AdministrativeAct.objects.filter( act_type__indexed=True, signature_date__year=self.year, index__isnull=False, ).order_by("-index") if q.count(): c_index = q.all()[0].index + 1 self.index = c_index conflict = AdministrativeAct.objects.filter( act_type__indexed=True, signature_date__year=self.year, index=self.index ) if self.pk: conflict = conflict.exclude(pk=self.pk) if conflict.count(): if self.pk: raise ValidationError(_("This index already exists for " "this year")) else: self._get_index() def clean(self, *args, **kwargs): if not self.signature_date: return super(AdministrativeAct, self).clean(*args, **kwargs) self.year = self.signature_date.year if not self.act_type.indexed: return super(AdministrativeAct, self).clean(*args, **kwargs) self._get_index() super(AdministrativeAct, self).clean(*args, **kwargs) def save(self, *args, **kwargs): if settings.COUNTRY == "fr": self.departments_label = self.departments self.towns_label = ", ".join(list(sorted([str(town) for town in self.towns]))) force = False if "force" in kwargs: force = kwargs.pop("force") if self.signature_date: self.year = self.signature_date.year if self.act_type.indexed: if not force: self._get_index() else: try: self._get_index() except: pass super(AdministrativeAct, self).save(*args, **kwargs) if hasattr(self, "associated_file") and self.associated_file: 
def strip_zero(value):
    """Strip leading "0" characters; an all-zero string is returned unchanged."""
    for idx, nb in enumerate(value):
        if nb != "0":
            return value[idx:]
    return value


class Parcel(LightHistorizedItem):
    """Cadastral parcel, attached to an operation or to an archaeological file."""

    EXTERNAL_ID_KEY = "parcel_external_id"
    BASE_SEARCH_VECTORS = [
        SearchVectorConfig("section"),
        SearchVectorConfig("parcel_number"),
        SearchVectorConfig("cached_label"),
    ]
    PARENT_SEARCH_VECTORS = ["operation"]
    objects = UUIDModelManager()

    uuid = models.UUIDField(default=uuid.uuid4)
    associated_file = models.ForeignKey(
        "archaeological_files.File",
        related_name="parcels",
        verbose_name=_("File"),
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
    )
    operation = models.ForeignKey(
        Operation,
        related_name="parcels",
        blank=True,
        null=True,
        verbose_name=_("Operation"),
        on_delete=models.SET_NULL,
    )
    year = models.IntegerField(_("Year"), blank=True, null=True)
    town = models.ForeignKey(Town, related_name="parcels", verbose_name=_("Town"))
    section = models.CharField(_("Section"), max_length=4, null=True, blank=True)
    parcel_number = models.CharField(
        _("Parcel number"), max_length=6, null=True, blank=True
    )
    public_domain = models.BooleanField(_("Public domain"), default=False)
    external_id = models.CharField(
        _("External ID"), max_length=100, null=True, blank=True
    )
    auto_external_id = models.BooleanField(
        _("External ID is set automatically"), default=False
    )
    address = models.TextField(_("Address - Locality"), blank=True, default="")
    cached_label = models.TextField(
        _("Cached name"), blank=True, default="", db_index=True
    )

    class Meta:
        verbose_name = _("Parcel")
        verbose_name_plural = _("Parcels")
        ordering = ("town", "year", "section", "parcel_number")
        indexes = [
            GinIndex(fields=["data"]),
        ]

    @property
    def short_label(self):
        items = [
            str(item)
            for item in [self.section, self.parcel_number, self.address]
            if item
        ]
        if self.public_domain:
            items.append(str(_("Public domain")))
        return settings.JOINT.join(items)

    def __str__(self):
        return self.short_label

    def natural_key(self):
        return (self.uuid,)

    def _generate_cached_label(self):
        # public domain parcels are displayed as "DP"
        if self.public_domain:
            return "DP"
        return "{}{}".format(self.section or "", self.parcel_number or "")

    @classmethod
    def grouped_parcels(cls, parcels):
        """Group parcels by (town, section, year).

        Each group's representative parcel gets a ``parcel_numbers``
        attribute: a sorted list of zero-stripped parcel numbers (numbers
        are temporarily zero-padded so the sort is numeric).
        """
        def sortkeyfn(s):
            return (
                getattr(s, "town_id"),
                getattr(s, "section") or "",
                getattr(s, "year") or 0,
            )

        parcels = sorted(parcels, key=sortkeyfn)
        grouped = []
        for keys, parcel_grp in groupby(parcels, key=sortkeyfn):
            for idx, parcel in enumerate(parcel_grp):
                if not idx:
                    # first parcel of the group is kept as representative
                    grouped.append(parcel)
                    grouped[-1].parcel_numbers = []
                nb = ""
                if parcel.parcel_number:
                    if parcel.parcel_number == "0":
                        nb = "0"
                    else:
                        # left-pad to 12 so lexicographic sort == numeric sort
                        nb = (
                            "0" * (12 - len(parcel.parcel_number))
                            + parcel.parcel_number
                        )
                if parcel.public_domain:
                    if nb:
                        nb += " "
                    nb += str(_("Public domain"))
                grouped[-1].parcel_numbers.append(nb)
            grouped[-1].parcel_numbers.sort()
            grouped[-1].parcel_numbers = [
                strip_zero(n) for n in grouped[-1].parcel_numbers
            ]
        return grouped

    @classmethod
    def render_parcels(cls, parcels):
        """Render a list of parcels as "Town : section numbers (year)" text."""
        grouped = cls.grouped_parcels(parcels)
        res = ""
        c_town, c_section = "", ""
        # fix: the loop variable used to shadow the "parcels" list
        for idx, grp in enumerate(grouped):
            if c_town != str(grp.town):
                c_town = str(grp.town)
                if idx:
                    res += " ; "
                res += str(grp.town) + " : "
            elif c_section:
                res += " / "
            else:
                # public domain
                res += " & "
            c_section = grp.section
            # fix: section can be None for public-domain groups - the
            # original concatenation raised TypeError
            res += (grp.section or "") + " "
            res += ", ".join(grp.parcel_numbers)
            if grp.year:
                res += " ({})".format(grp.year)
        return res

    def long_label(self):
        # fix: the original used `str(self.operation) or
        # str(self.associated_file)` - str(None) == "None" is truthy, so the
        # fallback to the associated file could never happen
        parent = self.operation or self.associated_file
        items = [str(parent) if parent else ""]
        items += [str(item) for item in [self.section, self.parcel_number] if item]
        return settings.JOINT.join(items)

    def copy_to_file(self):
        """
        Copy from operation to file when associating file to operation
        """
        if not self.operation or not self.operation.associated_file:
            # not concerned
            return
        keys = {
            "town": self.town,
            "section": self.section,
            "parcel_number": self.parcel_number,
        }
        if self.operation.associated_file.parcels.filter(**keys).count():
            # everything is OK
            return
        keys["address"] = self.address
        keys["year"] = self.year
        keys["associated_file"] = self.operation.associated_file
        new_p = Parcel.objects.create(**keys)
        # also copy owning
        for owning in self.owners.all():
            ParcelOwner.objects.create(
                owner=owning.owner,
                parcel=new_p,
                start_date=owning.start_date,
                end_date=owning.end_date,
            )

    def copy_to_operation(self):
        """
        Parcel cannot have operation and associated_file but on new parcel
        association a copy have to be done before cleaning
        """
        if not (self.operation and self.associated_file):
            # everything is OK
            return
        keys = {
            "town": self.town,
            "section": self.section,
            "parcel_number": self.parcel_number,
            "operation": self.operation,
            "associated_file": None,
            "defaults": {"address": self.address, "year": self.year},
        }
        new_p, created = Parcel.objects.get_or_create(**keys)
        # copy owning only if created
        if created:
            for owning in self.owners.all():
                ParcelOwner.objects.create(
                    owner=owning.owner,
                    parcel=new_p,
                    start_date=owning.start_date,
                    end_date=owning.end_date,
                )
        self.operation = None
        self.save()

    def clean_orphan(self):
        """
        Remove when the parcel is linked to nothing
        """
        if self.operation or self.associated_file:
            return
        if self.context_record.count():
            # trying to restore a lost parcel
            self.operation = self.context_record.all()[0].operation
            self.skip_history_when_saving = True
            self.save()
        elif self.id:
            self.delete()


def parcel_post_save(sender, **kwargs):
    """post_save hook: refresh caches and keep town/operation/file coherent."""
    if not kwargs["instance"]:
        return
    parcel = kwargs["instance"]
    cached_label_changed(sender, **kwargs)
    if (
        not getattr(parcel, "_updated_id", None)
        and not parcel.operation
        and not parcel.associated_file
        and parcel.context_record.count()
    ):
        # trying to restore a lost parcel
        parcel.operation = parcel.context_record.all()[0].operation
        parcel.save()
        return
    if parcel.context_record.count():
        parcel.context_record.model.cached_label_bulk_update(parcel_id=parcel.id)
    if (
        parcel.operation
        and parcel.operation.pk
        and parcel.town not in list(parcel.operation.towns.all())
    ):
        try:
            # multiple save can cause multiple add
            with transaction.atomic():
                parcel.operation.towns.add(parcel.town)
        except IntegrityError:
            pass
    if (
        parcel.associated_file
        and parcel.associated_file.pk
        and parcel.town not in list(parcel.associated_file.towns.all())
    ):
        try:
            # multiple save can cause multiple add
            with transaction.atomic():
                parcel.associated_file.towns.add(parcel.town)
        except IntegrityError:
            pass
    if parcel.operation and parcel.associated_file:
        # parcels are copied between files and operations
        parcel.copy_to_operation()


post_save.connect(parcel_post_save, sender=Parcel)


class ParcelOwner(LightHistorizedItem):
    uuid = models.UUIDField(default=uuid.uuid4)
    owner = models.ForeignKey(
        Person, verbose_name=_("Owner"), related_name="parcel_owner"
    )
    parcel = models.ForeignKey(Parcel, verbose_name=_("Parcel"), related_name="owners")
    start_date = models.DateField(_("Start date"))
    end_date = models.DateField(_("End date"))
    objects = UUIDModelManager()

    class Meta:
        verbose_name = _("Parcel owner")
        verbose_name_plural = _("Parcel owners")
        indexes = [
            GinIndex(fields=["data"]),
        ]

    def __str__(self):
        return "{}{}{}".format(self.owner, settings.JOINT, self.parcel)

    def natural_key(self):
        return (self.uuid,)

    @property
    def operation(self):
        return self.parcel.operation
self.parcel.operation @property def associated_file(self): return self.parcel.associated_file class OperationDashboard: def __init__(self): main_dashboard = Dashboard(Operation) self.total_number = main_dashboard.total_number self.filters_keys = [ "recorded", "effective", "active", "field", "documented", "closed", "documented_closed", ] filters = { "recorded": {}, "effective": {"scientist__isnull": False}, "active": {"scientist__isnull": False, "end_date__isnull": True}, "field": {"excavation_end_date__isnull": True}, "documented": {"documents__isnull": False}, "documented_closed": { "documents__isnull": False, "end_date__isnull": False, }, "closed": {"end_date__isnull": False}, } filters_label = { "recorded": _("Recorded"), "effective": _("Effective"), "active": _("Active"), "field": _("Field completed"), "documented": _("Associated report"), "closed": _("Closed"), "documented_closed": _("Documented and closed"), } self.filters_label = [filters_label[k] for k in self.filters_keys] self.total = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] nb = Operation.objects.filter(**fltr).count() self.total.append((lbl, nb)) self.surface_by_type = ( Operation.objects.values("operation_type__label") .annotate(number=Sum("surface")) .order_by("-number", "operation_type__label") ) self.by_type = [] self.types = OperationType.objects.filter(available=True).all() for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] type_res = ( Operation.objects.filter(**fltr) .values("operation_type", "operation_type__label") .annotate(number=Count("pk")) .order_by("operation_type") ) types_dct = {} for typ in type_res.all(): types_dct[typ["operation_type"]] = typ["number"] types = [] for typ in self.types: if typ.pk in types_dct: types.append(types_dct[typ.pk]) else: types.append(0) self.by_type.append((lbl, types)) self.by_year = [] self.years = [ res["year"] for res in 
Operation.objects.values("year").order_by("-year").distinct() ] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] year_res = ( Operation.objects.filter(**fltr) .values("year") .annotate(number=Count("pk")) .order_by("year") ) years_dct = {} for yr in year_res.all(): years_dct[yr["year"]] = yr["number"] years = [] for yr in self.years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) self.by_year.append((lbl, years)) self.by_realisation_year = [] self.realisation_years = [ res["date"] for res in Operation.objects.extra( {"date": "date_trunc('year', start_date)"} ) .values("date") .filter(start_date__isnull=False) .order_by("-date") .distinct() ] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] year_res = ( Operation.objects.filter(**fltr) .extra({"date": "date_trunc('year', start_date)"}) .values("date") .values("date") .filter(start_date__isnull=False) .annotate(number=Count("pk")) .order_by("-date") ) years_dct = {} for yr in year_res.all(): years_dct[yr["date"]] = yr["number"] years = [] for yr in self.realisation_years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) self.by_realisation_year.append((lbl, years)) self.effective = [] for typ in self.types: year_res = ( Operation.objects.filter( **{"scientist__isnull": False, "operation_type": typ} ) .values("year") .annotate(number=Count("pk")) .order_by("-year") .distinct() ) years_dct = {} for yr in year_res.all(): years_dct[yr["year"]] = yr["number"] years = [] for yr in self.years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) self.effective.append((typ, years)) # TODO: by date now = datetime.date.today() limit = datetime.date(now.year, now.month, 1) - datetime.timedelta(365) by_realisation_month = Operation.objects.filter( start_date__gt=limit, start_date__isnull=False ).extra({"date": "date_trunc('month', start_date)"}) self.last_months = [] date = 
datetime.datetime(now.year, now.month, 1) for mt_idx in range(12): self.last_months.append(date) if date.month > 1: date = datetime.datetime(date.year, date.month - 1, 1) else: date = datetime.datetime(date.year - 1, 12, 1) self.by_realisation_month = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] month_res = ( by_realisation_month.filter(**fltr) .annotate(number=Count("pk")) .order_by("-date") ) month_dct = {} for mt in month_res.all(): month_dct[mt.date] = mt.number date = datetime.date(now.year, now.month, 1) months = [] for date in self.last_months: if date in month_dct: months.append(month_dct[date]) else: months.append(0) self.by_realisation_month.append((lbl, months)) # survey and excavations self.survey, self.excavation = {}, {} for dct_res, ope_types in ( (self.survey, ("arch_diagnostic",)), (self.excavation, ("prev_excavation", "prog_excavation")), ): dct_res["total"] = [] operation_type = {"operation_type__txt_idx__in": ope_types} for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) nb = Operation.objects.filter(**fltr).count() dct_res["total"].append((lbl, nb)) dct_res["by_year"] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) year_res = ( Operation.objects.filter(**fltr) .values("year") .annotate(number=Count("pk")) .order_by("year") ) years_dct = {} for yr in year_res.all(): years_dct[yr["year"]] = yr["number"] years = [] for yr in self.years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) dct_res["by_year"].append((lbl, years)) dct_res["by_realisation_year"] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) year_res = ( Operation.objects.filter(**fltr) .extra({"date": "date_trunc('year', start_date)"}) .values("date") .filter(start_date__isnull=False) 
.annotate(number=Count("pk")) .order_by("-date") ) years_dct = {} for yr in year_res.all(): years_dct[yr["date"]] = yr["number"] years = [] for yr in self.realisation_years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) dct_res["by_realisation_year"].append((lbl, years)) current_year_ope = Operation.objects.filter(**operation_type).filter( year=datetime.date.today().year ) current_realisation_year_ope = Operation.objects.filter( **operation_type ).filter(start_date__year=datetime.date.today().year) res_keys = [("area_realised", current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(("area", current_year_ope)) for res_key, base_ope in res_keys: dct_res[res_key] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] area_res = ( base_ope.filter(**fltr).annotate(number=Sum("surface")).all() ) val = 0 if area_res: val = (area_res[0].number or 0) / 10000.0 dct_res[res_key].append(val) # TODO... res_keys = [("manday_realised", current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(("manday", current_year_ope)) for res_key, base_ope in res_keys: dct_res[res_key] = [] for fltr_key in self.filters_keys: dct_res[res_key].append("-") # TODO... res_keys = [("mandayhect_realised", current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(("mandayhect", current_year_ope)) for res_key, base_ope in res_keys: dct_res[res_key] = [] for fltr_key in self.filters_keys: dct_res[res_key].append("-") # TODO... 
dct_res["mandayhect_real_effective"] = "-" if dct_res == self.survey: dct_res["mandayhect_effective"] = "-" res_keys = [("org_realised", current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(("org", current_year_ope)) for res_key, base_ope in res_keys: org_res = ( base_ope.filter(scientist__attached_to__isnull=False) .values("scientist__attached_to", "scientist__attached_to__name") .annotate(area=Sum("surface")) .order_by("scientist__attached_to__name") .all() ) # TODO: man-days, man-days/hectare dct_res[res_key] = [] for vals in org_res: vals["area"] = (vals["area"] or 0) / 10000.0 dct_res[res_key].append(vals) year_ope = Operation.objects.filter(**operation_type) res_keys = ["org_by_year"] if dct_res == self.survey: res_keys.append("org_by_year_realised") q = ( year_ope.values( "scientist__attached_to", "scientist__attached_to__name" ) .filter(scientist__attached_to__isnull=False) .order_by("scientist__attached_to__name") .distinct() ) org_list = [ (org["scientist__attached_to"], org["scientist__attached_to__name"]) for org in q ] # org_list_dct = dict(org_list) for res_key in res_keys: dct_res[res_key] = [] years = self.years if res_key == "org_by_year_realised": years = self.realisation_years for org_id, org_label in org_list: org_res = year_ope.filter(scientist__attached_to__pk=org_id) key_date = "" if res_key == "org_by_year": org_res = org_res.values("year") key_date = "year" else: org_res = ( org_res.extra({"date": "date_trunc('year', start_date)"}) .values("date") .filter(start_date__isnull=False) ) key_date = "date" org_res = org_res.annotate(area=Sum("surface"), cost=Sum("cost")) years_dct = {} for yr in org_res.all(): area = (yr["area"] if yr["area"] else 0) / 10000.0 cost = yr["cost"] if yr["cost"] else 0 years_dct[yr[key_date]] = (area, cost) r_years = [] for yr in years: if yr in years_dct: r_years.append(years_dct[yr]) else: r_years.append((0, 0)) dct_res[res_key].append((org_label, r_years)) area_means, area_sums = [], [] 
cost_means, cost_sums = [], [] for idx, year in enumerate(years): vals = [r_yars[idx] for lb, r_yars in dct_res[res_key]] if not vals: continue sum_area = sum([a for a, c in vals]) sum_cost = sum([c for a, c in vals]) area_means.append(sum_area / len(vals)) area_sums.append(sum_area) cost_means.append(sum_cost / len(vals)) cost_sums.append(sum_cost) dct_res[res_key + "_area_mean"] = area_means dct_res[res_key + "_area_sum"] = area_sums dct_res[res_key + "_cost_mean"] = cost_means dct_res[res_key + "_cost_mean"] = cost_sums if dct_res == self.survey: self.survey["effective"] = [] for yr in self.years: year_res = Operation.objects.filter( scientist__isnull=False, year=yr, operation_type__txt_idx__in=ope_types, ).annotate(number=Sum("surface"), mean=Avg("surface")) nb = year_res[0].number if year_res.count() else 0 nb = nb if nb else 0 mean = year_res[0].mean if year_res.count() else 0 mean = mean if mean else 0 self.survey["effective"].append((nb, mean)) # TODO:Man-Days/hectare by Year # CHECK: month of realisation or month? 
dct_res["by_month"] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) month_res = ( by_realisation_month.filter(**fltr) .annotate(number=Count("pk")) .order_by("-date") ) month_dct = {} for mt in month_res.all(): month_dct[mt.date] = mt.number date = datetime.date(now.year, now.month, 1) months = [] for date in self.last_months: if date in month_dct: months.append(month_dct[date]) else: months.append(0) dct_res["by_month"].append((lbl, months)) operation_type = {"operation_type__txt_idx__in": ope_types} self.departments = [ (fd["department__pk"], fd["department__label"]) for fd in OperationByDepartment.objects.filter(department__isnull=False) .values("department__label", "department__pk") .order_by("department__label") .distinct() ] dct_res["by_dpt"] = [] for dpt_id, dpt_label in self.departments: vals = ( OperationByDepartment.objects.filter( department__pk=dpt_id, operation__operation_type__txt_idx__in=ope_types, ) .values("department__pk", "operation__year") .annotate(number=Count("operation")) .order_by("operation__year") ) dct_years = {} for v in vals: dct_years[v["operation__year"]] = v["number"] years = [] for y in self.years: if y in dct_years: years.append(dct_years[y]) else: years.append(0) years.append(sum(years)) dct_res["by_dpt"].append((dpt_label, years)) dct_res["effective_by_dpt"] = [] for dpt_id, dpt_label in self.departments: vals = ( OperationByDepartment.objects.filter( department__pk=dpt_id, operation__scientist__isnull=False, operation__operation_type__txt_idx__in=ope_types, ) .values("department__pk", "operation__year") .annotate( number=Count("operation"), area=Sum("operation__surface"), fnap=Sum("operation__fnap_cost"), cost=Sum("operation__cost"), ) .order_by("operation__year") ) dct_years = {} for v in vals: values = [] for k in ("number", "area", "cost", "fnap"): value = v[k] or 0 if k == "area": value /= 10000.0 values.append(value) dct_years[v["operation__year"]] 
= values years = [] for y in self.years: if y in dct_years: years.append(dct_years[y]) else: years.append((0, 0, 0, 0)) nbs, areas, costs, fnaps = zip(*years) years.append((sum(nbs), sum(areas), sum(costs), sum(fnaps))) dct_res["effective_by_dpt"].append((dpt_label, years)) OperationTown = Operation.towns.through query = ( OperationTown.objects.filter( operation__scientist__isnull=False, operation__operation_type__txt_idx__in=ope_types, ) .values("town__name", "town__departement__number") .annotate(nb=Count("operation")) .order_by("-nb", "town__name")[:10] ) dct_res["towns"] = [] for r in query: dct_res["towns"].append( ( "%s (%s)" % (r["town__name"], r["town__departement__number"]), r["nb"], ) ) if dct_res == self.survey: query = ( OperationTown.objects.filter( operation__scientist__isnull=False, operation__operation_type__txt_idx__in=ope_types, operation__surface__isnull=False, ) .values("town__name", "town__departement__number") .annotate(nb=Sum("operation__surface")) .order_by("-nb", "town__name")[:10] ) dct_res["towns_surface"] = [] for r in query: dct_res["towns_surface"].append( ( "%s (%s)" % (r["town__name"], r["town__departement__number"]), r["nb"], ) ) else: query = ( OperationTown.objects.filter( operation__scientist__isnull=False, operation__operation_type__txt_idx__in=ope_types, operation__cost__isnull=False, ) .values("town__name", "town__departement__number") .annotate(nb=Sum("operation__cost")) .order_by("-nb", "town__name")[:10] ) dct_res["towns_cost"] = [] for r in query: dct_res["towns_cost"].append( ( "%s (%s)" % (r["town__name"], r["town__departement__number"]), r["nb"], ) ) class OperationTypeOld(GeneralType): order = models.IntegerField(_("Order"), default=1) preventive = models.BooleanField(_("Is preventive"), default=True) class Meta: verbose_name = _("Operation type old") verbose_name_plural = _("Operation types old") ordering = ["-preventive", "order", "label"]