#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2012-2014 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

import datetime

from django.conf import settings
from django.contrib.gis.db import models
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, Count, Sum
from django.db.models.signals import post_save, m2m_changed
from django.utils.translation import ugettext_lazy as _, ugettext

from ishtar_common.utils import cached_label_changed, get_cache

from ishtar_common.models import GeneralType, BaseHistorizedItem, \
    HistoricalRecords, OwnPerms, Person, Organization, Department, Town, \
    Dashboard, IshtarUser, ValueGetter, ShortMenuItem


class FileType(GeneralType):
    """Type of an archaeological file."""

    class Meta:
        verbose_name = _(u"Archaeological file type")
        verbose_name_plural = _(u"Archaeological file types")
        ordering = ('label',)

    @classmethod
    def is_preventive(cls, file_type_id, key=''):
        """
        Tell whether `file_type_id` is the pk of the file type whose
        `txt_idx` is `key` ('preventive' when `key` is empty).

        Return False when no file type with this `txt_idx` exists.
        """
        key = key or 'preventive'
        try:
            preventive = FileType.objects.get(txt_idx=key).pk
        except ObjectDoesNotExist:
            return False
        return file_type_id == preventive


class PermitType(GeneralType):
    """Type of a permit attached to a file."""

    class Meta:
        verbose_name = _(u"Permit type")
        verbose_name_plural = _(u"Permit types")
        ordering = ('label',)


if settings.COUNTRY == 'fr':
    class SaisineType(GeneralType, ValueGetter):
        """Saisine type (French configuration only) with its delay."""
        delay = models.IntegerField(_(u"Delay (in days)"))

        class Meta:
            verbose_name = u"Type Saisine"
            verbose_name_plural = u"Types Saisine"
            ordering = ('label',)


class File(BaseHistorizedItem, OwnPerms, ValueGetter, ShortMenuItem):
    """
    Archaeological file.

    Several derived values (delay date, presence of administrative acts,
    short menu CSS class) are stored in the cache under keys given by
    `get_cache`; the delay date and the menu class are refreshed on each
    `save`.
    """
    TABLE_COLS = ['numeric_reference', 'year', 'internal_reference',
                  'file_type', 'saisine_type', 'towns', ]
    year = models.IntegerField(
        _(u"Year"), default=lambda: datetime.datetime.now().year)
    numeric_reference = models.IntegerField(
        _(u"Numeric reference"), blank=True, null=True)
    internal_reference = models.CharField(
        _(u"Internal reference"), blank=True, null=True, max_length=60)
    name = models.CharField(_(u"Name"), max_length=100, blank=True,
                            null=True)
    file_type = models.ForeignKey(FileType, verbose_name=_(u"File type"))
    in_charge = models.ForeignKey(
        Person, related_name='file_responsability',
        verbose_name=_(u"Person in charge"), on_delete=models.SET_NULL,
        blank=True, null=True)
    general_contractor = models.ForeignKey(
        Person, related_name='general_contractor',
        verbose_name=_(u"General contractor"), blank=True, null=True,
        on_delete=models.SET_NULL,)
    responsible_town_planning_service = models.ForeignKey(
        Person, related_name='responsible_town_planning_service',
        blank=True, null=True,
        verbose_name=_(u"Responsible for town planning service"),
        on_delete=models.SET_NULL,)
    permit_type = models.ForeignKey(
        PermitType, verbose_name=_(u"Permit type"), blank=True, null=True)
    permit_reference = models.CharField(
        _(u"Permit reference"), max_length=60, blank=True, null=True)
    end_date = models.DateField(_(u"Closing date"), null=True, blank=True)
    towns = models.ManyToManyField(Town, verbose_name=_(u"Towns"),
                                   related_name='file')
    creation_date = models.DateField(_(u"Creation date"),
                                     default=datetime.date.today)
    reception_date = models.DateField(_(u'Reception date'), blank=True,
                                      null=True)
    related_file = models.ForeignKey(
        "File", verbose_name=_(u"Related file"), blank=True, null=True)
    if settings.COUNTRY == 'fr':
        saisine_type = models.ForeignKey(
            SaisineType, blank=True, null=True,
            verbose_name= u"Type de saisine")
        # NOTE(review): the original indentation was lost; this field is
        # assumed to belong to the French-only block - confirm.
        reference_number = models.IntegerField(
            _(u"Ref. number"), blank=True, null=True)
    total_surface = models.IntegerField(
        _(u"Total surface (m²)"), blank=True, null=True)
    total_developed_surface = models.IntegerField(
        _(u"Total developed surface (m²)"), blank=True, null=True)
    address = models.TextField(_(u"Main address"), null=True, blank=True)
    address_complement = models.TextField(
        _(u"Main address - complement"), null=True, blank=True)
    postal_code = models.CharField(
        _(u"Main address - postal code"), max_length=10, null=True,
        blank=True)
    comment = models.TextField(_(u"Comment"), null=True, blank=True)
    # research archaeology -->
    departments = models.ManyToManyField(
        Department, verbose_name=_(u"Departments"), null=True, blank=True)
    requested_operation_type = models.ForeignKey(
        'archaeological_operations.OperationType', related_name='+',
        null=True, blank=True, verbose_name=_(u"Requested operation type"))
    organization = models.ForeignKey(
        Organization, blank=True, null=True,
        verbose_name=_(u"Organization"), related_name='files',
        on_delete=models.SET_NULL)
    scientist = models.ForeignKey(
        Person, blank=True, null=True, related_name='scientist',
        on_delete=models.SET_NULL, verbose_name=_(u"Scientist in charge"))
    research_comment = models.TextField(
        _(u"Research archaeology comment"), null=True, blank=True)
    classified_area = models.NullBooleanField(
        _(u"Classified area"), blank=True, null=True)
    protected_area = models.NullBooleanField(
        _(u"Protected area"), blank=True, null=True)
    if settings.COUNTRY == 'fr':
        cira_advised = models.NullBooleanField(
            u"Passage en CIRA", blank=True, null=True)
        mh_register = models.NullBooleanField(
            u"Sur Monument Historique classé", blank=True, null=True)
        mh_listing = models.NullBooleanField(
            u"Sur Monument Historique inscrit", blank=True, null=True)
    # <-- research archaeology
    cached_label = models.CharField(_(u"Cached name"), max_length=500,
                                    null=True, blank=True)
    history = HistoricalRecords()

    class Meta:
        verbose_name = _(u"Archaeological file")
        verbose_name_plural = _(u"Archaeological files")
        permissions = (
            ("view_file", ugettext(u"Can view all Archaelogical file")),
            ("view_own_file", ugettext(u"Can view own Archaelogical file")),
            ("add_own_file", ugettext(u"Can add own Archaelogical file")),
            ("change_own_file",
             ugettext(u"Can change own Archaelogical file")),
            ("delete_own_file",
             ugettext(u"Can delete own Archaelogical file")),
            ("close_file", ugettext(u"Can close File")),
        )
        ordering = ('cached_label',)

    @property
    def short_class_name(self):
        return _(u"FILE")

    @property
    def delay_date(self):
        """Cached delay date (see `update_delay_date`)."""
        cache_key, val = get_cache(self.__class__, [self.pk, 'delay_date'])
        if val:
            return val
        return self.update_delay_date(cache_key)

    def update_delay_date(self, cache_key=None):
        """
        Compute the delay date, store it in the cache and return it.

        The date is the reception date, increased by the saisine type
        delay (in days) for the French configuration. Without reception
        date, 2500-01-01 is used.
        """
        if not cache_key:
            cache_key, val = get_cache(self.__class__,
                                       [self.pk, 'delay_date'])
        date = self.reception_date
        if not date:
            date = datetime.date(2500, 1, 1)
        elif settings.COUNTRY == 'fr' and self.saisine_type \
                and self.saisine_type.delay:
            date += datetime.timedelta(days=self.saisine_type.delay)
        cache.set(cache_key, date, settings.CACHE_TIMEOUT)
        return date

    @property
    def has_adminact(self):
        """Cached value of `update_has_admin_act`."""
        cache_key, val = get_cache(self.__class__,
                                   [self.pk, 'has_adminact'])
        # a falsy cached value (e.g. 0) is recomputed on each access
        if val:
            return val
        return self.update_has_admin_act(cache_key)

    def update_has_admin_act(self, cache_key=None):
        """
        Compute, cache and return the number of administrative acts whose
        type is not 'a_receipt' or, when there is none, the number of
        operations. The result is meant to be used as a boolean.
        """
        if not cache_key:
            cache_key, val = get_cache(self.__class__,
                                       [self.pk, 'has_adminact'])
        has_adminact = self.administrative_act.exclude(
            act_type__txt_idx='a_receipt').count() \
            or self.operations.count()
        cache.set(cache_key, has_adminact, settings.CACHE_TIMEOUT)
        return has_adminact

    def get_short_menu_class(self):
        """Cached CSS class (see `update_short_menu_class`)."""
        cache_key, val = get_cache(self.__class__,
                                   [self.pk, 'short_class_name'])
        if val:
            return val
        return self.update_short_menu_class(cache_key)

    def update_short_menu_class(self, cache_key=None):
        """
        Compute, cache and return the CSS class used in the short menu.

        'normal' is returned when the file has an administrative act or no
        reception date. Otherwise the time elapsed since the reception is
        compared with the saisine type delay: 'green' up to two thirds of
        the delay, 'orange' up to the delay, 'red' beyond it or when no
        delay is available.
        """
        if not cache_key:
            cache_key, val = get_cache(self.__class__,
                                       [self.pk, 'short_class_name'])
        cls = 'normal'
        if not self.has_adminact and self.reception_date:
            delta = datetime.date.today() - self.reception_date
            cls = 'red'
            # the saisine_type field is only declared for the French
            # configuration
            saisine_type = getattr(self, 'saisine_type', None)
            if saisine_type and saisine_type.delay:
                if delta.days <= (saisine_type.delay*2/3):
                    cls = 'green'
                elif delta.days <= saisine_type.delay:
                    cls = 'orange'
        cache.set(cache_key, cls, settings.CACHE_TIMEOUT)
        return cls

    @classmethod
    def get_owns(cls, user):
        """Return the files owned by `user` sorted by cached label."""
        owns = super(File, cls).get_owns(user)
        return sorted(owns.all(), key=lambda x: x.cached_label)

    @classmethod
    def get_years(cls):
        """Return the distinct years for which files exist."""
        return [res['year'] for res in
                cls.objects.values('year').annotate(Count("id")).order_by()]

    @classmethod
    def get_by_year(cls, year):
        return cls.objects.filter(year=year)

    @classmethod
    def get_total_number(cls):
        return cls.objects.count()

    def get_values(self, prefix=''):
        """
        Extend the values of the parent class with town related entries:
        town list and town count, both with `prefix` and with the fixed
        'adminact_associated_file_' prefix.
        """
        values = super(File, self).get_values(prefix=prefix)
        values['adminact_associated_file_towns_count'] = unicode(
            self.towns.count())
        values['adminact_associated_file_towns'] = u", ".join(
            [unicode(town) for town in self.towns.all()])
        values[prefix+'towns'] = ''
        values[prefix+'towns_count'] = unicode(self.towns.count())
        if self.towns.count():
            values[prefix+'towns'] = u", ".join(
                [town.name for town in self.towns.all().order_by('name')])
        return values

    def __unicode__(self):
        if self.cached_label:
            return self.cached_label
        # no label yet: saving triggers the generation of the cached label
        self.save()
        return self.cached_label

    @property
    def short_label(self):
        """Cached label without its first item."""
        return settings.JOINT.join(unicode(self).split(settings.JOINT)[1:])

    @property
    def reference(self):
        """'<year>-<numeric reference>' ('0' when there is no reference)."""
        return u"-".join((unicode(self.year),
                          unicode(self.numeric_reference or '0')))

    def _generate_cached_label(self):
        """
        Join the town label, the reference and, when set, the internal
        reference and the name.
        """
        items = [self.get_town_label(), self.reference]
        items += [unicode(getattr(self, k))
                  for k in ['internal_reference', 'name']
                  if getattr(self, k)]
        return settings.JOINT.join(items)

    def grouped_parcels(self):
        # imported here and not at module level
        # (presumably to avoid a circular import - confirm)
        from archaeological_operations.models import Parcel
        return Parcel.grouped_parcels(list(self.parcels.all()))

    def get_town_label(self):
        """
        Return the name of the town when the file has exactly one town,
        the translation of 'Intercommunal' otherwise.
        """
        lbl = unicode(_('Intercommunal'))
        if self.towns.count() == 1:
            lbl = self.towns.all()[0].name
        return lbl

    def get_department(self):
        """
        Return the first two characters of the INSEE number of the first
        town, '00' when the file has no town.
        """
        if not self.towns.count():
            return '00'
        return self.towns.all()[0].numero_insee[:2]

    @classmethod
    def get_query_owns(cls, user):
        """
        Build the filter for the files owned by `user`: open files (no
        closing date) created by the user or with the user in charge.
        """
        return (Q(history_creator=user) |
                Q(in_charge__ishtaruser=user.ishtaruser)) \
            & Q(end_date__isnull=True)

    def is_active(self):
        """A file is active as long as it has no closing date."""
        return not bool(self.end_date)

    @property
    def town_list(self):
        return u", ".join([unicode(tw) for tw in self.towns.all()])

    def closing(self):
        """
        Return a dict with the 'date' and the 'user' of a history record
        of a closed file, None when the file is active or has no history.

        The record used is the first one without closing date met while
        browsing the history, the last record browsed when every record
        has a closing date.
        """
        if self.is_active():
            return
        item = None
        for item in self.history.all():
            if not item.end_date:
                break
        if item is None:
            # empty history: nothing to report
            return
        return {'date': item.history_date,
                'user': IshtarUser.objects.get(pk=item.history_modifier_id)}

    def total_surface_ha(self):
        """Total surface divided by 10000, None when not set."""
        if self.total_surface:
            return self.total_surface/10000.0

    def total_developed_surface_ha(self):
        """Total developed surface divided by 10000, None when not set."""
        if self.total_developed_surface:
            return self.total_developed_surface/10000.0

    def operation_acts(self):
        """Return the administrative acts of every related operation."""
        acts = []
        for ope in self.operations.all():
            acts.extend(ope.administrative_act.all())
        return acts

    def save(self, *args, **kwargs):
        """Save the file then refresh the cached delay date and class."""
        returned = super(File, self).save(*args, **kwargs)
        self.update_delay_date()
        self.update_short_menu_class()
        return returned

    def is_preventive(self):
        return FileType.is_preventive(self.file_type.pk)

m2m_changed.connect(cached_label_changed, sender=File.towns.through)
post_save.connect(cached_label_changed, sender=File)


class FileByDepartment(models.Model):
    '''
    Database view for dashboard
    '''
    file = models.ForeignKey(File, verbose_name=_(u"File"))
    department = models.ForeignKey(Department,
                                   verbose_name=_(u"Department"),
                                   blank=True, null=True)

    class Meta:
        managed = False
        db_table = 'file_department'


class FileDashboard:
    """
    Statistics about files.

    Global figures are available as attributes (`total_number`, `types`,
    `by_year`, `by_month`); figures restricted to the 'prog' and to the
    'preventive' file types are stored in the `research` and `rescue`
    dicts. Dates are truncated with the `date_trunc` SQL function.
    """

    def __init__(self):
        from archaeological_operations.models import AdministrativeAct
        main_dashboard = Dashboard(File)

        self.total_number = main_dashboard.total_number

        types = File.objects.values('file_type', 'file_type__label')
        self.types = types.annotate(number=Count('pk'))\
                          .order_by('file_type')

        by_year = File.objects.extra(
            {'date': "date_trunc('year', creation_date)"})
        self.by_year = by_year.values('date')\
                              .annotate(number=Count('pk'))\
                              .order_by('-date')

        # monthly figures start 365 days before the first day of the
        # current month
        now = datetime.date.today()
        limit = datetime.date(now.year, now.month, 1) \
            - datetime.timedelta(365)
        by_month = File.objects.filter(creation_date__gt=limit).extra(
            {'date': "date_trunc('month', creation_date)"})
        self.by_month = by_month.values('date')\
                                .annotate(number=Count('pk'))\
                                .order_by('-date')

        # research
        self.research = {}
        prog_type = FileType.objects.get(txt_idx='prog')
        researchs = File.objects.filter(file_type=prog_type)
        self.research['total_number'] = researchs.count()
        by_year = researchs.extra(
            {'date': "date_trunc('year', creation_date)"})
        self.research['by_year'] = by_year.values('date')\
                                          .annotate(number=Count('pk'))\
                                          .order_by('-date')
        by_month = researchs.filter(creation_date__gt=limit)\
            .extra({'date': "date_trunc('month', creation_date)"})
        self.research['by_month'] = by_month.values('date')\
                                            .annotate(number=Count('pk'))\
                                            .order_by('-date')

        self.research['by_dpt'] = FileByDepartment.objects\
            .filter(file__file_type=prog_type, department__isnull=False)\
            .values('department__label')\
            .annotate(number=Count('file'))\
            .order_by('department__label')
        FileTown = File.towns.through
        self.research['towns'] = FileTown.objects\
            .filter(file__file_type=prog_type)\
            .values('town__name')\
            .annotate(number=Count('file'))\
            .order_by('-number', 'town__name')[:10]

        # rescue
        rescue_type = FileType.objects.get(txt_idx='preventive')
        rescues = File.objects.filter(file_type=rescue_type)
        self.rescue = {}
        self.rescue['total_number'] = rescues.count()
        # NOTE(review): the saisine_type field is only declared when
        # settings.COUNTRY == 'fr' - confirm the behaviour of this query
        # for other configurations.
        self.rescue['saisine'] = rescues.values('saisine_type__label')\
                                        .annotate(number=Count('pk'))\
                                        .order_by('saisine_type__label')
        self.rescue['administrative_act'] = AdministrativeAct.objects\
            .filter(associated_file__isnull=False)\
            .values('act_type__label')\
            .annotate(number=Count('pk'))\
            .order_by('act_type__pk')

        by_year = rescues.extra(
            {'date': "date_trunc('year', creation_date)"})
        self.rescue['by_year'] = by_year.values('date')\
                                        .annotate(number=Count('pk'))\
                                        .order_by('-date')
        by_month = rescues.filter(creation_date__gt=limit)\
            .extra({'date': "date_trunc('month', creation_date)"})
        self.rescue['by_month'] = by_month.values('date')\
                                          .annotate(number=Count('pk'))\
                                          .order_by('-date')

        self.rescue['by_dpt'] = FileByDepartment.objects\
            .filter(file__file_type=rescue_type, department__isnull=False)\
            .values('department__label')\
            .annotate(number=Count('file'))\
            .order_by('department__label')
        self.rescue['towns'] = FileTown.objects\
            .filter(file__file_type=rescue_type)\
            .values('town__name')\
            .annotate(number=Count('file'))\
            .order_by('-number', 'town__name')[:10]

        self.rescue['with_associated_operation'] = rescues\
            .filter(operations__isnull=False).count()

        # NOTE(review): the original indentation was lost; only the
        # percentage is assumed to depend on this test - confirm.
        if self.rescue['total_number']:
            self.rescue['with_associated_operation_percent'] = round(
                float(self.rescue['with_associated_operation'])
                / self.rescue['total_number']*100, 2)

        by_year_operationnal = rescues.filter(operations__isnull=False)\
            .extra({'date': "date_trunc('year', creation_date)"})
        by_year_operationnal = by_year_operationnal.values('date')\
            .annotate(number=Count('pk'))\
            .order_by('-date')
        # percentage, for each year, of rescue files with an associated
        # operation; the operational counts are indexed by date so that
        # every year is matched (years without any operation are skipped)
        operational_numbers = dict(
            (row['date'], row['number']) for row in by_year_operationnal)
        percents = []
        for dct in self.rescue['by_year']:
            if dct['date'] not in operational_numbers \
                    or not dct['number']:
                continue
            val = round(float(operational_numbers[dct['date']])
                        / dct['number']*100, 2)
            percents.append({'date': dct['date'], 'number': val})
        self.rescue['operational_by_year'] = percents

        self.rescue['surface_by_town'] = FileTown.objects\
            .filter(file__file_type=rescue_type)\
            .values('town__name')\
            .annotate(number=Sum('file__total_surface'))\
            .order_by('-number', 'town__name')[:10]
        self.rescue['surface_by_dpt'] = FileByDepartment.objects\
            .filter(file__file_type=rescue_type, department__isnull=False)\
            .values('department__label')\
            .annotate(number=Sum('file__total_surface'))\
            .order_by('department__label')