#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2012 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ import datetime from django.conf import settings from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.validators import validate_slug from django.utils.translation import ugettext_lazy as _, ugettext from django.utils.safestring import SafeUnicode, mark_safe from django.core.urlresolvers import reverse, NoReverseMatch from django.db.utils import DatabaseError from django.db.models import Q, Max, Count, Sum, Avg from django.db.models.signals import post_save from django.contrib.auth.models import User from django.contrib.gis.db import models from django.contrib import admin from simple_history.models import HistoricalRecords as BaseHistoricalRecords JOINT = u" - " def post_save_user(sender, **kwargs): user = kwargs['instance'] ishtaruser = None q = IshtarUser.objects.filter(username=user.username) if not q.count(): ishtaruser = IshtarUser.create_from_user(user) else: ishtaruser = q.all()[0] if ishtaruser.is_superuser \ and ishtaruser.person.person_type.txt_idx != 'administrator': ishtaruser.person.person_type = PersonType.objects.get( txt_idx='administrator') ishtaruser.person.save() post_save.connect(post_save_user, sender=User) # HistoricalRecords enhancement: don't save identical versions class HistoricalRecords(BaseHistoricalRecords): def create_historical_record(self, instance, type): manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) history = instance.history.all() if not history: manager.create(history_type=type, **attrs) return old_instance = history[0] for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: manager.create(history_type=type, **attrs) return # valid ID validator for models def valid_id(cls): def func(value): try: cls.objects.get(pk=value) except ObjectDoesNotExist: raise ValidationError(_(u"Not a valid item.")) return func def valid_ids(cls): def func(value): if "," in value: value = value.split(",") for v in value: try: cls.objects.get(pk=v) except ObjectDoesNotExist: raise ValidationError( _(u"An item selected is not a valid item.")) return func # unique validator for models def is_unique(cls, field): def func(value): query = {field:value} try: assert cls.objects.filter(**query).count() == 0 except AssertionError: raise ValidationError(_(u"This item already exist.")) return func class OwnPerms: """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, user): """ Query object to get own items """ return None # implement for each object def is_own(self, user): """ Check if the current object is owned by the user """ query = self.get_query_owns(user) if not query: return False query = query & Q(pk=self.pk) return cls.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ query = cls.get_query_owns(user) if not query: return False return cls.objects.filter(query).count() @classmethod def get_owns(cls, user): """ Get Own items """ if isinstance(user, User): user = IshtarUser.objects.get(user_ptr=user) if user.is_anonymous(): return [] query = cls.get_query_owns(user) if not query: return [] return cls.objects.filter(query).order_by(*cls._meta.ordering).all() class GeneralType(models.Model): """ Abstract class for "types" """ label = models.CharField(_(u"Label"), max_length=100) txt_idx = models.CharField(_(u"Textual ID"), validators=[validate_slug], max_length=30, unique=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) available = models.BooleanField(_(u"Available")) HELP_TEXT = u"" class Meta: abstract = True def __unicode__(self): return self.label @classmethod def get_help(cls, dct={}, exclude=[]): help_text = cls.HELP_TEXT c_rank = -1 help_items = u"\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, '__iter__'): # TODO: manage multiple levels continue if not item.comment: continue if c_rank > item.rank: help_items += u"\n" elif c_rank < item.rank: help_items += u"
\n" c_rank = item.rank help_items += u"
%s
%s
" % (item.label, u"
".join(item.comment.split('\n'))) c_rank += 1 if c_rank: help_items += c_rank*u"
" if help_text or help_items != u'\n': return mark_safe(help_text + help_items) return u"" @classmethod def get_types(cls, dct={}, instances=False, exclude=[]): base_dct = dct.copy() if hasattr(cls, 'parent'): return cls._get_parent_types(base_dct, instances, exclude=exclude) return cls._get_types(base_dct, instances, exclude=exclude) @classmethod def _get_types(cls, dct={}, instances=False, exclude=[]): dct['available'] = True if not instances: yield ('', '--') items = cls.objects.filter(**dct) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(unicode(item))) PREFIX = "› " @classmethod def _get_childs(cls, item, dct, prefix=0, instances=False, exclude=[]): prefix += 1 dct['parent'] = item childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): childs = childs.order_by('order') for child in childs.all(): if instances: child.rank = prefix yield child else: yield (child.pk, SafeUnicode(prefix*cls.PREFIX + \ unicode(_(unicode(child))) )) for sub_child in cls._get_childs(child, dct, prefix, instances, exclude=exclude): yield sub_child @classmethod def _get_parent_types(cls, dct={}, instances=False, exclude=[]): dct['available'] = True if not instances: yield ('', '--') dct['parent'] = None items = cls.objects.filter(**dct) if exclude: items = items.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): items = items.order_by('order') for item in items.all(): if instances: item.rank = 0 yield item else: yield (item.pk, unicode(item)) for child in cls._get_childs(item, dct, instances, exclude=exclude): yield child class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) class BaseHistorizedItem(models.Model): history_modifier = models.ForeignKey(User, related_name='+', verbose_name=_(u"Last editor")) class Meta: abstract = True def save(self, *args, **kwargs): assert hasattr(self, 'history_modifier') == True super(BaseHistorizedItem, self).save(*args, **kwargs) return True def get_previous(self, step=None, date=None, strict=True): """ Get a "step" previous state of the item """ assert step or date historized = self.history.all() item = None if step: assert len(historized) > step item = historized[step] else: for step, item in enumerate(historized): if item.history_date == date: break # ended with no match if item.history_date != date: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in model._meta.get_all_field_names(): field = model._meta.get_field_by_name(k)[0] if hasattr(field, 'rel') and field.rel: if not hasattr(item, k+'_id'): setattr(item, k, getattr(self, k)) continue val = getattr(item, k+'_id') if not val: setattr(item, k, None) continue try: val = field.rel.to.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError(u"The class %s has no pk %d" % ( unicode(field.rel.to), val)) setattr(item, k, None) item.pk = self.pk return item def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): to_del.append(item) if item.history_date == date: new_item = item break if not new_item: raise HistoryError(u"The date to rollback to doesn't exist.") try: for f in self._meta.fields: k = f.name if k != 'id' and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) self.save() except: raise HistoryError(u"The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def values(self): values = {} for f in self._meta.fields: k = f.name if k != 'id': values[k] = getattr(self, k) return values def get_show_url(self): try: return reverse('show-'+self.__class__.__name__.lower(), args=[self.pk, '']) except NoReverseMatch: return class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=datetime.datetime.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return True class Wizard(models.Model): url_name = models.CharField(_(u"URL name"), max_length=128, unique=True) class Meta: verbose_name = _(u"Wizard") ordering = ['url_name'] def __unicode__(self): return unicode(self.url_name) class WizardStep(models.Model): order = models.IntegerField(_(u"Order")) wizard = models.ForeignKey(Wizard, verbose_name=_(u"Wizard")) url_name = models.CharField(_(u"URL name"), max_length=128) name = models.CharField(_(u"Label"), max_length=128) class Meta: verbose_name = _(u"Wizard step") ordering = ['wizard', 'order'] def __unicode__(self): return u"%s » %s" % (unicode(self.wizard), unicode(self.name)) class UserDashboard: def __init__(self): types = IshtarUser.objects.values('person__person_type', 'person__person_type__label') self.types = types.annotate(number=Count('pk'))\ .order_by('person__person_type') class FileDashboard: def __init__(self): main_dashboard = Dashboard(File) self.total_number = main_dashboard.total_number types = File.objects.values('file_type', 'file_type__label') self.types = types.annotate(number=Count('pk')).order_by('file_type') by_year = File.objects.extra( {'date':"date_trunc('year', creation_date)"}) self.by_year = by_year.values('date')\ .annotate(number=Count('pk')).order_by('-date') now = datetime.date.today() limit = datetime.date(now.year, now.month, 1) - datetime.timedelta(365) by_month = File.objects.filter(creation_date__gt=limit).extra( {'date':"date_trunc('month', creation_date)"}) self.by_month = by_month.values('date')\ .annotate(number=Count('pk')).order_by('-date') # research self.research = {} prog_type = FileType.objects.get(txt_idx='prog') researchs = File.objects.filter(file_type=prog_type) self.research['total_number'] = researchs.count() by_year = researchs.extra({'date':"date_trunc('year', creation_date)"}) self.research['by_year'] = by_year.values('date')\ .annotate(number=Count('pk'))\ .order_by('-date') by_month = researchs.filter(creation_date__gt=limit)\ .extra({'date':"date_trunc('month', creation_date)"}) self.research['by_month'] = by_month.values('date')\ .annotate(number=Count('pk'))\ .order_by('-date') self.research['by_dpt'] = FileByDepartment.objects\ .filter(file__file_type=prog_type, department__isnull=False)\ .values('department__label')\ .annotate(number=Count('file'))\ .order_by('department__label') FileTown = File.towns.through self.research['towns'] = FileTown.objects\ .filter(file__file_type=prog_type)\ .values('town__name')\ .annotate(number=Count('file'))\ .order_by('-number','town__name')[:10] # rescue rescue_type = FileType.objects.get(txt_idx='preventive') rescues = File.objects.filter(file_type=rescue_type) self.rescue = {} self.rescue['total_number'] = rescues.count() self.rescue['saisine'] = rescues.values('saisine_type__label')\ .annotate(number=Count('pk'))\ .order_by('saisine_type__label') self.rescue['administrative_act'] = AdministrativeAct.objects\ .filter(associated_file__isnull=False)\ .values('act_type__label')\ .annotate(number=Count('pk'))\ .order_by('act_type__pk') by_year = rescues.extra({'date':"date_trunc('year', creation_date)"}) self.rescue['by_year'] = by_year.values('date')\ .annotate(number=Count('pk'))\ .order_by('-date') by_month = rescues.filter(creation_date__gt=limit)\ .extra({'date':"date_trunc('month', creation_date)"}) self.rescue['by_month'] = by_month.values('date')\ .annotate(number=Count('pk'))\ .order_by('-date') self.rescue['by_dpt'] = FileByDepartment.objects\ .filter(file__file_type=rescue_type, department__isnull=False)\ .values('department__label')\ .annotate(number=Count('file'))\ .order_by('department__label') self.rescue['towns'] = FileTown.objects\ .filter(file__file_type=rescue_type)\ .values('town__name')\ .annotate(number=Count('file'))\ .order_by('-number','town__name')[:10] self.rescue['with_associated_operation'] = rescues\ .filter(operations__isnull=False).count() self.rescue['with_associated_operation_percent'] = round( float(self.rescue['with_associated_operation'])\ /self.rescue['total_number']*100, 2) by_year_operationnal = rescues.filter(operations__isnull=False)\ .extra({'date':"date_trunc('year', creation_date)"}) by_year_operationnal = by_year_operationnal.values('date')\ .annotate(number=Count('pk'))\ .order_by('-date') percents, idx = [], 0 for dct in self.rescue['by_year']: if idx > len(by_year_operationnal): break if by_year_operationnal[idx]['date'] != dct['date'] or\ not dct['number']: continue val = round(float(by_year_operationnal[idx]['number'])/\ dct['number']*100, 2) percents.append({'date':dct['date'], 'number':val}) self.rescue['operational_by_year'] = percents self.rescue['surface_by_town'] = FileTown.objects\ .filter(file__file_type=rescue_type)\ .values('town__name')\ .annotate(number=Sum('file__total_surface'))\ .order_by('-number','town__name')[:10] self.rescue['surface_by_dpt'] = FileByDepartment.objects\ .filter(file__file_type=rescue_type, department__isnull=False)\ .values('department__label')\ .annotate(number=Sum('file__total_surface'))\ .order_by('department__label') class OperationDashboard: def __init__(self): main_dashboard = Dashboard(Operation) self.total_number = main_dashboard.total_number self.filters_keys = ['recorded', 'effective', 'active', 'field', 'documented', 'closed', 'documented_closed'] filters = { 'recorded':{}, 'effective':{'in_charge__isnull':False}, 'active':{'in_charge__isnull':False, 'end_date__isnull':True}, 'field':{'excavation_end_date__isnull':True}, 'documented':{'source__isnull':False}, 'documented_closed':{'source__isnull':False, 'end_date__isnull':False}, 'closed':{'end_date__isnull':False} } filters_label = { 'recorded':_(u"Recorded"), 'effective':_(u"Effective"), 'active':_(u"Active"), 'field':_(u"Field completed"), 'documented':_(u"Associated report"), 'closed':_(u"Closed"), 'documented_closed':_(u"Documented and closed"), } self.filters_label = [filters_label[k] for k in self.filters_keys] self.total = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] nb = Operation.objects.filter(**fltr).count() self.total.append((lbl, nb)) self.surface_by_type = Operation.objects\ .values('operation_type__label')\ .annotate(number=Sum('surface'))\ .order_by('-number','operation_type__label') self.by_type = [] self.types = OperationType.objects.filter(available=True).all() for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] type_res = Operation.objects.filter(**fltr).\ values('operation_type', 'operation_type__label').\ annotate(number=Count('pk')).\ order_by('operation_type') types_dct = {} for typ in type_res.all(): types_dct[typ['operation_type']] = typ["number"] types = [] for typ in self.types: if typ.pk in types_dct: types.append(types_dct[typ.pk]) else: types.append(0) self.by_type.append((lbl, types)) self.by_year = [] self.years = [res['year'] for res in Operation.objects.values('year')\ .order_by('-year').distinct()] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] year_res = Operation.objects.filter(**fltr).\ values('year').\ annotate(number=Count('pk')).\ order_by('year') years_dct = {} for yr in year_res.all(): years_dct[yr['year']] = yr["number"] years = [] for yr in self.years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) self.by_year.append((lbl, years)) self.by_realisation_year = [] self.realisation_years = [res['date'] for res in \ Operation.objects.extra( {'date':"date_trunc('year', start_date)"}).values('date')\ .filter(start_date__isnull=False).order_by('-date').distinct()] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] year_res = Operation.objects.filter(**fltr).extra( {'date':"date_trunc('year', start_date)"}).values('date').\ values('date').filter(start_date__isnull=False).\ annotate(number=Count('pk')).\ order_by('-date') years_dct = {} for yr in year_res.all(): years_dct[yr['date']] = yr["number"] years = [] for yr in self.realisation_years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) self.by_realisation_year.append((lbl, years)) self.effective = [] for typ in self.types: year_res = Operation.objects.filter(**{'in_charge__isnull':False, 'operation_type':typ}).\ values('year').\ annotate(number=Count('pk')).\ order_by('-year').distinct() years_dct = {} for yr in year_res.all(): years_dct[yr['year']] = yr["number"] years = [] for yr in self.years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) self.effective.append((typ, years)) # TODO: by date now = datetime.date.today() limit = datetime.date(now.year, now.month, 1) - datetime.timedelta(365) by_realisation_month = Operation.objects.filter(start_date__gt=limit, start_date__isnull=False).extra( {'date':"date_trunc('month', start_date)"}) self.last_months = [] date = datetime.datetime(now.year, now.month, 1) for mt_idx in xrange(12): self.last_months.append(date) if date.month > 1: date = datetime.datetime(date.year, date.month - 1, 1) else: date = datetime.datetime(date.year - 1, 12, 1) self.by_realisation_month = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] month_res = by_realisation_month.filter(**fltr).\ annotate(number=Count('pk')).\ order_by('-date') month_dct = {} for mt in month_res.all(): month_dct[mt.date] = mt.number date = datetime.date(now.year, now.month, 1) months = [] for date in self.last_months: if date in month_dct: months.append(month_dct[date]) else: months.append(0) self.by_realisation_month.append((lbl, months)) # survey and excavations self.survey, self.excavation = {}, {} for dct_res, ope_types in ((self.survey, ('arch_diagnostic',)), (self.excavation, ('prev_excavation', 'prog_excavation'))): dct_res['total'] = [] operation_type = {'operation_type__txt_idx__in':ope_types} for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) nb = Operation.objects.filter(**fltr).count() dct_res['total'].append((lbl, nb)) dct_res['by_year'] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) year_res = Operation.objects.filter(**fltr).\ values('year').\ annotate(number=Count('pk')).\ order_by('year') years_dct = {} for yr in year_res.all(): years_dct[yr['year']] = yr["number"] years = [] for yr in self.years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) dct_res['by_year'].append((lbl, years)) dct_res['by_realisation_year'] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) year_res = Operation.objects.filter(**fltr).extra( {'date':"date_trunc('year', start_date)"}).values('date').\ filter(start_date__isnull=False).\ annotate(number=Count('pk')).\ order_by('-date') years_dct = {} for yr in year_res.all(): years_dct[yr['date']] = yr["number"] years = [] for yr in self.realisation_years: if yr in years_dct: years.append(years_dct[yr]) else: years.append(0) dct_res['by_realisation_year'].append((lbl, years)) current_year_ope = Operation.objects.filter(**operation_type)\ .filter(year=datetime.date.today().year) current_realisation_year_ope = Operation.objects\ .filter(**operation_type)\ .filter(start_date__year=datetime.date.today().year) res_keys = [('area_realised', current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(('area', current_year_ope)) for res_key, base_ope in res_keys: dct_res[res_key] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] area_res = base_ope.filter(**fltr)\ .annotate(number=Sum('surface')).all() val = 0 if area_res: val = area_res[0].number dct_res[res_key].append(val) # TODO... res_keys = [('manday_realised', current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(('manday', current_year_ope)) for res_key, base_ope in res_keys: dct_res[res_key] = [] for fltr_key in self.filters_keys: dct_res[res_key].append('-') # TODO... res_keys = [('mandayhect_realised', current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(('mandayhect', current_year_ope)) for res_key, base_ope in res_keys: dct_res[res_key] = [] for fltr_key in self.filters_keys: dct_res[res_key].append('-') # TODO... dct_res['mandayhect_real_effective'] = '-' if dct_res == self.survey: dct_res['mandayhect_effective'] = '-' res_keys = [('org_realised', current_realisation_year_ope)] if dct_res == self.survey: res_keys.append(('org', current_year_ope)) for res_key, base_ope in res_keys: org_res = base_ope.filter(in_charge__attached_to__isnull=False)\ .values('in_charge__attached_to', 'in_charge__attached_to__name')\ .annotate(area=Sum('surface'))\ .order_by('in_charge__attached_to__name').all() # TODO: man-days, man-days/hectare dct_res[res_key] = org_res year_ope = Operation.objects.filter(**operation_type) res_keys = ['org_by_year'] if dct_res == self.survey: res_keys.append('org_by_year_realised') q = year_ope.values('in_charge__attached_to', 'in_charge__attached_to__name').\ filter(in_charge__attached_to__isnull=False).\ order_by('in_charge__attached_to__name').distinct() org_list = [(org['in_charge__attached_to'], org['in_charge__attached_to__name']) for org in q] org_list_dct = dict(org_list) for res_key in res_keys: dct_res[res_key] = [] years = self.years if res_key == 'org_by_year_realised': years = self.realisation_years for org_id, org_label in org_list: org_res = year_ope.filter(in_charge__attached_to__pk=org_id) key_date = '' if res_key == 'org_by_year': org_res = org_res.values('year') key_date = 'year' else: org_res = org_res.extra( {'date':"date_trunc('year', start_date)"}).values('date').\ filter(start_date__isnull=False) key_date = 'date' org_res = org_res.annotate(area=Sum('surface'), cost=Sum('cost')) years_dct = {} for yr in org_res.all(): area = yr['area'] if yr['area'] else 0 cost = yr['cost'] if yr['cost'] else 0 years_dct[yr[key_date]] = (area, cost) r_years = [] for yr in years: if yr in years_dct: r_years.append(years_dct[yr]) else: r_years.append((0, 0)) dct_res[res_key].append((org_label, r_years)) area_means, area_sums = [], [] cost_means, cost_sums = [], [] for idx, year in enumerate(years): vals = [r_years[idx] for lbl, r_years in dct_res[res_key]] sum_area = sum([a for a, c in vals]) sum_cost = sum([c for a, c in vals]) area_means.append(sum_area/len(vals)) area_sums.append(sum_area) cost_means.append(sum_cost/len(vals)) cost_sums.append(sum_cost) dct_res[res_key+'_area_mean'] = area_means dct_res[res_key+'_area_sum'] = area_sums dct_res[res_key+'_cost_mean'] = cost_means dct_res[res_key+'_cost_mean'] = cost_sums if dct_res == self.survey: self.survey['effective'] = [] for yr in self.years: year_res = Operation.objects.filter(in_charge__isnull=False, year=yr).\ annotate(number=Sum('surface'), mean=Avg('surface')) nb = year_res[0].number if year_res.count() else 0 nb = nb if nb else 0 mean = year_res[0].mean if year_res.count() else 0 mean = mean if mean else 0 self.survey['effective'].append((nb, mean)) # TODO:Man-Days/hectare by Year # CHECK: month of realisation or month? dct_res['by_month'] = [] for fltr_key in self.filters_keys: fltr, lbl = filters[fltr_key], filters_label[fltr_key] fltr.update(operation_type) month_res = by_realisation_month.filter(**fltr).\ annotate(number=Count('pk')).\ order_by('-date') month_dct = {} for mt in month_res.all(): month_dct[mt.date] = mt.number date = datetime.date(now.year, now.month, 1) months = [] for date in self.last_months: if date in month_dct: months.append(month_dct[date]) else: months.append(0) dct_res['by_month'].append((lbl, months)) operation_type = {'operation_type__txt_idx__in':ope_types} self.departments = [(fd['department__pk'], fd['department__label']) for fd in OperationByDepartment.objects\ .filter(department__isnull=False)\ .values('department__label', 'department__pk')\ .order_by('department__label').distinct()] dct_res['by_dpt'] = [] for dpt_id, dpt_label in self.departments: vals = OperationByDepartment.objects\ .filter(department__pk=dpt_id, operation__operation_type__txt_idx__in=ope_types)\ .values('department__pk', 'operation__year')\ .annotate(number=Count('operation'))\ .order_by('operation__year') dct_years = {} for v in vals: dct_years[v['operation__year']] = v['number'] years = [] for y in self.years: if y in dct_years: years.append(dct_years[y]) else: years.append(0) years.append(sum(years)) dct_res['by_dpt'].append((dpt_label, years)) dct_res['effective_by_dpt'] = [] for dpt_id, dpt_label in self.departments: vals = OperationByDepartment.objects\ .filter(department__pk=dpt_id, operation__in_charge__isnull=False, operation__operation_type__txt_idx__in=ope_types)\ .values('department__pk', 'operation__year')\ .annotate(number=Count('operation'), area=Sum('operation__surface'), fnap=Sum('operation__fnap_cost'), cost=Sum('operation__cost'))\ .order_by('operation__year') dct_years = {} for v in vals: values = [] for value in (v['number'], v['area'], v['cost'], v['fnap']): values.append(value if value else 0) dct_years[v['operation__year']] = values years = [] for y in self.years: if y in dct_years: years.append(dct_years[y]) else: years.append((0, 0, 0, 0)) nbs, areas, costs, fnaps = zip(*years) years.append((sum(nbs), sum(areas), sum(costs), sum(fnaps))) dct_res['effective_by_dpt'].append((dpt_label, years)) OperationTown = Operation.towns.through query = OperationTown.objects\ .filter(operation__in_charge__isnull=False, operation__operation_type__txt_idx__in=ope_types)\ .values('town__name', 'town__departement__number')\ .annotate(nb=Count('operation'))\ .order_by('-nb', 'town__name')[:10] dct_res['towns'] = [] for r in query: dct_res['towns'].append((u"%s (%s)" % (r['town__name'], r['town__departement__number']), r['nb'])) if dct_res == self.survey: query = OperationTown.objects\ .filter(operation__in_charge__isnull=False, operation__operation_type__txt_idx__in=ope_types, operation__surface__isnull=False)\ .values('town__name', 'town__departement__number')\ .annotate(nb=Sum('operation__surface'))\ .order_by('-nb', 'town__name')[:10] dct_res['towns_surface'] = [] for r in query: dct_res['towns_surface'].append((u"%s (%s)" % ( r['town__name'], r['town__departement__number']), r['nb'])) else: query = OperationTown.objects\ .filter(operation__in_charge__isnull=False, operation__operation_type__txt_idx__in=ope_types, operation__cost__isnull=False)\ .values('town__name', 'town__departement__number')\ .annotate(nb=Sum('operation__cost'))\ .order_by('-nb', 'town__name')[:10] dct_res['towns_cost'] = [] for r in query: dct_res['towns_cost'].append((u"%s (%s)" % (r['town__name'], r['town__departement__number']), r['nb'])) class Dashboard: def __init__(self, model): self.model = model self.total_number = model.get_total_number() history_model = self.model.history.model # last edited - created self.recents, self.lasts = [], [] for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')): last_ids = history_model.objects.values('id')\ .annotate(hd=Max('history_date')) last_ids = last_ids.filter(history_type=modif_type) if self.model == Item: last_ids = last_ids.filter(downstream_treatment_id__isnull=True) if modif_type == '+': last_ids = last_ids.filter(upstream_treatment_id__isnull=True) last_ids = last_ids.order_by('-hd').distinct().all()[:5] for idx in last_ids: try: obj = self.model.objects.get(pk=idx['id']) except: # deleted object are always referenced in history continue obj.history_date = idx['hd'] last_lst.append(obj) # years self.years = model.get_years() self.years.sort() if not self.total_number or not self.years: return self.values = [('year', _(u"Year"), reversed(self.years))] # numbers self.numbers = [model.get_by_year(year).count() for year in self.years] self.values += [('number', _(u"Number"), reversed(self.numbers))] # calculate self.average = self.get_average() self.variance = self.get_variance() self.standard_deviation = self.get_standard_deviation() self.median = self.get_median() self.mode = self.get_mode() # by operation if not hasattr(model, 'get_by_operation'): return operations = model.get_operations() operation_numbers = [model.get_by_operation(op).count() for op in operations] # calculate self.operation_average = self.get_average(operation_numbers) self.operation_variance = self.get_variance(operation_numbers) self.operation_standard_deviation = self.get_standard_deviation( operation_numbers) self.operation_median = self.get_median(operation_numbers) operation_mode_pk = self.get_mode(dict(zip(operations, operation_numbers))) if operation_mode_pk: self.operation_mode = unicode(Operation.objects.get( pk=operation_mode_pk)) def get_average(self, vals=[]): if not vals: vals = self.numbers return sum(vals)/len(vals) def get_variance(self, vals=[]): if not vals: vals = self.numbers avrg = self.get_average(vals) return self.get_average([(x-avrg)**2 for x in vals]) def get_standard_deviation(self, vals=[]): if not vals: vals = self.numbers return round(self.get_variance(vals)**0.5, 3) def get_median(self, vals=[]): if not vals: vals = self.numbers len_vals = len(vals) vals.sort() if (len_vals % 2) == 1: return vals[len_vals/2] else: return (vals[len_vals/2-1] + vals[len_vals/2])/2.0 def get_mode(self, vals={}): if not vals: vals = dict(zip(self.years, self.numbers)) mx = max(vals.values()) for v in vals: if vals[v] == mx: return v class Department(models.Model): label = models.CharField(_(u"Label"), max_length=30) number = models.CharField(_(u"Number"), unique=True, max_length=3) class Meta: verbose_name = _(u"Department") verbose_name_plural = _(u"Departments") ordering = ['number'] def __unicode__(self): return unicode(self.number) + JOINT + self.label class Address(BaseHistorizedItem): address = models.TextField(_(u"Address"), null=True, blank=True) address_complement = models.TextField(_(u"Address complement"), null=True, blank=True) postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True, blank=True) town = models.CharField(_(u"Town"), max_length=30, null=True, blank=True) country = models.CharField(_(u"Country"), max_length=30, null=True, blank=True) phone = models.CharField(_(u"Phone"), max_length=18, null=True, blank=True) mobile_phone = models.CharField(_(u"Mobile phone"), max_length=18, null=True, blank=True) history = HistoricalRecords() class Meta: abstract = True class OrganizationType(GeneralType): class Meta: verbose_name = _(u"Organization type") verbose_name_plural = _(u"Organization types") class Organization(Address, OwnPerms): name = models.CharField(_(u"Name"), max_length=100) organization_type = models.ForeignKey(OrganizationType, verbose_name=_(u"Type")) history = HistoricalRecords() class Meta: verbose_name = _(u"Organization") verbose_name_plural = _(u"Organizations") permissions = ( ("view_own_organization", ugettext(u"Can view own Organization")), ("add_own_organization", ugettext(u"Can add own Organization")), ("change_own_organization", ugettext(u"Can change own Organization")), ("delete_own_organization", ugettext(u"Can delete own Organization")), ) def __unicode__(self): return self.name class PersonType(GeneralType): rights = models.ManyToManyField(WizardStep, verbose_name=_(u"Rights")) class Meta: verbose_name = _(u"Person type") verbose_name_plural = _(u"Person types") class Person(Address, OwnPerms) : TYPE = (('Mr', _(u'Mr')), ('Ms', _(u'Miss')), ('Md', _(u'Mrs')), ('Dr', _(u'Doctor')), ) title = models.CharField(_(u"Title"), max_length=2, choices=TYPE) surname = models.CharField(_(u"Surname"), max_length=20) name = models.CharField(_(u"Name"), max_length=30) email = models.CharField(_(u"Email"), max_length=40, blank=True, null=True) person_type = models.ForeignKey(PersonType, verbose_name=_(u"Type")) attached_to = models.ForeignKey('Organization', verbose_name=_(u"Is attached to"), blank=True, null=True) class Meta: verbose_name = _(u"Person") verbose_name_plural = _(u"Persons") permissions = ( ("view_person", ugettext(u"Can view Person")), ("view_own_person", ugettext(u"Can view own Person")), ("add_own_person", ugettext(u"Can add own Person")), ("change_own_person", ugettext(u"Can change own Person")), ("delete_own_person", ugettext(u"Can delete own Person")), ) def __unicode__(self): lbl = u"%s %s" % (self.name, self.surname) lbl += JOINT if self.attached_to: lbl += unicode(self.attached_to) elif self.email: lbl += self.email return lbl def full_label(self): return u" ".join([unicode(getattr(self, attr)) for attr in ('title', 'surname', 'name', 'attached_to') if getattr(self, attr)]) class IshtarUser(User): person = models.ForeignKey(Person, verbose_name=_(u"Person"), unique=True) class Meta: verbose_name = _(u"Ishtar user") verbose_name_plural = _(u"Ishtar users") @classmethod def create_from_user(cls, user): default = user.username surname = user.first_name or default name = user.last_name or default email = user.email person_type = None if user.is_superuser: person_type = PersonType.objects.get(txt_idx='administrator') else: person_type = PersonType.objects.get(txt_idx='public_access') person = Person.objects.create(title='Mr', surname=surname, name=name, email=email, person_type=person_type, history_modifier=user) return IshtarUser.objects.create(user_ptr=user, person=person) class AuthorType(GeneralType): class Meta: verbose_name = _(u"Author type") verbose_name_plural = _(u"Author types") class Author(models.Model): person = models.ForeignKey(Person, verbose_name=_(u"Person")) author_type = models.ForeignKey(AuthorType, verbose_name=_(u"Author type")) class Meta: verbose_name = _(u"Author") verbose_name_plural = _(u"Authors") def __unicode__(self): return unicode(self.person) + JOINT + unicode(self.author_type) class SourceType(GeneralType): class Meta: verbose_name = _(u"Source type") verbose_name_plural = _(u"Source types") class Source(models.Model): title = models.CharField(_(u"Title"), max_length=200) source_type = models.ForeignKey(SourceType, verbose_name=_(u"Type")) authors = models.ManyToManyField(Author, verbose_name=_(u"Authors")) associated_url = models.URLField(verify_exists=False, blank=True, null=True, verbose_name=_(u"Numerical ressource (web address)")) receipt_date = models.DateField(blank=True, null=True, verbose_name=_(u"Receipt date")) creation_date = models.DateField(blank=True, null=True, verbose_name=_(u"Creation date")) TABLE_COLS = ['title', 'source_type', 'authors',] class Meta: abstract = True def __unicode__(self): return self.title if settings.COUNTRY == 'fr': class Arrondissement(models.Model): name = models.CharField(u"Nom", max_length=30) department = models.ForeignKey(Department, verbose_name=u"Département") def __unicode__(self): return JOINT.join((self.name, unicode(self.department))) class Canton(models.Model): name = models.CharField(u"Nom", max_length=30) arrondissement = models.ForeignKey(Arrondissement, verbose_name=u"Arrondissement") def __unicode__(self): return JOINT.join((self.name, unicode(self.arrondissement))) class Town(models.Model): name = models.CharField(_(u"Name"), max_length=100) surface = models.IntegerField(_(u"Surface (m²)"), blank=True, null=True) center = models.PointField(_(u"Localisation"), srid=settings.SRID, blank=True, null=True) if settings.COUNTRY == 'fr': numero_insee = models.CharField(u"Numéro INSEE", max_length=6, unique=True) departement = models.ForeignKey(Department, verbose_name=u"Département", null=True, blank=True) canton = models.ForeignKey(Canton, verbose_name=u"Canton", null=True, blank=True) objects = models.GeoManager() class Meta: verbose_name = _(u"Town") verbose_name_plural = _(u"Towns") if settings.COUNTRY == 'fr': ordering = ['numero_insee'] def __unicode__(self): if settings.COUNTRY == "fr": return u"%s (%s)" % (self.name, self.numero_insee) return self.name