#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2012-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import datetime from collections import OrderedDict from django.apps import apps from django.conf import settings from django.contrib.gis.db import models from django.contrib.postgres.indexes import GinIndex from django.core.cache import cache from django.core.validators import MinValueValidator, MaxValueValidator from django.db.models import Q, Count, Sum, Max from django.db.models.signals import post_save, m2m_changed, post_delete from django.core.urlresolvers import reverse from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy from ishtar_common.utils import ( cached_label_changed, get_cache, get_current_year, m2m_historization_changed, ) from ishtar_common.models import ( GeneralType, BaseHistorizedItem, OwnPerms, Person, Organization, Town, Dashboard, DashboardFormItem, ValueGetter, MainItem, OperationType, get_generated_id, post_save_cache, Document, HistoryModel, SearchAltName, SearchVectorConfig, DocumentItem, CompleteIdentifierItem, HierarchicalType, ) from ishtar_common.models_common import HistoricalRecords, Department from archaeological_operations.models import ( get_values_town_related, ClosedItem, ParcelItem, ) class PriceAgreement(GeneralType): order = models.IntegerField(_("Order"), default=10) start_date = models.DateField(_("Start date"), blank=True, null=True) end_date = models.DateField(_("End date"), blank=True, null=True) class Meta: verbose_name = _("Price agreement") verbose_name_plural = _("Price agreement") ordering = ( "order", "start_date", "end_date", "label", ) class Job(GeneralType): price_agreement = models.ForeignKey( PriceAgreement, verbose_name=_("Price agreement"), blank=True, null=True, on_delete=models.CASCADE ) ground_daily_cost = models.FloatField(_("Ground daily cost"), blank=True, null=True) daily_cost = models.FloatField(_("Daily cost"), blank=True, null=True) permanent_contract = models.NullBooleanField( _("Permanent contract"), blank=True, null=True ) default_daily_need_on_ground = models.FloatField( _("Def. daily number on ground"), default=0 ) default_daily_need = models.FloatField(_("Def. daily number on study"), default=0) order = models.IntegerField(_("Order"), default=10) child = models.ForeignKey( "self", blank=True, null=True, verbose_name=_("Child"), help_text=_("Auto-add this job when a parent is added"), related_name="parents", ) class Meta: verbose_name = _("Job") verbose_name_plural = _("Jobs") ordering = ( "order", "-permanent_contract", "label", ) def __str__(self): lbl = self.label if not self.permanent_contract: lbl += " ({})".format(_("fixed-term contract")) return lbl @classmethod def get_choices(cls, current_value, price_agreement_id=None): q = cls.objects.filter( available=True, parents__isnull=True, ) if price_agreement_id: q = q.filter(price_agreement=price_agreement_id) permanent = [(j.pk, str(j)) for j in q.filter(permanent_contract=True).all()] fixed_term = [(j.pk, str(j)) for j in q.filter(permanent_contract=False).all()] if current_value: if current_value.permanent_contract: permanent.append((current_value.pk, str(current_value))) else: fixed_term.append((current_value.pk, str(current_value))) return [("", "-" * 9)] + [ ( _("Permanent contract"), permanent, ), ( _("Fixed-term contract"), fixed_term, ), ] class GenericEquipmentServiceType(GeneralType): order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Generic equipment type") verbose_name_plural = _("Generic equipment types") ordering = ( "order", "label", ) class EquipmentServiceType(GeneralType): generic_equipment_type = models.ForeignKey( GenericEquipmentServiceType, verbose_name=_("Generic type") ) order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Equipment/service type") verbose_name_plural = _("Equipment/service types") ordering = ( "order", "label", ) ES_UNITS = ( ("D", _("days")), ("W", _("weeks")), ("M", _("months")), ("L", _("linear m.")), ) DCT_ES_UNITS = dict(ES_UNITS) ES_UNITS_DAYS = { "D": 1, "W": 7, "M": 30, "L": 0 } class EquipmentServiceCost(models.Model): price_agreement = models.ForeignKey( PriceAgreement, verbose_name=_("Price agreement"), blank=True, null=True, on_delete=models.CASCADE ) equipment_service_type = models.ForeignKey( EquipmentServiceType, verbose_name=_("Equipment/Service") ) slug = models.SlugField( _("Textual ID"), unique=True, max_length=300, help_text=_( "The slug is the standardized version of the name. It contains " "only lowercase letters, numbers and hyphens. Each slug must " "be unique." ), ) service_provider = models.CharField( _("Service provider"), max_length=200, blank=True, default="" ) flat_rate = models.BooleanField(_("Flat rate"), default=False) unitary_cost = models.FloatField(_("Unitary cost"), blank=True, null=True) unit = models.CharField( _("Unit"), max_length=1, choices=ES_UNITS, blank=True, null=True ) specificity = models.CharField( _("Specificity"), blank=True, max_length=200, default="" ) default_quantity_by_day = models.IntegerField( _("Default quantity by day"), default=0) order = models.IntegerField(_("Order"), default=10) available = models.BooleanField(_("Available"), default=True) parent = models.ForeignKey( EquipmentServiceType, blank=True, null=True, on_delete=models.CASCADE, verbose_name=_("Parent"), help_text=_("Auto-add this cost when a parent is added"), related_name="children", ) class Meta: verbose_name = _("Equipment/service cost") verbose_name_plural = _("Equipment/service costs") ordering = ( "order", "equipment_service_type__label", ) def __str__(self): lbl = "" if self.parent: lbl = self.parent.label + " - " lbl += str(self.equipment_service_type) if self.specificity: lbl += " - " + self.specificity if self.service_provider: lbl += f" ({self.service_provider})" if self.flat_rate: lbl += " - " + str(_("Flat rate")) if self.unit and self.unit in DCT_ES_UNITS: lbl += " - " + str(DCT_ES_UNITS[self.unit]) return lbl def natural_key(self): return (self.slug,) def history_compress(self): return self.slug @property def unit_label(self): if self.unit and self.unit in DCT_ES_UNITS: return DCT_ES_UNITS[self.unit] return "" class FileType(GeneralType): class Meta: verbose_name = _("Archaeological file type") verbose_name_plural = _("Archaeological file types") ordering = ("label",) @classmethod def is_preventive(cls, file_type_id, key=""): key = key or "preventive" try: preventive = FileType.get_cache(key).pk return file_type_id == preventive except (FileType.DoesNotExist, AttributeError): return False post_save.connect(post_save_cache, sender=FileType) post_delete.connect(post_save_cache, sender=FileType) class PermitType(GeneralType): class Meta: verbose_name = _("Permit type") verbose_name_plural = _("Permit types") ordering = ("label",) post_save.connect(post_save_cache, sender=PermitType) post_delete.connect(post_save_cache, sender=PermitType) if settings.COUNTRY == "fr": class SaisineType(GeneralType, ValueGetter): delay = models.IntegerField(_("Delay (in days)"), default=30) class Meta: verbose_name = "Type de saisine" verbose_name_plural = "Types de saisine" ordering = ("label",) post_save.connect(post_save_cache, sender=SaisineType) post_delete.connect(post_save_cache, sender=SaisineType) class AgreementType(GeneralType): class Meta: verbose_name = _("Agreement type - France") verbose_name_plural = _("Agreement types - France") ordering = ("label",) post_save.connect(post_save_cache, sender=AgreementType) post_delete.connect(post_save_cache, sender=AgreementType) class OperationTypeForRoyalties(GeneralType): increase_coefficient = models.FloatField( _("Increase coefficient"), default=1) increased_final_value = models.FloatField( _("Increased final value"), default=1) category = models.PositiveSmallIntegerField( _("Category"), default=1, validators=[MinValueValidator(1), MaxValueValidator(5)]) class Meta: verbose_name = _("Operation type for royalties - France") verbose_name_plural = _("Operation types for royalties - France") ordering = ("id",) post_save.connect(post_save_cache, sender=OperationTypeForRoyalties) post_delete.connect(post_save_cache, sender=OperationTypeForRoyalties) class File( ClosedItem, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, OwnPerms, ValueGetter, MainItem, DashboardFormItem, ParcelItem, ): SLUG = "file" SHOW_URL = "show-file" DELETE_URL = "delete-file" APP = "archaeological-files" MODEL = "file" TABLE_COLS = [ "numeric_reference", "year", "internal_reference", "file_type", "saisine_type", "towns_label", ] # statistics STATISTIC_MODALITIES_OPTIONS = OrderedDict( [ ("year", _("Year")), ("file_type__label", _("File type")), ("towns__areas__label", _("Area")), ("towns__areas__parent__label", _("Extended area")), ("saisine_type__label", "Type de saisine"), ("permit_type__label", _("Permit type")), ("requested_operation_type__label", _("File type")), ] ) STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()] STATISTIC_SUM_VARIABLE = OrderedDict( ( ("pk", (_("Number"), 1)), ("total_surface", (_("Total surface (km2)"), 0.000001)), ("total_developed_surface", (_("Total developed surface (km2)"), 0.000001)), ) ) # search parameters BOOL_FIELDS = ["end_date__isnull"] EXTRA_REQUEST_KEYS = { "parcel_0": ("parcels__section", "operations__parcels__section"), "parcel_1": ("parcels__parcel_number", "operations__parcels__parcel_number"), "parcel_2": ("operations__parcels__public_domain", "parcels__public_domain"), "end_date": "end_date__isnull", "towns__numero_insee__startswith": "towns__numero_insee__startswith", "name": "name__icontains", "cached_label": "cached_label__icontains", "comment": "comment__icontains", "permit_reference": "permit_reference__icontains", "general_contractor__attached_to": "general_contractor__attached_to__pk", "history_creator": "history_creator__ishtaruser__person__pk", "history_modifier": "history_modifier__ishtaruser__person__pk", "towns_label": "towns", "general_contractor__pk": "general_contractor__pk", "responsible_town_planning_service__pk": "responsible_town_planning_service__pk", "in_charge__pk": "in_charge__pk", } BASE_SEARCH_VECTORS = [ SearchVectorConfig("name"), SearchVectorConfig("internal_reference"), SearchVectorConfig("file_type__label"), SearchVectorConfig("saisine_type__label"), SearchVectorConfig("permit_type__label"), SearchVectorConfig("permit_reference"), SearchVectorConfig("comment", "local"), SearchVectorConfig("research_comment", "local"), ] INT_SEARCH_VECTORS = [ SearchVectorConfig("numeric_reference"), SearchVectorConfig("year"), ] M2M_SEARCH_VECTORS = [SearchVectorConfig("towns__name")] PARENT_SEARCH_VECTORS = [ "in_charge", "general_contractor", "corporation_general_contractor", "responsible_town_planning_service", "planning_service", "organization", "scientist", ] COL_LABELS = { "towns_label": _("Towns"), } REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] # alternative names of fields for searches ALT_NAMES = { "year": SearchAltName(pgettext_lazy("key for text search", "year"), "year"), "numeric_reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "numeric_reference" ), "internal_reference": SearchAltName( pgettext_lazy("key for text search", "other-reference"), "internal_reference__iexact", ), "towns": SearchAltName( pgettext_lazy("key for text search", "town"), "towns__cached_label__iexact" ), "parcel": SearchAltName( pgettext_lazy("key for text search", "parcel"), "parcels__cached_label__iexact", ), "towns__numero_insee__startswith": SearchAltName( pgettext_lazy("key for text search", "department"), "towns__numero_insee__startswith", ), "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "file_type": SearchAltName( pgettext_lazy("key for text search", "type"), "file_type__label__iexact" ), "end_date": SearchAltName( pgettext_lazy("key for text search", "active"), "end_date__isnull" ), "saisine_type": SearchAltName( pgettext_lazy("key for text search", "saisine-type"), "saisine_type__label__iexact", ), "permit_type": SearchAltName( pgettext_lazy("key for text search", "permit-type"), "permit_type__label__iexact", ), "permit_reference": SearchAltName( pgettext_lazy("key for text search", "permit-reference"), "permit_reference__iexact", ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), "comment__iexact" ), "in_charge": SearchAltName( pgettext_lazy("key for text search", "in-charge"), "in_charge__cached_label__iexact", ), "general_contractor": SearchAltName( pgettext_lazy("key for text search", "general-contractor"), "general_contractor__cached_label__iexact", ), "general_contractor__attached_to": SearchAltName( pgettext_lazy("key for text search", "general-contractor-organization"), "general_contractor__attached_to__cached_label__iexact", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) POST_PROCESS_REQUEST = { "towns__numero_insee__startswith": "_get_department_code", } HISTORICAL_M2M = ["towns", "departments"] # fields year = models.IntegerField(_("Year"), default=get_current_year) numeric_reference = models.IntegerField( _("Numeric reference"), blank=True, null=True ) internal_reference = models.CharField( _("Internal reference"), blank=True, null=True, max_length=60 ) external_id = models.CharField( _("External ID"), blank=True, null=True, max_length=120 ) auto_external_id = models.BooleanField( _("External ID is set automatically"), default=False ) name = models.TextField(_("Name"), blank=True, default="") file_type = models.ForeignKey(FileType, verbose_name=_("File type")) in_charge = models.ForeignKey( Person, related_name="file_responsability", verbose_name=_("Person in charge"), on_delete=models.SET_NULL, blank=True, null=True, ) general_contractor = models.ForeignKey( Person, related_name="general_contractor_files", verbose_name=_("General contractor"), blank=True, null=True, on_delete=models.SET_NULL, ) # aménageur - personne raw_general_contractor = models.CharField( _("General contractor (raw)"), max_length=200, blank=True, null=True ) corporation_general_contractor = models.ForeignKey( Organization, related_name="general_contractor_files", verbose_name=_("General contractor organization"), blank=True, null=True, on_delete=models.SET_NULL, ) # aménageur responsible_town_planning_service = models.ForeignKey( Person, related_name="responsible_town_planning_service_files", blank=True, null=True, verbose_name=_("Responsible for planning service"), on_delete=models.SET_NULL, ) # service instructeur - personne raw_town_planning_service = models.CharField( _("Planning service (raw)"), max_length=200, blank=True, null=True ) planning_service = models.ForeignKey( Organization, related_name="planning_service_files", blank=True, null=True, verbose_name=_("Planning service organization"), on_delete=models.SET_NULL, ) # service instructeur permit_type = models.ForeignKey( PermitType, verbose_name=_("Permit type"), blank=True, null=True, on_delete=models.SET_NULL, ) permit_reference = models.TextField(_("Permit reference"), blank=True, default="") end_date = models.DateField(_("Closing date"), null=True, blank=True) main_town = models.ForeignKey( Town, verbose_name=_("Main town"), null=True, blank=True, related_name="file_main", on_delete=models.SET_NULL, ) towns = models.ManyToManyField( Town, verbose_name=_("Towns"), related_name="file", blank=True ) creation_date = models.DateField( _("Creation date"), default=datetime.date.today, blank=True, null=True ) reception_date = models.DateField(_("Reception date"), blank=True, null=True) planning_service_date = models.DateField( _("Date of planning service file"), null=True, blank=True ) related_file = models.ForeignKey( "File", verbose_name=_("Related file"), blank=True, null=True, on_delete=models.SET_NULL, ) if settings.COUNTRY == "fr": saisine_type = models.ForeignKey( SaisineType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name="Type de saisine", ) instruction_deadline = models.DateField( _("Instruction deadline"), blank=True, null=True ) total_surface = models.FloatField(_("Total surface (m2)"), blank=True, null=True) total_developed_surface = models.FloatField( _("Total developed surface (m2)"), blank=True, null=True ) locality = models.CharField(_("Locality"), max_length=100, null=True, blank=True) address = models.TextField(_("Main address"), blank=True, default="") postal_code = models.CharField( _("Main address - postal code"), max_length=10, null=True, blank=True ) comment = models.TextField(_("Comment"), blank=True, default="") # research archaeology --> departments = models.ManyToManyField( Department, verbose_name=_("Departments"), blank=True ) requested_operation_type = models.ForeignKey( OperationType, related_name="+", on_delete=models.SET_NULL, null=True, blank=True, verbose_name=_("Requested operation type"), ) organization = models.ForeignKey( Organization, blank=True, null=True, verbose_name=_("Organization"), related_name="files", on_delete=models.SET_NULL, ) scientist = models.ForeignKey( Person, blank=True, null=True, related_name="scientist", on_delete=models.SET_NULL, verbose_name=_("Scientist in charge"), ) research_comment = models.TextField( _("Research archaeology comment"), blank=True, default="" ) classified_area = models.NullBooleanField( _("Classified area"), blank=True, null=True ) protected_area = models.NullBooleanField(_("Protected area"), blank=True, null=True) if settings.COUNTRY == "fr": cira_advised = models.NullBooleanField("Passage en CIRA", blank=True, null=True) mh_register = models.NullBooleanField( "Sur Monument Historique classé", blank=True, null=True ) mh_listing = models.NullBooleanField( "Sur Monument Historique inscrit", blank=True, null=True ) # <-- research archaeology # --> preventive detail price_agreement = models.ForeignKey( PriceAgreement, verbose_name=_("Price agreement"), blank=True, null=True, on_delete=models.SET_NULL ) study_period = models.CharField( _("Study period"), max_length=200, default="", blank=True ) start_date = models.DateField(_("Start date"), blank=True, null=True) end_date = models.DateField(_("End date"), blank=True, null=True) ground_start_date = models.DateField(_("Ground start date"), blank=True, null=True) ground_end_date = models.DateField(_("Ground end date"), blank=True, null=True) execution_report_date = models.DateField( _("Execution report date"), blank=True, null=True ) linear_meter = models.IntegerField(_("Linear meter"), blank=True, null=True) type_of_agreement = models.ForeignKey( AgreementType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Type of agreement"), ) operation_type_for_royalties = models.ForeignKey( OperationTypeForRoyalties, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Operation type for royalties"), ) # <-- preventive detail documents = models.ManyToManyField( Document, related_name="files", verbose_name=_("Documents"), blank=True ) cached_label = models.TextField( _("Cached name"), blank=True, default="", db_index=True, help_text=_("Generated automatically - do not edit"), ) imported_line = models.TextField(_("Imported line"), blank=True, default="") history = HistoricalRecords(bases=[HistoryModel]) GET_VALUES_EXTRA = ValueGetter.GET_VALUES_EXTRA + [ "general_contractor_address_1", "general_contractor_address_2", "general_contractor_address_3", "get_locality", "excavation_working_time_planned", "excavation_working_time_worked", "study_working_time_planned", "study_working_time_worked", "excavation_average_team_planned", "excavation_average_team_worked", "study_average_team_planned", "study_average_team_worked", ] class Meta: verbose_name = _("Archaeological file") verbose_name_plural = _("Archaeological files") permissions = ( ("view_file", "Can view all Archaeological files"), ("view_own_file", "Can view own Archaeological file"), ("add_own_file", "Can add own Archaeological file"), ("change_own_file", "Can change own Archaeological file"), ("delete_own_file", "Can delete own Archaeological file"), ("close_file", "Can close File"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] @classmethod def _get_department_code(cls, value): if not settings.ISHTAR_DPTS: return "" for k, v in settings.ISHTAR_DPTS: if v.lower() == value: return k return "" def _get_base_image_path(self): return "{}/{}".format(self.SLUG, self.reference or self.short_label) @property def short_class_name(self): return _("FILE") @property def full_internal_ref(self): return "{}{}".format(settings.ISHTAR_FILE_PREFIX or "", self.external_id or "") @property def delay_date(self): cache_key, val = get_cache(self.__class__, [self.pk, "delay_date"]) if val: return val return self.update_delay_date(cache_key) def update_delay_date(self, cache_key=None): if not cache_key: cache_key, val = get_cache(self.__class__, [self.pk, "delay_date"]) date = self.reception_date if not date: date = datetime.date(2500, 1, 1) elif settings.COUNTRY == "fr" and self.saisine_type and self.saisine_type.delay: date += datetime.timedelta(days=self.saisine_type.delay) cache.set(cache_key, date, settings.CACHE_TIMEOUT) return date @property def has_adminact(self): cache_key, val = get_cache(self.__class__, [self.pk, "has_adminact"]) if val: return val return self.update_has_admin_act(cache_key) @property def get_locality(self): return " - ".join( [getattr(self, k) for k in ("locality", "address") if getattr(self, k)] ) @property def general_contractor_address_1(self): address = "" if self.general_contractor: if self.general_contractor.name: address = " ".join( [ str(getattr(self.general_contractor, key)) for key in ("title", "surname", "name") if getattr(self.general_contractor, key) ] ) elif self.general_contractor.raw_name: address = self.general_contractor.raw_name if ( not address and self.corporation_general_contractor and self.corporation_general_contractor.name ): address = self.corporation_general_contractor.name return address @property def general_contractor_address_2(self): address = "" if self.general_contractor and self.general_contractor.address: address = self.general_contractor.address if self.general_contractor.address_complement: address += " " + self.general_contractor.address_complement if ( not address and self.corporation_general_contractor and self.corporation_general_contractor.address ): address = self.corporation_general_contractor.address if self.corporation_general_contractor.address_complement: address += " " + self.corporation_general_contractor.address_complement return address @property def general_contractor_address_3(self): address = "" if self.general_contractor and self.general_contractor.postal_code: address = " ".join( [ getattr(self.general_contractor, key) for key in ("postal_code", "town") if getattr(self.general_contractor, key) ] ) if ( not address and self.corporation_general_contractor and self.corporation_general_contractor.address ): address = " ".join( [ getattr(self.corporation_general_contractor, key) for key in ("postal_code", "town") if getattr(self.corporation_general_contractor, key) ] ) return address @classmethod def similar_files(cls, parcels): # get similar parcels similar = set() for parcel in parcels: q = cls.objects.filter( parcels__town__pk=parcel["town"], parcels__section=parcel["section"], parcels__parcel_number=parcel["parcel_number"], ) if q.count(): for fle in q.all(): similar.add(fle) return similar def update_has_admin_act(self, cache_key=None): if not cache_key: cache_key, val = get_cache(self.__class__, [self.pk, "has_adminact"]) has_adminact = ( self.administrative_act.exclude(act_type__txt_idx="a_receipt").count() or self.operations.count() ) cache.set(cache_key, has_adminact, settings.CACHE_TIMEOUT) return has_adminact @classmethod def get_short_menu_class(cls, pk): cache_key, val = get_cache(cls, [pk, "short_class_name"]) if val: return val q = cls.objects.filter(pk=pk) if not q.count(): return "" item = q.all()[0] return item.update_short_menu_class(cache_key) def update_short_menu_class(self, cache_key=None): if not cache_key: cache_key, val = get_cache(self.__class__, [self.pk, "short_class_name"]) cls = "normal" if not self.file_type.txt_idx == "preventive": cls = "blue" elif not self.has_adminact and self.reception_date: delta = datetime.date.today() - self.reception_date cls = "red" if self.saisine_type and self.saisine_type.delay: if delta.days <= (self.saisine_type.delay * 2 / 3): cls = "green" elif delta.days <= self.saisine_type.delay: cls = "orange" cache.set(cache_key, cls, settings.CACHE_TIMEOUT) return cls @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=False ): owns = super(File, cls).get_owns( user, limit=limit, values=values, get_short_menu_class=get_short_menu_class ) return cls._return_get_owns(owns, values, get_short_menu_class) def get_dynamic_values(self, prefix, values, filtr=None): q = GenericEquipmentServiceType.objects.filter(available=True) for equipment_service in q.all(): pk = equipment_service.pk #if not filtr or not any( # key for f in filtr if f.startswith(prefix + key) #): q = self.equipment_costs.filter( equipment_service_cost__equipment_service_type__generic_equipment_type_id=pk, equipment_service_cost__unit__isnull=False, ).exclude( equipment_service_cost__unit__in=("L", ""), # exclude linear meter ) slug = equipment_service.txt_idx.replace("-", "_") key_time_planned = slug + "_equipment_time_planned" values[prefix + key_time_planned] = 0 key_time_worked = slug + "_equipment_time_worked" values[prefix + key_time_worked] = 0 key_qt_planned = slug + "_equipment_sum_planned" values[prefix + key_qt_planned] = 0 key_qt_worked = slug + "_equipment_sum_worked" values[prefix + key_qt_worked] = 0 for cost in q.all(): values[prefix + key_qt_planned] += (cost.quantity_by_day_planned or 0) values[prefix + key_qt_worked] += (cost.quantity_by_day_worked or 0) print() value = ES_UNITS_DAYS[cost.equipment_service_cost.unit] * ( cost.days_planned or 0) if value > values[prefix + key_time_planned]: values[prefix + key_time_planned] = value value = ES_UNITS_DAYS[cost.equipment_service_cost.unit] * ( cost.days_worked or 0) if value > values[prefix + key_time_planned]: values[prefix + key_time_worked] = value return values def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super(File, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs ) values = get_values_town_related(self, prefix, values, filtr=filtr) values = self.get_dynamic_values(prefix, values, filtr=filtr) return values def render_parcels(self): Parcel = apps.get_model("archaeological_operations", "Parcel") return Parcel.render_parcels(list(self.parcels.all())) def __str__(self): return self.cached_label or "" @property def short_label(self): return settings.JOINT.join(str(self).split(settings.JOINT)[1:]) @property def reference(self): return self.external_id or "" def _generate_cached_label(self): items = [self.get_town_label(), self.reference] items += [ str(getattr(self, k)) for k in ["internal_reference", "name"] if getattr(self, k) ] return settings.JOINT.join(items) def grouped_parcels(self): from archaeological_operations.models import Parcel return Parcel.grouped_parcels(list(self.parcels.all())) def get_town_label(self): lbl = str(_("Multi-town")) if self.main_town: lbl = self.main_town.name elif self.towns.count() == 1: lbl = self.towns.all()[0].name elif self.towns.count() == 0: lbl = str(_("No town")) return lbl def get_department(self): if not self.towns.count(): return "00" return self.towns.all()[0].numero_insee[:2] @classmethod def _get_query_owns_dicts(cls, ishtaruser): profile = ishtaruser.current_profile town_ids = [] if profile: town_ids = [town["pk"] for town in profile.query_towns.values("pk").all()] return [ { "in_charge": ishtaruser.person, "history_creator": ishtaruser.user_ptr, "towns__pk__in": town_ids, }, {"end_date__isnull": True}, ] @classmethod def get_query_owns(cls, ishtaruser): return cls._construct_query_own("", cls._get_query_owns_dicts(ishtaruser)) def is_active(self): return not bool(self.end_date) @property def town_list(self): return ", ".join([str(tw) for tw in self.towns.all()]) def total_surface_ha(self): if self.total_surface: return self.total_surface / 10000.0 def total_developed_surface_ha(self): if self.total_developed_surface: return self.total_developed_surface / 10000.0 def operation_acts(self): acts = [] for ope in self.operations.all(): for act in ope.administrative_act.all(): acts.append(act) return acts def update_raw_town_planning_service(self): if ( self.raw_town_planning_service and not self.responsible_town_planning_service ) or not self.responsible_town_planning_service: return False current_lbl = "" if self.raw_town_planning_service: current_lbl = self.raw_town_planning_service[:] lbl = str(self.responsible_town_planning_service) if not lbl: return False self.raw_town_planning_service = lbl[:200] return current_lbl != self.raw_town_planning_service def update_planning_service(self): if ( not self.responsible_town_planning_service or not self.responsible_town_planning_service.attached_to or self.planning_service ): return False self.planning_service = self.responsible_town_planning_service.attached_to return True def update_resp_planning_service(self): if ( not self.responsible_town_planning_service or self.responsible_town_planning_service.attached_to or not self.planning_service ): return False self.responsible_town_planning_service.attached_to = self.planning_service self.responsible_town_planning_service.save() return True def update_raw_general_contractor(self): if ( self.raw_general_contractor and not self.general_contractor ) or not self.general_contractor: return False current_lbl = "" if self.raw_general_contractor: current_lbl = self.raw_general_contractor[:] lbl = str(self.general_contractor) if not lbl: return False self.raw_general_contractor = lbl[:200] return current_lbl != self.raw_general_contractor def update_corpo_general_contractor(self): if ( not self.general_contractor or self.general_contractor.attached_to == self.corporation_general_contractor ): return False if self.general_contractor.attached_to: self.corporation_general_contractor = self.general_contractor.attached_to else: self.general_contractor.attached_to = self.corporation_general_contractor self.general_contractor.save() return True @property def excavation_working_time_planned(self): return self.ground_jobs.all().aggregate(Max("days_planned"))["days_planned__max"] or 0 @property def study_working_time_planned(self): return self.jobs.all().aggregate(Max("days_planned"))["days_planned__max"] or 0 @property def excavation_working_time_worked(self): return self.ground_jobs.all().aggregate(Max("days_worked"))["days_worked__max"] or 0 @property def study_working_time_worked(self): return self.jobs.all().aggregate(Max("days_worked"))["days_worked__max"] or 0 @property def excavation_average_team_planned(self): return round( sum([(job.man_by_day_planned or 0) * (job.days_planned or 0) for job in self.ground_jobs.all()]) / (self.excavation_working_time_planned or 1) ) @property def excavation_average_team_worked(self): return round( sum([(job.man_by_day_worked or 0) * (job.days_worked or 0) for job in self.ground_jobs.all()]) / (self.excavation_working_time_worked or 1) ) @property def study_average_team_planned(self): return round( sum([(job.man_by_day_planned or 0) * (job.days_planned or 0) for job in self.jobs.all()]) / (self.study_working_time_planned or 1) ) @property def study_average_team_worked(self): return round( sum([(job.man_by_day_worked or 0) * (job.days_worked or 0) for job in self.jobs.all()]) / (self.study_working_time_worked or 1) ) @property def job_cost_planned(self): return sum(job.cost_planned for job in self.jobs.all()) @property def ground_job_cost_planned(self): return sum(job.cost_planned for job in self.ground_jobs.all()) @property def job_cost_worked(self): return sum(job.cost_worked for job in self.jobs.all()) @property def ground_job_cost_worked(self): return sum(job.cost_worked for job in self.ground_jobs.all()) @property def job_cost_diff_planned_worked(self): return self.job_cost_planned - self.job_cost_worked @property def ground_job_cost_diff_planned_worked(self): return self.ground_job_cost_planned - self.ground_job_cost_worked @property def used_equipments(self): equipments = [] service_types = list(GenericEquipmentServiceType.objects.all()) for service_type in service_types: q = self.equipment_costs.filter( equipment_service_cost__equipment_service_type__generic_equipment_type=service_type) if not q.count(): continue equipments.append([service_type.label, 0, 0, 0, []]) for cost in q.all(): equipments[-1][-4] += cost.cost_planned equipments[-1][-3] += cost.cost_worked equipments[-1][-1].append(cost) equipments[-1][-2] = equipments[-1][-4] - equipments[-1][-3] return equipments def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(File, self).get_extra_actions(request) if self.can_do(request, "add_administrativeact"): actions += [ ( reverse("file-edit-preventive", args=[self.pk]), _("Edit intervention plan"), "fa fa-pencil", _("plan"), "", False, ), ( reverse("file-add-adminact", args=[self.pk]), _("Add associated administrative act"), "fa fa-plus", _("admin. act"), "", False, ), ] if self.can_do(request, "add_operation"): actions += [ ( reverse("file-add-operation", args=[self.pk]), _("Add operation"), "fa fa-plus", _("operation"), "", False, ) ] return actions def save(self, *args, **kwargs): returned = super(File, self).save(*args, **kwargs) if ( not getattr(self, "_no_new_add", None) and self.main_town and self.main_town not in list(self.towns.all()) ): self._no_new_add = True self.towns.add(self.main_town) updated = self.update_raw_town_planning_service() updated += self.update_planning_service() self.update_resp_planning_service() updated += self.update_raw_general_contractor() updated += self.update_corpo_general_contractor() if not self.external_id or self.auto_external_id: external_id = get_generated_id("file_external_id", self) if external_id != self.external_id: updated = True self.auto_external_id = True self.external_id = external_id if updated: self._cached_label_checked = False self.save() return returned self.update_delay_date() self.update_short_menu_class() return returned def is_preventive(self): return FileType.is_preventive(self.file_type.pk) m2m_changed.connect(cached_label_changed, sender=File.towns.through) post_save.connect(cached_label_changed, sender=File) for attr in File.HISTORICAL_M2M: m2m_changed.connect(m2m_historization_changed, sender=getattr(File, attr).through) class FileByDepartment(models.Model): """ Database view for dashboard """ CREATE_SQL = """ CREATE VIEW file_department (id, department_id, file_id) as select town."id", town."departement_id", file_towns."file_id" from ishtar_common_town town inner join archaeological_files_file_towns file_towns on file_towns."town_id"=town."id" order by town."departement_id"; CREATE RULE file_department_delete AS ON DELETE TO file_department DO INSTEAD(); """ DELETE_SQL = """ DROP VIEW IF EXISTS file_department; """ file = models.ForeignKey(File, verbose_name=_("File")) department = models.ForeignKey( Department, verbose_name=_("Department"), on_delete=models.DO_NOTHING, blank=True, null=True, ) class Meta: managed = False db_table = "file_department" def __str__(self): return "{} - {}".format(self.file, self.department) class FileDashboard: def __init__(self): from archaeological_operations.models import AdministrativeAct main_dashboard = Dashboard(File) self.total_number = main_dashboard.total_number types = File.objects.values("file_type", "file_type__label") self.types = types.annotate(number=Count("pk")).order_by("file_type") by_year = File.objects.extra({"date": "date_trunc('year', creation_date)"}) self.by_year = ( by_year.values("date").annotate(number=Count("pk")).order_by("-date") ) now = datetime.date.today() limit = datetime.date(now.year, now.month, 1) - datetime.timedelta(365) by_month = File.objects.filter(creation_date__gt=limit).extra( {"date": "date_trunc('month', creation_date)"} ) self.by_month = ( by_month.values("date").annotate(number=Count("pk")).order_by("-date") ) # research self.research = {} prog_type = FileType.objects.get(txt_idx="prog") researchs = File.objects.filter(file_type=prog_type) self.research["total_number"] = researchs.count() by_year = researchs.extra({"date": "date_trunc('year', creation_date)"}) self.research["by_year"] = ( by_year.values("date").annotate(number=Count("pk")).order_by("-date") ) by_month = researchs.filter(creation_date__gt=limit).extra( {"date": "date_trunc('month', creation_date)"} ) self.research["by_month"] = ( by_month.values("date").annotate(number=Count("pk")).order_by("-date") ) self.research["by_dpt"] = ( FileByDepartment.objects.filter( file__file_type=prog_type, department__isnull=False ) .values("department__label") .annotate(number=Count("file")) .order_by("department__label") ) FileTown = File.towns.through self.research["towns"] = ( FileTown.objects.filter(file__file_type=prog_type) .values("town__name") .annotate(number=Count("file")) .order_by("-number", "town__name")[:10] ) # rescue rescue_type = FileType.objects.get(txt_idx="preventive") rescues = File.objects.filter(file_type=rescue_type) self.rescue = {} self.rescue["total_number"] = rescues.count() self.rescue["saisine"] = ( rescues.values("saisine_type__label") .annotate(number=Count("pk")) .order_by("saisine_type__label") ) self.rescue["administrative_act"] = ( AdministrativeAct.objects.filter(associated_file__isnull=False) .values("act_type__label") .annotate(number=Count("pk")) .order_by("act_type__pk") ) by_year = rescues.extra({"date": "date_trunc('year', creation_date)"}) self.rescue["by_year"] = ( by_year.values("date").annotate(number=Count("pk")).order_by("-date") ) by_month = rescues.filter(creation_date__gt=limit).extra( {"date": "date_trunc('month', creation_date)"} ) self.rescue["by_month"] = ( by_month.values("date").annotate(number=Count("pk")).order_by("-date") ) self.rescue["by_dpt"] = ( FileByDepartment.objects.filter( file__file_type=rescue_type, department__isnull=False ) .values("department__label") .annotate(number=Count("file")) .order_by("department__label") ) self.rescue["towns"] = ( FileTown.objects.filter(file__file_type=rescue_type) .values("town__name") .annotate(number=Count("file")) .order_by("-number", "town__name")[:10] ) self.rescue["with_associated_operation"] = rescues.filter( operations__isnull=False ).count() if self.rescue["total_number"]: self.rescue["with_associated_operation_percent"] = round( float(self.rescue["with_associated_operation"]) / self.rescue["total_number"] * 100, 2, ) by_year_operationnal = rescues.filter(operations__isnull=False).extra( {"date": "date_trunc('year', " '"archaeological_files_file".creation_date)'} ) by_year_operationnal = ( by_year_operationnal.values("date") .annotate(number=Count("pk")) .order_by("-date") ) percents, idx = [], 0 for dct in self.rescue["by_year"]: if idx >= len(by_year_operationnal): break if by_year_operationnal[idx]["date"] != dct["date"] or not dct["number"]: continue val = round( float(by_year_operationnal[idx]["number"]) / dct["number"] * 100, 2 ) percents.append({"date": dct["date"], "number": val}) self.rescue["operational_by_year"] = percents self.rescue["surface_by_town"] = ( FileTown.objects.filter(file__file_type=rescue_type) .values("town__name") .annotate(number=Sum("file__total_surface")) .order_by("-number", "town__name")[:10] ) self.rescue["surface_by_dpt"] = ( FileByDepartment.objects.filter( file__file_type=rescue_type, department__isnull=False ) .values("department__label") .annotate(number=Sum("file__total_surface")) .order_by("department__label") ) class ManDays(models.Model): man_by_day_planned = models.FloatField( _("Man by day - planned"), null=True, blank=True ) days_planned = models.FloatField(_("Days - planned"), null=True, blank=True) man_by_day_worked = models.FloatField( _("Man by day - worked"), null=True, blank=True ) days_worked = models.FloatField(_("Days - worked"), null=True, blank=True) class Meta: abstract = True @property def quantity_planned(self): if not self.days_planned or not self.man_by_day_planned: return 0 return self.days_planned * self.man_by_day_planned @property def quantity_worked(self): if not self.days_worked or not self.man_by_day_worked: return 0 return self.days_worked * self.man_by_day_worked class PreventiveFileGroundJob(ManDays): file = models.ForeignKey(File, related_name="ground_jobs") job = models.ForeignKey(Job, verbose_name=_("Job")) class Meta: ordering = ("job",) @property def cost_planned(self): return (self.job.ground_daily_cost or 0) * self.quantity_planned @property def cost_worked(self): return (self.job.ground_daily_cost or 0) * self.quantity_worked class PreventiveFileJob(ManDays): file = models.ForeignKey(File, related_name="jobs") job = models.ForeignKey(Job, verbose_name=_("Job")) class Meta: ordering = ("job",) @property def cost_planned(self): return (self.job.daily_cost or 0) * self.quantity_planned @property def cost_worked(self): return (self.job.daily_cost or 0) * self.quantity_worked class PreventiveFileEquipmentServiceCost(models.Model): file = models.ForeignKey(File, related_name="equipment_costs") equipment_service_cost = models.ForeignKey(EquipmentServiceCost) quantity_by_day_planned = models.FloatField( _("Quantity by day - planned"), null=True, blank=True ) days_planned = models.FloatField(_("Days - planned"), null=True, blank=True) quantity_by_day_worked = models.FloatField( _("Quantity by day - worked"), null=True, blank=True ) days_worked = models.FloatField(_("Days - worked"), null=True, blank=True) class Meta: ordering = ("equipment_service_cost",) @property def quantity_planned(self): if self.equipment_service_cost.flat_rate: return self.quantity_by_day_planned or 0 if not self.days_planned or not self.quantity_by_day_planned: return 0 return self.days_planned * self.quantity_by_day_planned @property def quantity_worked(self): if self.equipment_service_cost.flat_rate: return self.quantity_by_day_worked or 0 if not self.quantity_by_day_worked or not self.days_worked: return 0 return self.days_worked * self.quantity_by_day_worked @property def cost_planned(self): return (self.equipment_service_cost.unitary_cost or 0) * self.quantity_planned @property def cost_worked(self): return (self.equipment_service_cost.unitary_cost or 0) * self.quantity_worked