#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2012-2017 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

# See the file COPYING for details.

import datetime
from collections import OrderedDict

from django.apps import apps
from django.conf import settings
from django.contrib.gis.db import models
from django.contrib.postgres.indexes import GinIndex
from django.core.cache import cache
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db.models import Q, Count, Sum, Max
from django.db.models.signals import post_save, m2m_changed, post_delete
from django.urls import reverse

from ishtar_common.models_common import OrderedHierarchicalType
from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, get_current_profile, InlineClass
from ishtar_common.utils import (
    cached_label_changed,
    get_cache,
    get_current_year,
    get_generated_id,
    m2m_historization_changed,
)

from ishtar_common.models import (
    Department,
    GeneralType,
    GlobalVar,
    BaseHistorizedItem,
    Imported,
    OwnPerms,
    Person,
    Organization,
    Town,
    Dashboard,
    DashboardFormItem,
    HistoricalRecords,
    ValueGetter,
    MainItem,
    OperationType,
    post_save_cache,
    Document,
    HistoryModel,
    SearchAltName,
    SearchVectorConfig,
    DocumentItem,
    CompleteIdentifierItem,
    HierarchicalType,
)

from archaeological_operations.models import (
    get_values_town_related,
    ClosedItem,
    ParcelItem,
)


class PriceAgreement(GeneralType):
    """Price agreement type, optionally bounded by a start and an end date."""

    order = models.IntegerField(_("Order"), default=10)
    start_date = models.DateField(_("Start date"), blank=True, null=True)
    end_date = models.DateField(_("End date"), blank=True, null=True)

    class Meta:
        verbose_name = _("Price agreement")
        verbose_name_plural = _("Price agreement")
        ordering = (
            "order",
            "start_date",
            "end_date",
            "label",
        )
    ADMIN_SECTION = _("Preventive")


class Job(GeneralType):
    """Job type with daily costs, optionally attached to a price agreement.

    A job may reference a ``child`` job; the reverse relation is exposed as
    ``parents``.
    """

    price_agreement = models.ForeignKey(
        PriceAgreement, verbose_name=_("Price agreement"), blank=True, null=True,
        on_delete=models.CASCADE, related_name="jobs"
    )
    ground_daily_cost = models.FloatField(_("Ground daily cost"), blank=True, null=True)
    daily_cost = models.FloatField(_("Daily cost"), blank=True, null=True)
    permanent_contract = models.NullBooleanField(
        _("Permanent contract"), blank=True, null=True
    )
    default_daily_need_on_ground = models.FloatField(
        _("Def. daily number on ground"), default=0
    )
    default_daily_need = models.FloatField(_("Def. daily number on study"), default=0)
    order = models.IntegerField(_("Order"), default=10)
    child = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        verbose_name=_("Child"),
        help_text=_("Auto-add this job when a parent is added"),
        related_name="parents",
        on_delete=models.SET_NULL,
    )

    class Meta:
        verbose_name = _("Job")
        verbose_name_plural = _("Jobs")
        ordering = (
            "order",
            "-permanent_contract",
            "label",
        )
    ADMIN_SECTION = _("Preventive")

    def __str__(self):
        """Return the label, suffixed when the job is not a permanent contract."""
        lbl = self.label
        if not self.permanent_contract:
            lbl += " ({})".format(_("fixed-term contract"))
        return lbl

    @property
    def has_parents(self):
        """True when at least one other job declares this job as its child."""
        # exists() avoids counting every row just to test for presence
        return self.parents.exists()

    @classmethod
    def get_choices(cls, current_value, price_agreement_id=None):
        """Build grouped form choices of available jobs that have no parent.

        :param current_value: Job instance currently selected (or a falsy
            value); it is added to its group when the query did not return it
        :param price_agreement_id: optional price agreement used as a filter
        :return: an empty choice followed by two ``(group label, choices)``
            tuples: permanent contracts then fixed-term contracts
        """
        q = cls.objects.filter(
            available=True,
            parents__isnull=True,
        )
        if price_agreement_id:
            q = q.filter(price_agreement=price_agreement_id)
        permanent = [(j.pk, str(j)) for j in q.filter(permanent_contract=True).all()]
        fixed_term = [(j.pk, str(j)) for j in q.filter(permanent_contract=False).all()]
        if current_value:
            entry = (current_value.pk, str(current_value))
            group = permanent if current_value.permanent_contract else fixed_term
            # the current value may already be listed: do not duplicate it
            if entry not in group:
                group.append(entry)
        return [("", "-" * 9)] + [
            (
                _("Permanent contract"),
                permanent,
            ),
            (
                _("Fixed-term contract"),
                fixed_term,
            ),
        ]


class GenericEquipmentServiceType(GeneralType):
    """Generic category grouping equipment/service types."""

    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Generic equipment type")
        verbose_name_plural = _("Generic equipment types")
        ordering = (
            "order",
            "label",
        )
    ADMIN_SECTION = _("Preventive")


class EquipmentServiceType(GeneralType):
    """Equipment/service type attached to a generic equipment type."""

    generic_equipment_type = models.ForeignKey(
        GenericEquipmentServiceType, verbose_name=_("Generic type"),
        on_delete=models.CASCADE
    )
    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Equipment/service type")
        verbose_name_plural = _("Equipment/service types")
        ordering = (
            "order",
            "label",
        )
    ADMIN_SECTION = _("Preventive")


# units available for equipment/service costs
ES_UNITS = (
    ("D", _("days")),
    ("W", _("weeks")),
    ("M", _("months")),
    ("L", _("linear m.")),
)

DCT_ES_UNITS = dict(ES_UNITS)

# number of days for each unit - linear meter has no duration
ES_UNITS_DAYS = {
    "D": 1,
    "W": 7,
    "M": 30,
    "L": 0
}


class EquipmentServiceCost(models.Model):
    """Cost of an equipment/service type, optionally tied to a price agreement."""

    price_agreement = models.ForeignKey(
        PriceAgreement, verbose_name=_("Price agreement"), blank=True, null=True,
        on_delete=models.CASCADE, related_name="equipment_service_costs"
    )
    equipment_service_type = models.ForeignKey(
        EquipmentServiceType, verbose_name=_("Equipment/Service"),
        on_delete=models.CASCADE
    )
    slug = models.SlugField(
        _("Textual ID"),
        unique=True,
        max_length=300,
        help_text=_(
            "The slug is the standardized version of the name. It contains "
            "only lowercase letters, numbers and hyphens. Each slug must "
            "be unique."
        ),
    )
    service_provider = models.CharField(
        _("Service provider"), max_length=200, blank=True, default=""
    )
    flat_rate = models.BooleanField(_("Flat rate"), default=False)
    unitary_cost = models.FloatField(_("Unitary cost"), blank=True, null=True)
    unit = models.CharField(
        _("Unit"), max_length=1, choices=ES_UNITS, blank=True, null=True
    )
    specificity = models.CharField(
        _("Specificity"), blank=True, max_length=200, default=""
    )
    default_quantity_by_day = models.IntegerField(
        _("Default quantity by day"), default=0)
    order = models.IntegerField(_("Order"), default=10)
    available = models.BooleanField(_("Available"), default=True)
    parent = models.ForeignKey(
        EquipmentServiceType,
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        verbose_name=_("Parent"),
        help_text=_("Auto-add this cost when a parent is added"),
        related_name="children",
    )

    class Meta:
        verbose_name = _("Equipment/service cost")
        verbose_name_plural = _("Equipment/service costs")
        ordering = (
            "order",
            "equipment_service_type__label",
        )
    ADMIN_SECTION = _("Preventive")

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation

        :return: comma separated "**name** verbose name" entries for every
            field having a verbose name, technical fields excluded
        """
        # "available" was previously misspelled and so never excluded
        exclude = ("id", "order", "available")
        return ", ".join(
            f"**{field.name}** {field.verbose_name}"
            for field in cls._meta.get_fields()
            if field.name not in exclude and hasattr(field, "verbose_name")
        )

    def __str__(self):
        """Label built from parent, type, specificity, provider, rate and unit."""
        lbl = ""
        if self.parent:
            lbl = self.parent.label + " - "
        lbl += str(self.equipment_service_type)
        if self.specificity:
            lbl += " - " + self.specificity
        if self.service_provider:
            lbl += f" ({self.service_provider})"
        if self.flat_rate:
            lbl += " - " + str(_("Flat rate"))
        if self.unit and self.unit in DCT_ES_UNITS:
            lbl += " - " + str(DCT_ES_UNITS[self.unit])
        return lbl

    def natural_key(self):
        """Natural key: the unique slug, as a one-item tuple."""
        return (self.slug,)

    def history_compress(self):
        """Compact representation of this item: its slug."""
        return self.slug

    @property
    def unit_label(self):
        """Label of the unit, or an empty string for an unset/unknown unit."""
        if self.unit and self.unit in DCT_ES_UNITS:
            return DCT_ES_UNITS[self.unit]
        return ""


class FileType(GeneralType):
    """Type of archaeological file."""

    class Meta:
        verbose_name = _("Archaeological file type")
        verbose_name_plural = _("Archaeological file types")
        ordering = ("label",)

    @classmethod
    def is_preventive(cls, file_type_id, key="", force=False):
        """Check that a file type id is the id of the cached type ``key``.

        :param file_type_id: primary key to test
        :param key: key of the reference type - "preventive" when empty
        :param force: forwarded to ``get_cache``
        :return: False when the reference type cannot be found
        """
        key = key or "preventive"
        try:
            preventive = FileType.get_cache(key, force=force).pk
            return file_type_id == preventive
        except (FileType.DoesNotExist, AttributeError):
            return False


post_save.connect(post_save_cache, sender=FileType)
post_delete.connect(post_save_cache, sender=FileType)


class PermitType(GeneralType):
    """Type of permit."""

    class Meta:
        verbose_name = _("Permit type")
        verbose_name_plural = _("Permit types")
        ordering = ("label",)


post_save.connect(post_save_cache, sender=PermitType)
post_delete.connect(post_save_cache, sender=PermitType)


if settings.COUNTRY == "fr":

    class SaisineType(GeneralType, ValueGetter):
        """France specific referral ("saisine") type with a delay in days."""

        delay = models.IntegerField(_("Delay (in days)"), default=30)

        class Meta:
            verbose_name = "Type de saisine"
            verbose_name_plural = "Types de saisine"
            ordering = ("label",)

    # kept inside the condition: SaisineType only exists for France
    post_save.connect(post_save_cache, sender=SaisineType)
    post_delete.connect(post_save_cache, sender=SaisineType)


class AgreementType(GeneralType):
    """Type of agreement (France)."""

    class Meta:
        verbose_name = _("Agreement type - France")
        verbose_name_plural = _("Agreement types - France")
        ordering = ("label",)
    ADMIN_SECTION = _("Preventive")


post_save.connect(post_save_cache, sender=AgreementType)
post_delete.connect(post_save_cache, sender=AgreementType)


class MonitoringJustificationType(OrderedHierarchicalType):
    """Hierarchical type describing why a monitoring is requested."""

    class Meta:
        verbose_name = _("Monitoring justification type")
        verbose_name_plural = _("Monitoring justification types")
        ordering = ("label",)
    ADMIN_SECTION = _("Preventive")


post_save.connect(post_save_cache, sender=MonitoringJustificationType)
post_delete.connect(post_save_cache, sender=MonitoringJustificationType)


class DevelopmentType(OrderedHierarchicalType):
    """Hierarchical type of development."""

    class Meta:
        verbose_name = _("Development type")
        verbose_name_plural = _("Development types")
        ordering = ("label",)
    ADMIN_SECTION = _("Preventive")
post_save.connect(post_save_cache, sender=DevelopmentType) post_delete.connect(post_save_cache, sender=DevelopmentType) class OperationTypeForRoyalties(GeneralType): increase_coefficient = models.FloatField( _("Increase coefficient"), default=1) increased_final_value = models.FloatField( _("Increased final value"), default=1) category = models.PositiveSmallIntegerField( _("Category"), default=1, validators=[MinValueValidator(1), MaxValueValidator(5)]) class Meta: verbose_name = _("Operation type for royalties - France") verbose_name_plural = _("Operation types for royalties - France") ordering = ("id",) ADMIN_SECTION = _("Preventive") post_save.connect(post_save_cache, sender=OperationTypeForRoyalties) post_delete.connect(post_save_cache, sender=OperationTypeForRoyalties) class File( ClosedItem, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, OwnPerms, ValueGetter, MainItem, DashboardFormItem, ParcelItem, ): SLUG = "file" SHOW_URL = "show-file" DELETE_URL = "delete-file" APP = "archaeological-files" MODEL = "file" TABLE_COLS = [ "numeric_reference", "year", "internal_reference", "file_type", "saisine_type", "towns_label", ] # statistics STATISTIC_MODALITIES_OPTIONS = OrderedDict( [ ("year", _("Year")), ("file_type__label", _("File type")), ("towns__areas__label", _("Area")), ("towns__areas__parent__label", _("Extended area")), ("saisine_type__label", "Type de saisine"), ("permit_type__label", _("Permit type")), ("requested_operation_type__label", _("File type")), ] ) STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()] STATISTIC_SUM_VARIABLE = OrderedDict( ( ("pk", (_("Number"), 1)), ("total_surface", (_("Total surface (km²)"), 0.000001)), ("total_developed_surface", (_("Total developed surface (km²)"), 0.000001)), ) ) GET_VALUES_M2M = [ "towns", "departments", ] # search parameters BOOL_FIELDS = BaseHistorizedItem.BOOL_FIELDS + ["end_date__isnull"] NUMBER_FIELDS = ["year", "numeric_reference"] EXTRA_REQUEST_KEYS = { 
"parcel_0": ("parcels__section", "operations__parcels__section"), "parcel_1": ("parcels__parcel_number", "operations__parcels__parcel_number"), "parcel_2": ("operations__parcels__public_domain", "parcels__public_domain"), "end_date": "end_date__isnull", "towns__numero_insee__startswith": "towns__numero_insee__startswith", "name": "name__icontains", "cached_label": "cached_label__icontains", "comment": "comment__icontains", "permit_reference": "permit_reference__icontains", "general_contractor__attached_to": "general_contractor__attached_to__pk", "history_creator": "history_creator__ishtaruser__person__pk", "history_modifier": "history_modifier__ishtaruser__person__pk", "towns_label": "towns", "general_contractor__pk": "general_contractor__pk", "responsible_town_planning_service__pk": "responsible_town_planning_service__pk", "in_charge__pk": "in_charge__pk", } BASE_SEARCH_VECTORS = [ SearchVectorConfig("name"), SearchVectorConfig("internal_reference", "raw"), SearchVectorConfig("file_type__label"), SearchVectorConfig("saisine_type__label"), SearchVectorConfig("permit_type__label"), SearchVectorConfig("permit_reference", "raw"), SearchVectorConfig("comment", "local"), SearchVectorConfig("research_comment", "local"), ] PROPERTY_SEARCH_VECTORS = [ SearchVectorConfig("year_ref", "raw"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("numeric_reference"), SearchVectorConfig("year"), ] M2M_SEARCH_VECTORS = [SearchVectorConfig("towns__name")] PARENT_SEARCH_VECTORS = [ "in_charge", "general_contractor", "corporation_general_contractor", "responsible_town_planning_service", "planning_service", "organization", "scientist", ] COL_LABELS = { "towns_label": _("Towns"), } REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] # alternative names of fields for searches ALT_NAMES = { "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "numeric_reference": SearchAltName( 
pgettext_lazy("key for text search", "reference"), "numeric_reference" ), "internal_reference": SearchAltName( pgettext_lazy("key for text search", "other-reference"), "internal_reference__iexact", ), "towns": SearchAltName( pgettext_lazy("key for text search", "town"), "towns__cached_label__iexact" ), "parcel": SearchAltName( pgettext_lazy("key for text search", "parcel"), "parcels__cached_label__iexact", related_name="parcels" ), "towns__numero_insee__startswith": SearchAltName( pgettext_lazy("key for text search", "department"), "towns__numero_insee__startswith", ), "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "operation_name": SearchAltName( pgettext_lazy("key for text search", "operation-name"), "operation_name__iexact" ), "file_type": SearchAltName( pgettext_lazy("key for text search", "type"), "file_type__label__iexact" ), "end_date": SearchAltName( pgettext_lazy("key for text search", "active"), "end_date__isnull" ), "saisine_type": SearchAltName( pgettext_lazy("key for text search", "saisine-type"), "saisine_type__label__iexact", related_name="saisine_type" ), "development_type": SearchAltName( pgettext_lazy("key for text search", "development-type"), "development_type__label__iexact", ), "monitoring_justification": SearchAltName( pgettext_lazy("key for text search", "monitoring-justification"), "monitoring_justification__label__iexact", ), "permit_type": SearchAltName( pgettext_lazy("key for text search", "permit-type"), "permit_type__label__iexact", related_name="permit_type" ), "permit_reference": SearchAltName( pgettext_lazy("key for text search", "permit-reference"), "permit_reference__iexact", ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), "comment__iexact" ), "in_charge": SearchAltName( pgettext_lazy("key for text search", "in-charge"), "in_charge__cached_label__iexact", ), "general_contractor": SearchAltName( pgettext_lazy("key for text search", "general-contractor"), 
"general_contractor__cached_label__iexact", related_name="general_contractor" ), "general_contractor__attached_to": SearchAltName( pgettext_lazy("key for text search", "general-contractor-organization"), "general_contractor__attached_to__cached_label__iexact", related_name="general_contractor__attached_to" ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) ALT_NAMES.update(Imported.ALT_NAMES) POST_PROCESS_REQUEST = { "towns__numero_insee__startswith": "_get_department_code", } HISTORICAL_M2M = ["towns", "departments"] SERIALIZE_PROPERTIES = ["external_id"] # fields year = models.IntegerField(_("Year"), default=get_current_year) numeric_reference = models.IntegerField( _("Numeric reference"), blank=True, null=True ) internal_reference = models.CharField( _("Internal reference"), blank=True, null=True, max_length=60 ) external_id = models.CharField( _("External ID"), blank=True, null=True, max_length=120 ) auto_external_id = models.BooleanField( _("External ID is set automatically"), default=False ) name = models.TextField(_("Name"), blank=True, default="") operation_name = models.TextField(_("Operation name"), blank=True, default="") file_type = models.ForeignKey(FileType, verbose_name=_("File type"), on_delete=models.PROTECT) in_charge = models.ForeignKey( Person, related_name="file_responsability", verbose_name=_("Person in charge"), on_delete=models.SET_NULL, blank=True, null=True, ) general_contractor = models.ForeignKey( Person, related_name="general_contractor_files", verbose_name=_("General contractor"), blank=True, null=True, on_delete=models.SET_NULL, ) # aménageur - personne raw_general_contractor = models.CharField( _("General contractor (raw)"), max_length=200, blank=True, null=True ) corporation_general_contractor = models.ForeignKey( Organization, related_name="general_contractor_files", verbose_name=_("General contractor organization"), blank=True, null=True, on_delete=models.SET_NULL, ) # aménageur 
responsible_town_planning_service = models.ForeignKey( Person, related_name="responsible_town_planning_service_files", blank=True, null=True, verbose_name=_("Responsible for planning service"), on_delete=models.SET_NULL, ) # service instructeur - personne raw_town_planning_service = models.CharField( _("Planning service (raw)"), max_length=200, blank=True, null=True ) planning_service = models.ForeignKey( Organization, related_name="planning_service_files", blank=True, null=True, verbose_name=_("Planning service organization"), on_delete=models.SET_NULL, ) # service instructeur permit_type = models.ForeignKey( PermitType, verbose_name=_("Permit type"), blank=True, null=True, on_delete=models.SET_NULL, ) permit_reference = models.TextField(_("Permit/order reference"), blank=True, default="") end_date = models.DateField(_("Closing date"), null=True, blank=True) main_town = models.ForeignKey( Town, verbose_name=_("Main town"), null=True, blank=True, related_name="file_main", on_delete=models.SET_NULL, ) towns = models.ManyToManyField( Town, verbose_name=_("Towns"), related_name="file", blank=True ) creation_date = models.DateField( _("Creation date"), default=datetime.date.today, blank=True, null=True ) reception_date = models.DateField(_("Reception date"), blank=True, null=True) planning_service_date = models.DateField( _("Date of planning service file"), null=True, blank=True ) related_file = models.ForeignKey( "File", verbose_name=_("Related file"), blank=True, null=True, on_delete=models.SET_NULL, ) if settings.COUNTRY == "fr": saisine_type = models.ForeignKey( SaisineType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name="Type de saisine", ) instruction_deadline = models.DateField( _("Instruction deadline"), blank=True, null=True ) total_surface = models.FloatField(_("Total surface (m²)"), blank=True, null=True) total_developed_surface = models.FloatField( _("Total developed surface (m²)"), blank=True, null=True ) locality = 
models.CharField(_("Locality"), max_length=100, null=True, blank=True) address = models.TextField(_("Main address"), blank=True, default="") postal_code = models.CharField( _("Main address - postal code"), max_length=10, null=True, blank=True ) comment = models.TextField(_("Comment"), blank=True, default="") # research archaeology --> departments = models.ManyToManyField( Department, verbose_name=_("Departments"), blank=True ) requested_operation_type = models.ForeignKey( OperationType, related_name="+", on_delete=models.SET_NULL, null=True, blank=True, verbose_name=_("Requested operation type"), ) organization = models.ForeignKey( Organization, blank=True, null=True, verbose_name=_("Organization"), related_name="files", on_delete=models.SET_NULL, ) scientist = models.ForeignKey( Person, blank=True, null=True, related_name="scientist", on_delete=models.SET_NULL, verbose_name=_("Scientist in charge"), ) research_comment = models.TextField( _("Research archaeology comment"), blank=True, default="" ) classified_area = models.NullBooleanField( _("Classified area"), blank=True, null=True ) protected_area = models.NullBooleanField(_("Protected area"), blank=True, null=True) if settings.COUNTRY == "fr": cira_advised = models.NullBooleanField("Passage en CIRA", blank=True, null=True) mh_register = models.NullBooleanField( "Sur Monument Historique classé", blank=True, null=True ) mh_listing = models.NullBooleanField( "Sur Monument Historique inscrit", blank=True, null=True ) # <-- research archaeology # --> preventive detail development_type = models.ForeignKey( DevelopmentType, verbose_name=_("Development type"), blank=True, null=True, on_delete=models.SET_NULL ) monitoring_justification = models.ForeignKey( MonitoringJustificationType, verbose_name=_("Monitoring justification"), blank=True, null=True, on_delete=models.SET_NULL ) price_agreement = models.ForeignKey( PriceAgreement, verbose_name=_("Price agreement"), blank=True, null=True, on_delete=models.SET_NULL ) 
intervention_period = models.CharField( _("Intervention period"), max_length=200, default="", blank=True ) study_period = models.CharField( _("Study period"), max_length=200, default="", blank=True ) report_due_period = models.CharField( _("Report due period"), max_length=200, default="", blank=True ) start_date = models.DateField(_("Start date"), blank=True, null=True) end_date = models.DateField(_("End date"), blank=True, null=True) ground_start_date = models.DateField(_("Ground start date"), blank=True, null=True) ground_end_date = models.DateField(_("Ground end date"), blank=True, null=True) execution_report_date = models.DateField( _("Execution report date"), blank=True, null=True ) linear_meter = models.IntegerField(_("Linear meter"), blank=True, null=True) type_of_agreement = models.ForeignKey( AgreementType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Type of agreement"), ) operation_type_for_royalties = models.ForeignKey( OperationTypeForRoyalties, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Operation type for royalties"), ) # <-- preventive detail documents = models.ManyToManyField( Document, related_name="files", verbose_name=_("Documents"), blank=True ) main_image = models.ForeignKey( Document, related_name="main_image_files", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) imported_line = models.TextField(_("Imported line"), blank=True, default="") history = HistoricalRecords(bases=[HistoryModel]) GET_VALUES_EXTRA = ValueGetter.GET_VALUES_EXTRA + [ "general_contractor_address_1", "general_contractor_address_2", "general_contractor_address_3", "get_locality", "excavation_working_time_planned", "excavation_working_time_worked", "study_working_time_planned", "study_working_time_worked", "excavation_average_team_planned", "excavation_average_team_worked", "study_average_team_planned", "study_average_team_worked", ] class Meta: verbose_name = _("Archaeological file") 
verbose_name_plural = _("Archaeological files") permissions = ( ("view_own_file", "Can view own Archaeological file"), ("add_own_file", "Can add own Archaeological file"), ("change_own_file", "Can change own Archaeological file"), ("delete_own_file", "Can delete own Archaeological file"), ("close_file", "Can close File"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] @classmethod def _get_department_code(cls, value): if not settings.ISHTAR_DPTS: return "" for k, v in settings.ISHTAR_DPTS: if v.lower() == value: return k return "" def _get_base_image_path(self): return "{}/{}".format(self.SLUG, self.reference or self.short_label) @property def short_class_name(self): return _("FILE") @property def full_internal_ref(self): return "{}{}".format(settings.ISHTAR_FILE_PREFIX or "", self.external_id or "") @property def year_ref(self): return f"{self.year}-{self.numeric_reference or 0}" @property def delay_date(self): cache_key, val = get_cache(self.__class__, [self.pk, "delay_date"]) if val: return val return self.update_delay_date(cache_key) def update_delay_date(self, cache_key=None): if not cache_key: cache_key, val = get_cache(self.__class__, [self.pk, "delay_date"]) date = self.reception_date if not date: date = datetime.date(2500, 1, 1) elif settings.COUNTRY == "fr" and self.saisine_type and self.saisine_type.delay: if isinstance(date, str): try: date = datetime.datetime.strptime(date, "%Y-%m-%d").date() except ValueError: date = datetime.date(2500, 1, 1) date += datetime.timedelta(days=self.saisine_type.delay) cache.set(cache_key, date, settings.CACHE_TIMEOUT) return date @property def has_adminact(self): cache_key, val = get_cache(self.__class__, [self.pk, "has_adminact"]) if val: return val return self.update_has_admin_act(cache_key) @property def get_locality(self): return " - ".join( [getattr(self, k) for k in ("locality", "address") if getattr(self, k)] ) @property def general_contractor_address_1(self): address = "" if 
self.general_contractor: if self.general_contractor.name: address = " ".join( [ str(getattr(self.general_contractor, key)) for key in ("title", "surname", "name") if getattr(self.general_contractor, key) ] ) elif self.general_contractor.raw_name: address = self.general_contractor.raw_name if ( not address and self.corporation_general_contractor and self.corporation_general_contractor.name ): address = self.corporation_general_contractor.name return address @property def general_contractor_address_2(self): address = "" if self.general_contractor and self.general_contractor.address: address = self.general_contractor.address if self.general_contractor.address_complement: address += " " + self.general_contractor.address_complement if ( not address and self.corporation_general_contractor and self.corporation_general_contractor.address ): address = self.corporation_general_contractor.address if self.corporation_general_contractor.address_complement: address += " " + self.corporation_general_contractor.address_complement return address @property def general_contractor_address_3(self): address = "" if self.general_contractor and self.general_contractor.postal_code: address = " ".join( [ getattr(self.general_contractor, key) for key in ("postal_code", "town") if getattr(self.general_contractor, key) ] ) if ( not address and self.corporation_general_contractor and self.corporation_general_contractor.address ): address = " ".join( [ getattr(self.corporation_general_contractor, key) for key in ("postal_code", "town") if getattr(self.corporation_general_contractor, key) ] ) return address @classmethod def similar_files(cls, parcels): # get similar parcels similar = set() for parcel in parcels: q = cls.objects.filter( parcels__town__pk=parcel["town"], parcels__section=parcel["section"], parcels__parcel_number=parcel["parcel_number"], ) if q.count(): for fle in q.all(): similar.add(fle) return similar def update_has_admin_act(self, cache_key=None): if not cache_key: cache_key, 
val = get_cache(self.__class__, [self.pk, "has_adminact"]) has_adminact = ( self.administrative_act.exclude(act_type__txt_idx="a_receipt").count() or self.operations.count() ) cache.set(cache_key, has_adminact, settings.CACHE_TIMEOUT) return has_adminact @classmethod def get_short_menu_class(cls, pk): cache_key, val = get_cache(cls, [pk, "short_class_name"]) if val: return val q = cls.objects.filter(pk=pk) if not q.count(): return "" item = q.all()[0] return item.update_short_menu_class(cache_key) def update_short_menu_class(self, cache_key=None): if not cache_key: cache_key, val = get_cache(self.__class__, [self.pk, "short_class_name"]) cls = "normal" if not self.file_type.txt_idx == "preventive": cls = "blue" elif not self.has_adminact and self.reception_date: reception_date = self.reception_date if isinstance(self.reception_date, str): try: reception_date = datetime.datetime.strptime(self.reception_date, "%Y-%m-%d").date() except ValueError: reception_date = None if reception_date: delta = datetime.date.today() - reception_date cls = "red" if self.saisine_type and self.saisine_type.delay: if delta.days <= (self.saisine_type.delay * 2 / 3): cls = "green" elif delta.days <= self.saisine_type.delay: cls = "orange" cache.set(cache_key, cls, settings.CACHE_TIMEOUT) return cls @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=False ): owns = super(File, cls).get_owns( user, limit=limit, values=values, get_short_menu_class=get_short_menu_class ) return cls._return_get_owns(owns, values, get_short_menu_class) def get_dynamic_values(self, prefix, values, filtr=None): q = GenericEquipmentServiceType.objects.filter(available=True) equipment_costs = [] for equipment_service in q.all(): pk = equipment_service.pk #if not filtr or not any( # key for f in filtr if f.startswith(prefix + key) #): q = self.equipment_costs.filter( equipment_service_cost__equipment_service_type__generic_equipment_type_id=pk, 
equipment_service_cost__unit__isnull=False, ).exclude( equipment_service_cost__unit__in=("L", ""), # exclude linear meter ) slug = equipment_service.txt_idx.replace("-", "_") key_time_planned = slug + "_equipment_time_planned" values[prefix + key_time_planned] = 0 key_time_worked = slug + "_equipment_time_worked" values[prefix + key_time_worked] = 0 key_qt_planned = slug + "_equipment_sum_planned" values[prefix + key_qt_planned] = 0 key_qt_worked = slug + "_equipment_sum_worked" values[prefix + key_qt_worked] = 0 key_costs = prefix + slug + "_equipment_costs" key_costs_dct = {} for cost in q.all(): quantity_by_day_planned = (cost.quantity_by_day_planned or 0) values[prefix + key_qt_planned] += quantity_by_day_planned quantity_by_day_worked = (cost.quantity_by_day_worked or 0) values[prefix + key_qt_worked] += quantity_by_day_worked value = ES_UNITS_DAYS[cost.equipment_service_cost.unit] * ( cost.days_planned or 0) if value > values[prefix + key_time_planned]: values[prefix + key_time_planned] = value cost_name = cost.equipment_service_cost.equipment_service_type.label if cost_name not in key_costs_dct: key_costs_dct[cost_name] = { "quantity_by_day_planned": 0, "days_planned": 0, "cost_planned": 0, "quantity_by_day_worked": 0, "days_worked": 0, "cost_worked": 0 } key_costs_dct[cost_name]["quantity_by_day_planned"] += (quantity_by_day_planned or 0) key_costs_dct[cost_name]["days_planned"] += (cost.days_planned or 0) key_costs_dct[cost_name]["cost_planned"] += value value = ES_UNITS_DAYS[cost.equipment_service_cost.unit] * ( cost.days_worked or 0) if value > values[prefix + key_time_planned]: values[prefix + key_time_worked] = value key_costs_dct[cost_name]["quantity_by_day_worked"] += (quantity_by_day_worked or 0) key_costs_dct[cost_name]["days_worked"] += (cost.days_worked or 0) key_costs_dct[cost_name]["cost_worked"] += value values[key_costs] = [] for cost_name in key_costs_dct: cost_detail = {"name": cost_name} cost_detail.update(key_costs_dct[cost_name]) 
values[key_costs].append(InlineClass(cost_detail)) if values[key_costs]: equipment_costs += values[key_costs][:] values["equipment_costs"] = equipment_costs return values @property def montant_subvention_dotation(self): """ France specific value used by templates """ if not self.operation_type_for_royalties or not self.total_surface: return return (self.total_surface or 0) * ( self.operation_type_for_royalties.increased_final_value or 0) @property def jobs_list_permanent(self): return self.jobs_list() @property def jobs_list_non_permanent(self): return self.jobs_list(permanent=False) def jobs_list(self, permanent=True): jobs = {} total_ground_cost_planned = 0 total_ground_cost_worked = 0 max_ground_day_planned = 0 for job in self.ground_jobs.filter(job__permanent_contract=permanent).all(): name = job.job.label has_parents = job.job.has_parents max_ground_day_planned = max(max_ground_day_planned, job.days_planned or 0) ground_cost_total = ( (job.man_by_day_planned or 0) * (job.days_planned or 0) * (job.cost_planned or 0) ) total_ground_cost_planned += ground_cost_total ground_cost_total_worked = ( (job.man_by_day_worked or 0) * (job.days_worked or 0) * (job.cost_worked or 0) ) total_ground_cost_worked += ground_cost_total_worked jobs[name] = { "ground_man_by_day_planned": job.man_by_day_planned, "ground_days_planned": job.days_planned, "ground_cost_planned": job.cost_planned, "ground_cost_total_planned": ground_cost_total or "", "ground_man_by_day_worked": job.man_by_day_worked, "ground_days_worked": job.days_worked, "ground_cost_worked": job.cost_worked, "ground_cost_total_worked": ground_cost_total_worked or "", "has_parents": has_parents, "man_by_day_planned": "", "days_planned": "", "cost_planned": "", "cost_total_planned": "", "man_by_day_worked": "", "days_worked": "", "cost_worked": "", "cost_total_worked": "", } total_cost_planned = 0 total_cost_worked = 0 max_day_worked = 0 for job in self.jobs.filter(job__permanent_contract=permanent).all(): name = 
job.job.label max_day_worked = max(max_day_worked, job.days_worked or 0) if name not in jobs: has_parents = job.job.has_parents jobs[name] = {"has_parents": has_parents} cost_total_planned = ( (job.man_by_day_planned or 0) * (job.days_planned or 0) * (job.cost_planned or 0) ) total_cost_planned += cost_total_planned cost_total_worked = ( (job.man_by_day_worked or 0) * (job.days_worked or 0) * (job.cost_worked or 0) ) total_cost_worked += cost_total_worked jobs[name].update({ "man_by_day_planned": job.man_by_day_planned, "days_planned": job.days_planned, "cost_planned": job.cost_planned, "cost_total_planned": cost_total_planned or "", "man_by_day_worked": job.man_by_day_worked, "days_worked": job.days_worked, "cost_worked": job.cost_worked, "cost_total_worked": cost_total_worked or "", }) result = [] for k in jobs: dct = jobs[k] dct["name"] = k result.append(InlineClass(dct)) dct = { "jobs_list": result, "ground_cost_planned": total_ground_cost_planned, "ground_cost_worked": total_ground_cost_worked, "cost_planned": total_cost_planned, "cost_worked": total_cost_worked, "max_day_worked": max_day_worked, "max_ground_day_worked": max_ground_day_planned, } return dct def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = get_values_town_related(self, prefix, {}, filtr=filtr) values = self.get_dynamic_values(prefix, values, filtr=filtr) values["montant_subvention_dotation"] = self.montant_subvention_dotation dct = self.jobs_list_permanent values["jobs_list"] = dct["jobs_list"][:] values["jobs_list_permanent"] = dct.pop("jobs_list") values["max_day_worked"] = dct.pop("max_day_worked") values["max_ground_day_worked"] = dct.pop("max_ground_day_worked") for k in dct.keys(): values["permanent_" + k] = dct[k] dct = self.jobs_list_non_permanent values["jobs_list"] += dct["jobs_list"][:] values["jobs_list_non_permanent"] = dct.pop("jobs_list") values["max_day_worked"] = max(values["max_day_worked"], dct.pop("max_day_worked")) 
values["max_ground_day_worked"] = max(values["max_ground_day_worked"], dct.pop("max_ground_day_worked")) for k in dct.keys(): values["non_permanent_" + k] = dct[k] values["used_equipments"] = self.used_equipments values["equipments_cost_planned"] = sum([cost for _1, cost, _2, _3, _4 in self.used_equipments]) values["equipments_cost_worked"] = sum([cost for _1, _2, cost, _3, _4 in self.used_equipments]) values["job_cost"] = 0 for k in ["non_permanent_ground_cost_planned", "non_permanent_cost_planned", "permanent_ground_cost_planned", "permanent_cost_planned"]: values["job_cost"] += values.get(k, 0) values["job_permanent_cost"] = 0 for k in ["permanent_ground_cost_planned", "permanent_cost_planned"]: values["job_permanent_cost"] += values.get(k, 0) values["job_non_permanent_cost"] = 0 for k in ["non_permanent_ground_cost_planned", "non_permanent_cost_planned"]: values["job_non_permanent_cost"] += values.get(k, 0) return values def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): if "redevance" in filtr: filtr.append("total_surface") if "equipments_cost_planned_tva" in filtr: filtr.append("equipments_cost_planned") values = super().get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs) q = GlobalVar.objects.filter(slug="taux_rap") if q.count(): try: values["redevance"] = float(q.all()[0].value) * self.total_surface except ValueError: pass q = GlobalVar.objects.filter(slug="taux_tva") if q.count(): try: values["equipments_cost_planned_tva"] = float(q.all()[0].value) * float(values.get("equipments_cost_planned", 0)) except ValueError: pass return values def render_parcels(self): Parcel = apps.get_model("archaeological_operations", "Parcel") return Parcel.render_parcels(list(self.parcels.all())) def __str__(self): return self.cached_label or "" @property def short_label(self): return settings.JOINT.join(str(self).split(settings.JOINT)[1:]) @property def reference(self): return self.external_id or "" def _generate_cached_label(self): 
label = self._profile_generate_cached_label() if label: return label items = [self.get_town_label(), self.reference] items += [ str(getattr(self, k)) for k in ["internal_reference", "name"] if getattr(self, k) ] return settings.JOINT.join(items) def grouped_parcels(self): from archaeological_operations.models import Parcel return Parcel.grouped_parcels(list(self.parcels.all())) def get_town_label(self): lbl = str(_("Multi-town")) if self.main_town: lbl = self.main_town.name elif self.towns.count() == 1: lbl = self.towns.all()[0].name elif self.towns.count() == 0: lbl = str(_("No town")) return lbl def get_department(self): if not self.towns.count(): return "00" return self.towns.all()[0].numero_insee[:2] @classmethod def _get_query_owns_dicts(cls, ishtaruser): profile = ishtaruser.current_profile town_ids = [] if profile: town_ids = [town["pk"] for town in profile.query_towns.values("pk").all()] return [ { "in_charge": ishtaruser.person, "history_creator": ishtaruser.user_ptr, "towns__pk__in": town_ids, }, {"end_date__isnull": True}, ] @classmethod def get_query_owns(cls, ishtaruser): return cls._construct_query_own("", cls._get_query_owns_dicts(ishtaruser)) def is_active(self): return not bool(self.end_date) @property def town_list(self): return ", ".join([str(tw) for tw in self.towns.all()]) def total_surface_ha(self): if self.total_surface: return round(self.total_surface / 10000.0, 5) def total_developed_surface_ha(self): if self.total_developed_surface: return round(self.total_developed_surface / 10000.0, 5) def operation_acts(self): acts = [] for ope in self.operations.all(): for act in ope.administrative_act.all(): acts.append(act) return acts def update_raw_town_planning_service(self): if ( self.raw_town_planning_service and not self.responsible_town_planning_service ) or not self.responsible_town_planning_service: return False current_lbl = "" if self.raw_town_planning_service: current_lbl = self.raw_town_planning_service[:] lbl = 
str(self.responsible_town_planning_service) if not lbl: return False self.raw_town_planning_service = lbl[:200] return current_lbl != self.raw_town_planning_service def update_planning_service(self): if ( not self.responsible_town_planning_service or not self.responsible_town_planning_service.attached_to or self.planning_service ): return False self.planning_service = self.responsible_town_planning_service.attached_to return True def update_resp_planning_service(self): if ( not self.responsible_town_planning_service or self.responsible_town_planning_service.attached_to or not self.planning_service ): return False self.responsible_town_planning_service.attached_to = self.planning_service self.responsible_town_planning_service.save() return True def update_raw_general_contractor(self): if ( self.raw_general_contractor and not self.general_contractor ) or not self.general_contractor: return False current_lbl = "" if self.raw_general_contractor: current_lbl = self.raw_general_contractor[:] lbl = str(self.general_contractor) if not lbl: return False self.raw_general_contractor = lbl[:200] return current_lbl != self.raw_general_contractor def update_corpo_general_contractor(self): if ( not self.general_contractor or self.general_contractor.attached_to == self.corporation_general_contractor ): return False if self.general_contractor.attached_to: self.corporation_general_contractor = self.general_contractor.attached_to else: self.general_contractor.attached_to = self.corporation_general_contractor self.general_contractor.save() return True @property def excavation_working_time_planned(self): return self.ground_jobs.all().aggregate(Max("days_planned"))["days_planned__max"] or 0 @property def study_working_time_planned(self): return self.jobs.all().aggregate(Max("days_planned"))["days_planned__max"] or 0 @property def excavation_working_time_worked(self): return self.ground_jobs.all().aggregate(Max("days_worked"))["days_worked__max"] or 0 @property def 
study_working_time_worked(self): return self.jobs.all().aggregate(Max("days_worked"))["days_worked__max"] or 0 @property def excavation_average_team_planned(self): return round( sum([(job.man_by_day_planned or 0) * (job.days_planned or 0) for job in self.ground_jobs.all()]) / (self.excavation_working_time_planned or 1) ) @property def excavation_average_team_worked(self): return round( sum([(job.man_by_day_worked or 0) * (job.days_worked or 0) for job in self.ground_jobs.all()]) / (self.excavation_working_time_worked or 1) ) @property def study_average_team_planned(self): return round( sum([(job.man_by_day_planned or 0) * (job.days_planned or 0) for job in self.jobs.all()]) / (self.study_working_time_planned or 1) ) @property def study_average_team_worked(self): return round( sum([(job.man_by_day_worked or 0) * (job.days_worked or 0) for job in self.jobs.all()]) / (self.study_working_time_worked or 1) ) @property def job_cost_planned(self): return sum(job.cost_planned for job in self.jobs.all()) @property def ground_job_cost_planned(self): return sum(job.cost_planned for job in self.ground_jobs.all()) @property def job_cost_worked(self): return sum(job.cost_worked for job in self.jobs.all()) @property def ground_job_cost_worked(self): return sum(job.cost_worked for job in self.ground_jobs.all()) @property def job_cost_diff_planned_worked(self): return self.job_cost_planned - self.job_cost_worked @property def ground_job_cost_diff_planned_worked(self): return self.ground_job_cost_planned - self.ground_job_cost_worked @property def used_equipments(self): """ -> [("Engins mécaniques", cost_planned, cost_worked, diff planned - worked, [cost1, cost2, ...])] """ equipments = [] service_types = list(GenericEquipmentServiceType.objects.all()) for service_type in service_types: q = self.equipment_costs.filter( equipment_service_cost__equipment_service_type__generic_equipment_type=service_type) if not q.count(): continue equipments.append([service_type.label, 0, 0, 0, []]) 
for cost in q.all(): equipments[-1][-4] += cost.cost_planned equipments[-1][-3] += cost.cost_worked equipments[-1][-1].append(cost) equipments[-1][-2] = equipments[-1][-4] - equipments[-1][-3] return equipments def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(File, self).get_extra_actions(request) if self.can_do(request, "change_operation"): actions += [ ( reverse("file-parcels-modify", args=[self.pk]), _("Modify parcels"), "fa fa-pencil", _("parcels"), "", True, ), ] profile = get_current_profile() if profile.preventive_operator: actions += [ ( reverse("file-edit-preventive", args=[self.pk]), _("Edit intervention plan"), "fa fa-pencil", _("plan"), "", False, ), ] if self.can_do(request, "add_administrativeact"): actions += [ ( reverse("file-add-adminact", args=[self.pk]), _("Add associated administrative act"), "fa fa-plus", _("admin. act"), "", False, ), ] if self.can_do(request, "add_operation"): actions += [ ( reverse("file-add-operation", args=[self.pk]), _("Add operation"), "fa fa-plus", _("operation"), "", False, ) ] return actions def save(self, *args, **kwargs): returned = super(File, self).save(*args, **kwargs) if ( not getattr(self, "_no_new_add", None) and self.main_town and self.main_town not in list(self.towns.all()) ): self._no_new_add = True self.towns.add(self.main_town) updated = self.update_raw_town_planning_service() updated += self.update_planning_service() self.update_resp_planning_service() updated += self.update_raw_general_contractor() updated += self.update_corpo_general_contractor() if not self.external_id or self.auto_external_id: external_id = get_generated_id("file_external_id", self) if external_id != self.external_id: updated = True self.auto_external_id = True self.external_id = external_id complete_identifier = get_generated_id("file_complete_identifier", self) if complete_identifier != self.complete_identifier: updated = True self.complete_identifier = 
complete_identifier cached_label = self._generate_cached_label() if cached_label != self.cached_label: updated = True self.cached_label = cached_label if updated: self._cached_label_checked = False self.save() return returned self.update_delay_date() self.update_short_menu_class() return returned def is_preventive(self): return FileType.is_preventive(self.file_type.pk) m2m_changed.connect(cached_label_changed, sender=File.towns.through) post_save.connect(cached_label_changed, sender=File) for attr in File.HISTORICAL_M2M: m2m_changed.connect(m2m_historization_changed, sender=getattr(File, attr).through) class FileByDepartment(models.Model): """ Database view for dashboard """ CREATE_SQL = """ CREATE VIEW file_department (id, department_id, file_id) as select town."id", town."departement_id", file_towns."file_id" from ishtar_common_town town inner join archaeological_files_file_towns file_towns on file_towns."town_id"=town."id" order by town."departement_id"; CREATE RULE file_department_delete AS ON DELETE TO file_department DO INSTEAD(); """ DELETE_SQL = """ DROP VIEW IF EXISTS file_department; """ file = models.ForeignKey(File, verbose_name=_("File"), on_delete=models.DO_NOTHING) department = models.ForeignKey( Department, verbose_name=_("Department"), on_delete=models.DO_NOTHING, blank=True, null=True, ) class Meta: managed = False db_table = "file_department" def __str__(self): return "{} - {}".format(self.file, self.department) class ManDays(models.Model): man_by_day_planned = models.FloatField( _("Man by day - planned"), null=True, blank=True ) days_planned = models.FloatField(_("Days - planned"), null=True, blank=True) man_by_day_worked = models.FloatField( _("Man by day - worked"), null=True, blank=True ) days_worked = models.FloatField(_("Days - worked"), null=True, blank=True) class Meta: abstract = True @property def quantity_planned(self): if not self.days_planned or not self.man_by_day_planned: return 0 return self.days_planned * self.man_by_day_planned 
@property def quantity_worked(self): if not self.days_worked or not self.man_by_day_worked: return 0 return self.days_worked * self.man_by_day_worked class PreventiveFileGroundJob(ManDays): file = models.ForeignKey(File, related_name="ground_jobs", on_delete=models.CASCADE) job = models.ForeignKey(Job, on_delete=models.CASCADE, verbose_name=_("Job")) class Meta: ordering = ("job",) verbose_name = _("Human requirement on field") verbose_name_plural = _("Human requirements on field") ADMIN_SECTION = _("Preventive") @property def cost_planned(self): return (self.job.ground_daily_cost or 0) * self.quantity_planned @property def cost_worked(self): return (self.job.ground_daily_cost or 0) * self.quantity_worked class PreventiveFileJob(ManDays): file = models.ForeignKey(File, related_name="jobs", on_delete=models.CASCADE) job = models.ForeignKey(Job, on_delete=models.CASCADE, verbose_name=_("Job")) class Meta: ordering = ("job",) verbose_name = _("Human requirement for post-excavation") verbose_name_plural = _("Human requirements for post-excavation") ADMIN_SECTION = _("Preventive") @property def cost_planned(self): return (self.job.daily_cost or 0) * self.quantity_planned @property def cost_worked(self): return (self.job.daily_cost or 0) * self.quantity_worked class PreventiveFileEquipmentServiceCost(models.Model): file = models.ForeignKey(File, related_name="equipment_costs", on_delete=models.CASCADE) equipment_service_cost = models.ForeignKey(EquipmentServiceCost, on_delete=models.CASCADE) quantity_by_day_planned = models.FloatField( _("Quantity by day - planned"), null=True, blank=True ) days_planned = models.FloatField(_("Days - planned"), null=True, blank=True) quantity_by_day_worked = models.FloatField( _("Quantity by day - worked"), null=True, blank=True ) days_worked = models.FloatField(_("Days - worked"), null=True, blank=True) class Meta: ordering = ("equipment_service_cost",) verbose_name = _("Equipment requirement") verbose_name_plural = _("Equipment 
requirements") ADMIN_SECTION = _("Preventive") @property def quantity_planned(self): if self.equipment_service_cost.flat_rate: return self.quantity_by_day_planned or 0 if not self.days_planned or not self.quantity_by_day_planned: return 0 return self.days_planned * self.quantity_by_day_planned @property def quantity_worked(self): if self.equipment_service_cost.flat_rate: return self.quantity_by_day_worked or 0 if not self.quantity_by_day_worked or not self.days_worked: return 0 return self.days_worked * self.quantity_by_day_worked @property def cost_planned(self): return (self.equipment_service_cost.unitary_cost or 0) * self.quantity_planned @property def cost_worked(self): return (self.equipment_service_cost.unitary_cost or 0) * self.quantity_worked