diff options
Diffstat (limited to 'ishtar/ishtar_base/models.py')
| -rw-r--r-- | ishtar/ishtar_base/models.py | 2163 |
1 files changed, 0 insertions, 2163 deletions
diff --git a/ishtar/ishtar_base/models.py b/ishtar/ishtar_base/models.py deleted file mode 100644 index 41979129b..000000000 --- a/ishtar/ishtar_base/models.py +++ /dev/null @@ -1,2163 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (C) 2010-2011 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet> - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# See the file COPYING for details. - -""" -Models description -""" -import datetime - -from django.core.exceptions import ObjectDoesNotExist, ValidationError -from django.core.validators import validate_slug -from django.utils.translation import ugettext_lazy as _, ugettext -from django.utils.safestring import SafeUnicode, mark_safe -from django.core.urlresolvers import reverse, NoReverseMatch -from django.db.utils import DatabaseError -from django.db.models import Q, Max, Count, Sum, Avg -from django.db.models.signals import m2m_changed, post_save - -from django.contrib.auth.models import User -from django.contrib.gis.db import models -from django.contrib import admin - -from simple_history.models import HistoricalRecords as BaseHistoricalRecords - -from ishtar import settings - -JOINT = u" - " - -# HistoricalRecords enhancement: don't save identical versions -class HistoricalRecords(BaseHistoricalRecords): - def create_historical_record(self, instance, type): - manager = getattr(instance, self.manager_name) - attrs = {} - for field in instance._meta.fields: - attrs[field.attname] = getattr(instance, field.attname) - history = instance.history.all() - if not history: - manager.create(history_type=type, **attrs) - return - old_instance = history[0] - for field in instance._meta.fields: - if getattr(old_instance, field.attname) != attrs[field.attname]: - manager.create(history_type=type, **attrs) - return - -# valid ID validator for models -def valid_id(cls): - def func(value): - try: - cls.objects.get(pk=value) - except ObjectDoesNotExist: - raise ValidationError(_(u"Not a valid item.")) - return func - -def valid_ids(cls): - def func(value): - if "," in value: - value = value.split(",") - for v in value: - try: - cls.objects.get(pk=v) - except ObjectDoesNotExist: - raise ValidationError( - _(u"An item selected is not a valid item.")) - return func - -# unique validator for models -def is_unique(cls, field): - def func(value): - query = {field:value} - try: - assert cls.objects.filter(**query).count() == 0 - except AssertionError: - raise ValidationError(_(u"This item already exist.")) - return func - -class OwnPerms: - """ - Manage special permissions for object's owner - """ - @classmethod - def get_query_owns(cls, user): - """ - Query object to get own items - """ - return None # implement for each object - - def is_own(self, user): - """ - Check if the current object is owned by the user - """ - query = self.get_query_owns(user) - if not query: - return False - query = query & Q(pk=self.pk) - return cls.objects.filter(query).count() - - @classmethod - def has_item_of(cls, user): - """ - Check if the user own some items - """ - query = cls.get_query_owns(user) - if not query: - return False - return cls.objects.filter(query).count() - - @classmethod - def get_owns(cls, user): - """ - Get Own items - """ - if isinstance(user, User): - user = IshtarUser.objects.get(user_ptr=user) - if user.is_anonymous(): - return [] - query = cls.get_query_owns(user) - if not query: - return [] - return cls.objects.filter(query).order_by(*cls._meta.ordering).all() - -class GeneralType(models.Model): - """ - Abstract class for "types" - """ - label = models.CharField(_(u"Label"), max_length=100) - txt_idx = models.CharField(_(u"Textual ID"), - validators=[validate_slug], max_length=30, unique=True) - comment = models.TextField(_(u"Comment"), blank=True, null=True) - available = models.BooleanField(_(u"Available")) - HELP_TEXT = u"" - - class Meta: - abstract = True - - def __unicode__(self): - return self.label - - @classmethod - def get_help(cls, dct={}, exclude=[]): - help_text = cls.HELP_TEXT - c_rank = -1 - help_items = u"\n" - for item in cls.get_types(dct=dct, instances=True, exclude=exclude): - if hasattr(item, '__iter__'): - # TODO: manage multiple levels - continue - if not item.comment: - continue - if c_rank > item.rank: - help_items += u"</dl>\n" - elif c_rank < item.rank: - help_items += u"<dl>\n" - c_rank = item.rank - help_items += u"<dt>%s</dt><dd>%s</dd>" % (item.label, - u"<br/>".join(item.comment.split('\n'))) - c_rank += 1 - if c_rank: - help_items += c_rank*u"</dl>" - if help_text or help_items != u'\n': - return mark_safe(help_text + help_items) - return u"" - - @classmethod - def get_types(cls, dct={}, instances=False, exclude=[]): - base_dct = dct.copy() - if hasattr(cls, 'parent'): - return cls._get_parent_types(base_dct, instances, exclude=exclude) - return cls._get_types(base_dct, instances, exclude=exclude) - - @classmethod - def _get_types(cls, dct={}, instances=False, exclude=[]): - dct['available'] = True - if not instances: - yield ('', '--') - items = cls.objects.filter(**dct) - if exclude: - items = items.exclude(txt_idx__in=exclude) - for item in items.all(): - if instances: - item.rank = 0 - yield item - else: - yield (item.pk, _(unicode(item))) - - PREFIX = "› " - - @classmethod - def _get_childs(cls, item, dct, prefix=0, instances=False, exclude=[]): - prefix += 1 - dct['parent'] = item - childs = cls.objects.filter(**dct) - if exclude: - childs = childs.exclude(txt_idx__in=exclude) - if hasattr(cls, 'order'): - childs = childs.order_by('order') - for child in childs.all(): - if instances: - child.rank = prefix - yield child - else: - yield (child.pk, SafeUnicode(prefix*cls.PREFIX + \ - unicode(_(unicode(child))) )) - for sub_child in cls._get_childs(child, dct, prefix, instances, - exclude=exclude): - yield sub_child - - @classmethod - def _get_parent_types(cls, dct={}, instances=False, exclude=[]): - dct['available'] = True - if not instances: - yield ('', '--') - dct['parent'] = None - items = cls.objects.filter(**dct) - if exclude: - items = items.exclude(txt_idx__in=exclude) - if hasattr(cls, 'order'): - items = items.order_by('order') - for item in items.all(): - if instances: - item.rank = 0 - yield item - else: - yield (item.pk, unicode(item)) - for child in cls._get_childs(item, dct, instances, exclude=exclude): - yield child - -class HistoryError(Exception): - def __init__(self, value): - self.value = value - def __str__(self): - return repr(self.value) - -class BaseHistorizedItem(models.Model): - history_modifier = models.ForeignKey(User, related_name='+', - verbose_name=_(u"Last editor")) - class Meta: - abstract = True - - def save(self, *args, **kwargs): - assert hasattr(self, 'history_modifier') == True - super(BaseHistorizedItem, self).save(*args, **kwargs) - return True - - def get_previous(self, step=None, date=None, strict=True): - """ - Get a "step" previous state of the item - """ - assert step or date - historized = self.history.all() - item = None - if step: - assert len(historized) > step - item = historized[step] - else: - for step, item in enumerate(historized): - if item.history_date == date: - break - # ended with no match - if item.history_date != date: - return - item._step = step - if len(historized) != (step + 1): - item._previous = historized[step + 1].history_date - else: - item._previous = None - if step > 0: - item._next = historized[step - 1].history_date - else: - item._next = None - item.history_date = historized[step].history_date - model = self.__class__ - for k in model._meta.get_all_field_names(): - field = model._meta.get_field_by_name(k)[0] - if hasattr(field, 'rel') and field.rel: - if not hasattr(item, k+'_id'): - setattr(item, k, getattr(self, k)) - continue - val = getattr(item, k+'_id') - if not val: - setattr(item, k, None) - continue - try: - val = field.rel.to.objects.get(pk=val) - setattr(item, k, val) - except ObjectDoesNotExist: - if strict: - raise HistoryError(u"The class %s has no pk %d" % ( - unicode(field.rel.to), val)) - setattr(item, k, None) - item.pk = self.pk - return item - - def rollback(self, date): - """ - Rollback to a previous state - """ - to_del, new_item = [], None - for item in self.history.all(): - to_del.append(item) - if item.history_date == date: - new_item = item - break - if not new_item: - raise HistoryError(u"The date to rollback to doesn't exist.") - try: - for f in self._meta.fields: - k = f.name - if k != 'id' and hasattr(self, k): - if not hasattr(new_item, k): - k = k + "_id" - setattr(self, k, getattr(new_item, k)) - self.save() - except: - raise HistoryError(u"The rollback has failed.") - # clean the obsolete history - for historized_item in to_del: - historized_item.delete() - - def values(self): - values = {} - for f in self._meta.fields: - k = f.name - if k != 'id': - values[k] = getattr(self, k) - return values - - def get_show_url(self): - try: - return reverse('show-'+self.__class__.__name__.lower(), - args=[self.pk, '']) - except NoReverseMatch: - return - -class LightHistorizedItem(BaseHistorizedItem): - history_date = models.DateTimeField(default=datetime.datetime.now) - class Meta: - abstract = True - - def save(self, *args, **kwargs): - super(LightHistorizedItem, self).save(*args, **kwargs) - return True - -class Wizard(models.Model): - url_name = models.CharField(_(u"URL name"), max_length=128, unique=True) - class Meta: - verbose_name = _(u"Wizard") - ordering = ['url_name'] - - def __unicode__(self): - return unicode(self.url_name) - -class WizardStep(models.Model): - order = models.IntegerField(_(u"Order")) - wizard = models.ForeignKey(Wizard, verbose_name=_(u"Wizard")) - url_name = models.CharField(_(u"URL name"), max_length=128) - name = models.CharField(_(u"Label"), max_length=128) - class Meta: - verbose_name = _(u"Wizard step") - ordering = ['wizard', 'order'] - - def __unicode__(self): - return u"%s » %s" % (unicode(self.wizard), unicode(self.name)) - -class UserDashboard: - def __init__(self): - types = IshtarUser.objects.values('person__person_type', - 'person__person_type__label') - self.types = types.annotate(number=Count('pk'))\ - .order_by('person__person_type') - -class FileDashboard: - def __init__(self): - main_dashboard = Dashboard(File) - - self.total_number = main_dashboard.total_number - - types = File.objects.values('file_type', 'file_type__label') - self.types = types.annotate(number=Count('pk')).order_by('file_type') - - by_year = File.objects.extra( - {'date':"date_trunc('year', creation_date)"}) - self.by_year = by_year.values('date')\ - .annotate(number=Count('pk')).order_by('-date') - - now = datetime.date.today() - limit = datetime.date(now.year, now.month, 1) - datetime.timedelta(365) - by_month = File.objects.filter(creation_date__gt=limit).extra( - {'date':"date_trunc('month', creation_date)"}) - self.by_month = by_month.values('date')\ - .annotate(number=Count('pk')).order_by('-date') - - # research - self.research = {} - prog_type = FileType.objects.get(txt_idx='prog') - researchs = File.objects.filter(file_type=prog_type) - self.research['total_number'] = researchs.count() - by_year = researchs.extra({'date':"date_trunc('year', creation_date)"}) - self.research['by_year'] = by_year.values('date')\ - .annotate(number=Count('pk'))\ - .order_by('-date') - by_month = researchs.filter(creation_date__gt=limit)\ - .extra({'date':"date_trunc('month', creation_date)"}) - self.research['by_month'] = by_month.values('date')\ - .annotate(number=Count('pk'))\ - .order_by('-date') - - self.research['by_dpt'] = FileByDepartment.objects\ - .filter(file__file_type=prog_type, - department__isnull=False)\ - .values('department__label')\ - .annotate(number=Count('file'))\ - .order_by('department__label') - FileTown = File.towns.through - self.research['towns'] = FileTown.objects\ - .filter(file__file_type=prog_type)\ - .values('town__name')\ - .annotate(number=Count('file'))\ - .order_by('-number','town__name')[:10] - - # rescue - rescue_type = FileType.objects.get(txt_idx='preventive') - rescues = File.objects.filter(file_type=rescue_type) - self.rescue = {} - self.rescue['total_number'] = rescues.count() - self.rescue['saisine'] = rescues.values('saisine_type__label')\ - .annotate(number=Count('pk'))\ - .order_by('saisine_type__label') - self.rescue['administrative_act'] = AdministrativeAct.objects\ - .filter(associated_file__isnull=False)\ - .values('act_type__label')\ - .annotate(number=Count('pk'))\ - .order_by('act_type__pk') - - by_year = rescues.extra({'date':"date_trunc('year', creation_date)"}) - self.rescue['by_year'] = by_year.values('date')\ - .annotate(number=Count('pk'))\ - .order_by('-date') - by_month = rescues.filter(creation_date__gt=limit)\ - .extra({'date':"date_trunc('month', creation_date)"}) - self.rescue['by_month'] = by_month.values('date')\ - .annotate(number=Count('pk'))\ - .order_by('-date') - - self.rescue['by_dpt'] = FileByDepartment.objects\ - .filter(file__file_type=rescue_type, - department__isnull=False)\ - .values('department__label')\ - .annotate(number=Count('file'))\ - .order_by('department__label') - self.rescue['towns'] = FileTown.objects\ - .filter(file__file_type=rescue_type)\ - .values('town__name')\ - .annotate(number=Count('file'))\ - .order_by('-number','town__name')[:10] - - self.rescue['with_associated_operation'] = rescues\ - .filter(operations__isnull=False).count() - - self.rescue['with_associated_operation_percent'] = round( - float(self.rescue['with_associated_operation'])\ - /self.rescue['total_number']*100, 2) - - by_year_operationnal = rescues.filter(operations__isnull=False)\ - .extra({'date':"date_trunc('year', creation_date)"}) - by_year_operationnal = by_year_operationnal.values('date')\ - .annotate(number=Count('pk'))\ - .order_by('-date') - percents, idx = [], 0 - for dct in self.rescue['by_year']: - if idx > len(by_year_operationnal): - break - if by_year_operationnal[idx]['date'] != dct['date'] or\ - not dct['number']: - continue - val = round(float(by_year_operationnal[idx]['number'])/\ - dct['number']*100, 2) - percents.append({'date':dct['date'], 'number':val}) - self.rescue['operational_by_year'] = percents - - self.rescue['surface_by_town'] = FileTown.objects\ - .filter(file__file_type=rescue_type)\ - .values('town__name')\ - .annotate(number=Sum('file__total_surface'))\ - .order_by('-number','town__name')[:10] - self.rescue['surface_by_dpt'] = FileByDepartment.objects\ - .filter(file__file_type=rescue_type, - department__isnull=False)\ - .values('department__label')\ - .annotate(number=Sum('file__total_surface'))\ - .order_by('department__label') - -class OperationDashboard: - def __init__(self): - main_dashboard = Dashboard(Operation) - - self.total_number = main_dashboard.total_number - - self.filters_keys = ['recorded', 'effective', 'active', 'field', - 'documented', 'closed', 'documented_closed'] - filters = { - 'recorded':{}, - 'effective':{'in_charge__isnull':False}, - 'active':{'in_charge__isnull':False, 'end_date__isnull':True}, - 'field':{'excavation_end_date__isnull':True}, - 'documented':{'source__isnull':False}, - 'documented_closed':{'source__isnull':False, - 'end_date__isnull':False}, - 'closed':{'end_date__isnull':False} - } - filters_label = { - 'recorded':_(u"Recorded"), - 'effective':_(u"Effective"), - 'active':_(u"Active"), - 'field':_(u"Field completed"), - 'documented':_(u"Associated report"), - 'closed':_(u"Closed"), - 'documented_closed':_(u"Documented and closed"), - } - self.filters_label = [filters_label[k] for k in self.filters_keys] - self.total = [] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - nb = Operation.objects.filter(**fltr).count() - self.total.append((lbl, nb)) - - self.surface_by_type = Operation.objects\ - .values('operation_type__label')\ - .annotate(number=Sum('surface'))\ - .order_by('-number','operation_type__label') - - self.by_type = [] - self.types = OperationType.objects.filter(available=True).all() - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - type_res = Operation.objects.filter(**fltr).\ - values('operation_type', 'operation_type__label').\ - annotate(number=Count('pk')).\ - order_by('operation_type') - types_dct = {} - for typ in type_res.all(): - types_dct[typ['operation_type']] = typ["number"] - types = [] - for typ in self.types: - if typ.pk in types_dct: - types.append(types_dct[typ.pk]) - else: - types.append(0) - self.by_type.append((lbl, types)) - - self.by_year = [] - self.years = [res['year'] for res in Operation.objects.values('year')\ - .order_by('-year').distinct()] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - year_res = Operation.objects.filter(**fltr).\ - values('year').\ - annotate(number=Count('pk')).\ - order_by('year') - years_dct = {} - for yr in year_res.all(): - years_dct[yr['year']] = yr["number"] - years = [] - for yr in self.years: - if yr in years_dct: - years.append(years_dct[yr]) - else: - years.append(0) - self.by_year.append((lbl, years)) - - self.by_realisation_year = [] - self.realisation_years = [res['date'] for res in \ - Operation.objects.extra( - {'date':"date_trunc('year', start_date)"}).values('date')\ - .filter(start_date__isnull=False).order_by('-date').distinct()] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - year_res = Operation.objects.filter(**fltr).extra( - {'date':"date_trunc('year', start_date)"}).values('date').\ - values('date').filter(start_date__isnull=False).\ - annotate(number=Count('pk')).\ - order_by('-date') - years_dct = {} - for yr in year_res.all(): - years_dct[yr['date']] = yr["number"] - years = [] - for yr in self.realisation_years: - if yr in years_dct: - years.append(years_dct[yr]) - else: - years.append(0) - self.by_realisation_year.append((lbl, years)) - - self.effective = [] - for typ in self.types: - year_res = Operation.objects.filter(**{'in_charge__isnull':False, - 'operation_type':typ}).\ - values('year').\ - annotate(number=Count('pk')).\ - order_by('-year').distinct() - years_dct = {} - for yr in year_res.all(): - years_dct[yr['year']] = yr["number"] - years = [] - for yr in self.years: - if yr in years_dct: - years.append(years_dct[yr]) - else: - years.append(0) - self.effective.append((typ, years)) - - # TODO: by date - now = datetime.date.today() - limit = datetime.date(now.year, now.month, 1) - datetime.timedelta(365) - by_realisation_month = Operation.objects.filter(start_date__gt=limit, - start_date__isnull=False).extra( - {'date':"date_trunc('month', start_date)"}) - self.last_months = [] - date = datetime.datetime(now.year, now.month, 1) - for mt_idx in xrange(12): - self.last_months.append(date) - if date.month > 1: - date = datetime.datetime(date.year, date.month - 1, 1) - else: - date = datetime.datetime(date.year - 1, 12, 1) - self.by_realisation_month = [] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - month_res = by_realisation_month.filter(**fltr).\ - annotate(number=Count('pk')).\ - order_by('-date') - month_dct = {} - for mt in month_res.all(): - month_dct[mt.date] = mt.number - date = datetime.date(now.year, now.month, 1) - months = [] - for date in self.last_months: - if date in month_dct: - months.append(month_dct[date]) - else: - months.append(0) - self.by_realisation_month.append((lbl, months)) - - # survey and excavations - self.survey, self.excavation = {}, {} - for dct_res, ope_types in ((self.survey, ('arch_diagnostic',)), - (self.excavation, ('prev_excavation', - 'prog_excavation'))): - dct_res['total'] = [] - operation_type = {'operation_type__txt_idx__in':ope_types} - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - fltr.update(operation_type) - nb = Operation.objects.filter(**fltr).count() - dct_res['total'].append((lbl, nb)) - - dct_res['by_year'] = [] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - fltr.update(operation_type) - year_res = Operation.objects.filter(**fltr).\ - values('year').\ - annotate(number=Count('pk')).\ - order_by('year') - years_dct = {} - for yr in year_res.all(): - years_dct[yr['year']] = yr["number"] - years = [] - for yr in self.years: - if yr in years_dct: - years.append(years_dct[yr]) - else: - years.append(0) - dct_res['by_year'].append((lbl, years)) - - dct_res['by_realisation_year'] = [] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - fltr.update(operation_type) - year_res = Operation.objects.filter(**fltr).extra( - {'date':"date_trunc('year', start_date)"}).values('date').\ - filter(start_date__isnull=False).\ - annotate(number=Count('pk')).\ - order_by('-date') - years_dct = {} - for yr in year_res.all(): - years_dct[yr['date']] = yr["number"] - years = [] - for yr in self.realisation_years: - if yr in years_dct: - years.append(years_dct[yr]) - else: - years.append(0) - dct_res['by_realisation_year'].append((lbl, years)) - - current_year_ope = Operation.objects.filter(**operation_type)\ - .filter(year=datetime.date.today().year) - current_realisation_year_ope = Operation.objects\ - .filter(**operation_type)\ - .filter(start_date__year=datetime.date.today().year) - res_keys = [('area_realised', current_realisation_year_ope)] - if dct_res == self.survey: - res_keys.append(('area', - current_year_ope)) - for res_key, base_ope in res_keys: - dct_res[res_key] = [] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - area_res = base_ope.filter(**fltr)\ - .annotate(number=Sum('surface')).all() - val = 0 - if area_res: - val = area_res[0].number - dct_res[res_key].append(val) - # TODO... - res_keys = [('manday_realised', current_realisation_year_ope)] - if dct_res == self.survey: - res_keys.append(('manday', - current_year_ope)) - for res_key, base_ope in res_keys: - dct_res[res_key] = [] - for fltr_key in self.filters_keys: - dct_res[res_key].append('-') - # TODO... - res_keys = [('mandayhect_realised', current_realisation_year_ope)] - if dct_res == self.survey: - res_keys.append(('mandayhect', - current_year_ope)) - for res_key, base_ope in res_keys: - dct_res[res_key] = [] - for fltr_key in self.filters_keys: - dct_res[res_key].append('-') - # TODO... - dct_res['mandayhect_real_effective'] = '-' - if dct_res == self.survey: - dct_res['mandayhect_effective'] = '-' - - - res_keys = [('org_realised', current_realisation_year_ope)] - if dct_res == self.survey: - res_keys.append(('org', current_year_ope)) - for res_key, base_ope in res_keys: - org_res = base_ope.filter(in_charge__attached_to__isnull=False)\ - .values('in_charge__attached_to', - 'in_charge__attached_to__name')\ - .annotate(area=Sum('surface'))\ - .order_by('in_charge__attached_to__name').all() - # TODO: man-days, man-days/hectare - dct_res[res_key] = org_res - - - year_ope = Operation.objects.filter(**operation_type) - res_keys = ['org_by_year'] - if dct_res == self.survey: - res_keys.append('org_by_year_realised') - q = year_ope.values('in_charge__attached_to', - 'in_charge__attached_to__name').\ - filter(in_charge__attached_to__isnull=False).\ - order_by('in_charge__attached_to__name').distinct() - org_list = [(org['in_charge__attached_to'], - org['in_charge__attached_to__name']) for org in q] - org_list_dct = dict(org_list) - for res_key in res_keys: - dct_res[res_key] = [] - years = self.years - if res_key == 'org_by_year_realised': - years = self.realisation_years - for org_id, org_label in org_list: - org_res = year_ope.filter(in_charge__attached_to__pk=org_id) - key_date = '' - if res_key == 'org_by_year': - org_res = org_res.values('year') - key_date = 'year' - else: - org_res = org_res.extra( - {'date':"date_trunc('year', start_date)"}).values('date').\ - filter(start_date__isnull=False) - key_date = 'date' - org_res = org_res.annotate(area=Sum('surface'), - cost=Sum('cost')) - years_dct = {} - for yr in org_res.all(): - area = yr['area'] if yr['area'] else 0 - cost = yr['cost'] if yr['cost'] else 0 - years_dct[yr[key_date]] = (area, cost) - r_years = [] - for yr in years: - if yr in years_dct: - r_years.append(years_dct[yr]) - else: - r_years.append((0, 0)) - dct_res[res_key].append((org_label, r_years)) - area_means, area_sums = [], [] - cost_means, cost_sums = [], [] - for idx, year in enumerate(years): - vals = [r_years[idx] for lbl, r_years in dct_res[res_key]] - sum_area = sum([a for a, c in vals]) - sum_cost = sum([c for a, c in vals]) - area_means.append(sum_area/len(vals)) - area_sums.append(sum_area) - cost_means.append(sum_cost/len(vals)) - cost_sums.append(sum_cost) - dct_res[res_key+'_area_mean'] = area_means - dct_res[res_key+'_area_sum'] = area_sums - dct_res[res_key+'_cost_mean'] = cost_means - dct_res[res_key+'_cost_mean'] = cost_sums - - if dct_res == self.survey: - self.survey['effective'] = [] - for yr in self.years: - year_res = Operation.objects.filter(in_charge__isnull=False, - year=yr).\ - annotate(number=Sum('surface'), - mean=Avg('surface')) - nb = year_res[0].number if year_res.count() else 0 - nb = nb if nb else 0 - mean = year_res[0].mean if year_res.count() else 0 - mean = mean if mean else 0 - self.survey['effective'].append((nb, mean)) - - # TODO:Man-Days/hectare by Year - - # CHECK: month of realisation or month? - dct_res['by_month'] = [] - for fltr_key in self.filters_keys: - fltr, lbl = filters[fltr_key], filters_label[fltr_key] - fltr.update(operation_type) - month_res = by_realisation_month.filter(**fltr).\ - annotate(number=Count('pk')).\ - order_by('-date') - month_dct = {} - for mt in month_res.all(): - month_dct[mt.date] = mt.number - date = datetime.date(now.year, now.month, 1) - months = [] - for date in self.last_months: - if date in month_dct: - months.append(month_dct[date]) - else: - months.append(0) - dct_res['by_month'].append((lbl, months)) - - operation_type = {'operation_type__txt_idx__in':ope_types} - self.departments = [(fd['department__pk'], fd['department__label']) - for fd in OperationByDepartment.objects\ - .filter(department__isnull=False)\ - .values('department__label', 'department__pk')\ - .order_by('department__label').distinct()] - dct_res['by_dpt'] = [] - for dpt_id, dpt_label in self.departments: - vals = OperationByDepartment.objects\ - .filter(department__pk=dpt_id, - operation__operation_type__txt_idx__in=ope_types)\ - .values('department__pk', 'operation__year')\ - .annotate(number=Count('operation'))\ - .order_by('operation__year') - dct_years = {} - for v in vals: - dct_years[v['operation__year']] = v['number'] - years = [] - for y in self.years: - if y in dct_years: - years.append(dct_years[y]) - else: - years.append(0) - years.append(sum(years)) - dct_res['by_dpt'].append((dpt_label, years)) - dct_res['effective_by_dpt'] = [] - for dpt_id, dpt_label in self.departments: - vals = OperationByDepartment.objects\ - .filter(department__pk=dpt_id, - operation__in_charge__isnull=False, - operation__operation_type__txt_idx__in=ope_types)\ - .values('department__pk', 'operation__year')\ - .annotate(number=Count('operation'), - area=Sum('operation__surface'), - fnap=Sum('operation__fnap_cost'), - cost=Sum('operation__cost'))\ - .order_by('operation__year') - dct_years = {} - for v in vals: - values = [] - for value in (v['number'], v['area'], v['cost'], v['fnap']): - values.append(value if value else 0) - dct_years[v['operation__year']] = values - years = [] - for y in self.years: - if y in dct_years: - years.append(dct_years[y]) - else: - years.append((0, 0, 0, 0)) - nbs, areas, costs, fnaps = zip(*years) - years.append((sum(nbs), sum(areas), sum(costs), sum(fnaps))) - dct_res['effective_by_dpt'].append((dpt_label, years)) - - OperationTown = Operation.towns.through - query = OperationTown.objects\ - .filter(operation__in_charge__isnull=False, - operation__operation_type__txt_idx__in=ope_types)\ - .values('town__name', 'town__departement__number')\ - .annotate(nb=Count('operation'))\ - .order_by('-nb', 'town__name')[:10] - dct_res['towns'] = [] - for r in query: - dct_res['towns'].append((u"%s (%s)" % (r['town__name'], - r['town__departement__number']), - r['nb'])) - - if dct_res == self.survey: - query = OperationTown.objects\ - .filter(operation__in_charge__isnull=False, - operation__operation_type__txt_idx__in=ope_types, - operation__surface__isnull=False)\ - .values('town__name', 'town__departement__number')\ - .annotate(nb=Sum('operation__surface'))\ - .order_by('-nb', 'town__name')[:10] - dct_res['towns_surface'] = [] - for r in query: - dct_res['towns_surface'].append((u"%s (%s)" % ( - r['town__name'], r['town__departement__number']), - r['nb'])) - else: - query = OperationTown.objects\ - .filter(operation__in_charge__isnull=False, - operation__operation_type__txt_idx__in=ope_types, - operation__cost__isnull=False)\ - .values('town__name', 'town__departement__number')\ - .annotate(nb=Sum('operation__cost'))\ - .order_by('-nb', 'town__name')[:10] - dct_res['towns_cost'] = [] - for r in query: - dct_res['towns_cost'].append((u"%s (%s)" % (r['town__name'], - r['town__departement__number']), - r['nb'])) - -class Dashboard: - def __init__(self, model): - self.model = model - self.total_number = model.get_total_number() - history_model = self.model.history.model - # last edited - created - self.recents, self.lasts = [], [] - for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')): - last_ids = history_model.objects.values('id')\ - .annotate(hd=Max('history_date')) - last_ids = last_ids.filter(history_type=modif_type) - if self.model == Item: - last_ids = last_ids.filter(downstream_treatment_id__isnull=True) - if modif_type == '+': - last_ids = last_ids.filter(upstream_treatment_id__isnull=True) - last_ids = last_ids.order_by('-hd').distinct().all()[:5] - for idx in last_ids: - try: - obj = self.model.objects.get(pk=idx['id']) - except: - # deleted object are always referenced in history - continue - obj.history_date = idx['hd'] - last_lst.append(obj) - # years - self.years = model.get_years() - self.years.sort() - if not self.total_number or not self.years: - return - self.values = [('year', _(u"Year"), reversed(self.years))] - # numbers - self.numbers = [model.get_by_year(year).count() for year in self.years] - self.values += [('number', _(u"Number"), reversed(self.numbers))] - # calculate - self.average = self.get_average() - self.variance = self.get_variance() - self.standard_deviation = self.get_standard_deviation() - self.median = self.get_median() - self.mode = self.get_mode() - # by operation - if not hasattr(model, 'get_by_operation'): - return - operations = model.get_operations() - operation_numbers = [model.get_by_operation(op).count() - for op in operations] - # calculate - self.operation_average = self.get_average(operation_numbers) - self.operation_variance = self.get_variance(operation_numbers) - self.operation_standard_deviation = self.get_standard_deviation( - operation_numbers) - self.operation_median = self.get_median(operation_numbers) - operation_mode_pk = self.get_mode(dict(zip(operations, - operation_numbers))) - if operation_mode_pk: - self.operation_mode = unicode(Operation.objects.get( - pk=operation_mode_pk)) - - def get_average(self, vals=[]): - if not vals: - vals = self.numbers - return sum(vals)/len(vals) - - def get_variance(self, vals=[]): - if not vals: - vals = self.numbers - avrg = self.get_average(vals) - return self.get_average([(x-avrg)**2 for x in vals]) - - def get_standard_deviation(self, vals=[]): - if not vals: - vals = self.numbers - return round(self.get_variance(vals)**0.5, 3) - - def get_median(self, vals=[]): - if not vals: - vals = self.numbers - len_vals = len(vals) - vals.sort() - if (len_vals % 2) == 1: - return vals[len_vals/2] - else: - return (vals[len_vals/2-1] + vals[len_vals/2])/2.0 - - def get_mode(self, vals={}): - if not vals: - vals = dict(zip(self.years, self.numbers)) - mx = max(vals.values()) - for v in vals: - if vals[v] == mx: - return v - -class Departement(models.Model): - label = models.CharField(_(u"Label"), max_length=30) - number = models.CharField(_(u"Number"), unique=True, max_length=3) - - class Meta: - verbose_name = _(u"Departement") - verbose_name_plural = _(u"Departements") - ordering = ['number'] - - def __unicode__(self): - return unicode(self.number) + JOINT + self.label - -class Address(BaseHistorizedItem): - address = models.TextField(_(u"Address"), null=True, blank=True) - address_complement = models.TextField(_(u"Address complement"), null=True, - blank=True) - postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True, - blank=True) - town = models.CharField(_(u"Town"), max_length=30, null=True, blank=True) - country = models.CharField(_(u"Country"), max_length=30, null=True, - blank=True) - phone = models.CharField(_(u"Phone"), max_length=18, null=True, blank=True) - mobile_phone = models.CharField(_(u"Mobile phone"), max_length=18, - null=True, blank=True) - history = HistoricalRecords() - - class Meta: - abstract = True - -class OrganizationType(GeneralType): - class Meta: - verbose_name = _(u"Organization type") - verbose_name_plural = _(u"Organization types") - -class Organization(Address, OwnPerms): - name = models.CharField(_(u"Name"), max_length=100) - organization_type = models.ForeignKey(OrganizationType, - verbose_name=_(u"Type")) - history = HistoricalRecords() - class Meta: - verbose_name = _(u"Organization") - verbose_name_plural = _(u"Organizations") - permissions = ( - ("view_own_organization", ugettext(u"Can view own Organization")), - ("add_own_organization", ugettext(u"Can add own Organization")), - ("change_own_organization", ugettext(u"Can change own Organization")), - ("delete_own_organization", ugettext(u"Can delete own Organization")), - ) - - def __unicode__(self): - return self.name - -class PersonType(GeneralType): - rights = models.ManyToManyField(WizardStep, verbose_name=_(u"Rights")) - class Meta: - verbose_name = _(u"Person type") - verbose_name_plural = _(u"Person types") - -class Person(Address, OwnPerms) : - TYPE = (('Mr', _(u'Mr')), - ('Ms', _(u'Miss')), - ('Md', _(u'Mrs')), - ('Dr', _(u'Doctor')), - ) - title = models.CharField(_(u"Title"), max_length=2, choices=TYPE) - surname = models.CharField(_(u"Surname"), max_length=20) - name = models.CharField(_(u"Name"), max_length=30) - email = models.CharField(_(u"Email"), max_length=40, blank=True, null=True) - person_type = models.ForeignKey(PersonType, verbose_name=_(u"Type")) - attached_to = models.ForeignKey('Organization', - verbose_name=_(u"Is attached to"), blank=True, null=True) - - class Meta: - verbose_name = _(u"Person") - verbose_name_plural = _(u"Persons") - permissions = ( - ("view_person", ugettext(u"Can view Person")), - ("view_own_person", ugettext(u"Can view own Person")), - ("add_own_person", ugettext(u"Can add own Person")), - ("change_own_person", ugettext(u"Can change own Person")), - ("delete_own_person", ugettext(u"Can delete own Person")), - ) - - def __unicode__(self): - lbl = u"%s %s" % (self.name, self.surname) - lbl += JOINT - if self.attached_to: - lbl += unicode(self.attached_to) - elif self.email: - lbl += self.email - return lbl - - def full_label(self): - return u" ".join([unicode(getattr(self, attr)) - for attr in ('title', 'surname', 'name', 'attached_to') - if getattr(self, attr)]) - -class IshtarUser(User): - person = models.ForeignKey(Person, verbose_name=_(u"Person"), unique=True) - - class Meta: - verbose_name = _(u"Ishtar user") - verbose_name_plural = _(u"Ishtar users") - -class AuthorType(GeneralType): - class Meta: - verbose_name = _(u"Author type") - verbose_name_plural = _(u"Author types") - -class Author(models.Model): - person = models.ForeignKey(Person, verbose_name=_(u"Person")) - author_type = models.ForeignKey(AuthorType, verbose_name=_(u"Author type")) - - class Meta: - verbose_name = _(u"Author") - verbose_name_plural = _(u"Authors") - - def __unicode__(self): - return unicode(self.person) + JOINT + unicode(self.author_type) - -class SourceType(GeneralType): - class Meta: - verbose_name = _(u"Source type") - verbose_name_plural = _(u"Source types") - -class Source(models.Model): - title = models.CharField(_(u"Title"), max_length=200) - source_type = models.ForeignKey(SourceType, verbose_name=_(u"Type")) - authors = models.ManyToManyField(Author, verbose_name=_(u"Authors")) - associated_url = models.URLField(verify_exists=False, blank=True, null=True, - verbose_name=_(u"Numerical ressource (web address)")) - receipt_date = models.DateField(blank=True, null=True, - verbose_name=_(u"Receipt date")) - creation_date = models.DateField(blank=True, null=True, - verbose_name=_(u"Creation date")) - TABLE_COLS = ['title', 'source_type', 'authors',] - - class Meta: - abstract = True - - def __unicode__(self): - return self.title - -class FileType(GeneralType): - class Meta: - verbose_name = _(u"Archaeological file type") - verbose_name_plural = _(u"Archaeological file types") - - @classmethod - def is_preventive(cls, file_type_id, key=''): - key = key or 'preventive' - try: - preventive = FileType.objects.get(txt_idx=key).pk - return file_type_id == preventive - except ObjectDoesNotExist: - return False - -class PermitType(GeneralType): - class Meta: - verbose_name = _(u"Permit type") - verbose_name_plural = _(u"Permit types") - -if settings.COUNTRY == 'fr': - class SaisineType(GeneralType): - delay = models.IntegerField(_(u"Delay (in days)")) - class Meta: - verbose_name = u"Type Saisine" - verbose_name_plural = u"Types Saisine" - -class File(BaseHistorizedItem, OwnPerms): - TABLE_COLS = ['numeric_reference', 'year', 'internal_reference', - 'file_type', 'saisine_type', 'towns', ] - year = models.IntegerField(_(u"Year"), - default=lambda:datetime.datetime.now().year) - numeric_reference = models.IntegerField(_(u"Numeric reference")) - internal_reference = models.CharField(_(u"Internal reference"), - max_length=60, unique=True) - file_type = models.ForeignKey(FileType, verbose_name=_(u"File type")) - in_charge = models.ForeignKey(Person, related_name='+', - verbose_name=_(u"Person in charge")) - general_contractor = models.ForeignKey(Person, related_name='+', - verbose_name=_(u"General contractor"), blank=True, null=True) - town_planning_service = models.ForeignKey(Organization, related_name='+', - verbose_name=_(u"Town planning service"), blank=True, null=True) - permit_type = models.ForeignKey(PermitType, verbose_name=_(u"Permit type"), - blank=True, null=True) - permit_reference = models.CharField(_(u"Permit reference"), - max_length=60, blank=True, null=True) - end_date = models.DateField(_(u"Closing date"), null=True, blank=True) - towns = models.ManyToManyField("Town", verbose_name=_(u"Towns"), - related_name='file') - creation_date = models.DateField(_(u"Creation date"), - default=datetime.date.today) - reception_date = models.DateField(_(u'Reception date'), blank=True, - null=True) - related_file = models.ForeignKey("File", verbose_name=_(u"Related file"), - blank=True, null=True) - if settings.COUNTRY == 'fr': - saisine_type = models.ForeignKey(SaisineType, blank=True, null=True, - verbose_name= u"Type de saisine") - reference_number = models.IntegerField(_(u"Reference number"), - blank=True, null=True) - total_surface = models.IntegerField(_(u"Total surface (m²)"), - blank=True, null=True) - total_developed_surface = models.IntegerField( - _(u"Total developed surface (m²)"), blank=True, null=True) - address = models.TextField(_(u"Main address"), null=True, blank=True) - address_complement = models.TextField(_(u"Main address - complement"), - null=True, blank=True) - postal_code = models.CharField(_(u"Main address - postal code"), - max_length=10, null=True, blank=True) - comment = models.TextField(_(u"Comment"), null=True, blank=True) - history = HistoricalRecords() - - class Meta: - verbose_name = _(u"Archaeological file") - verbose_name_plural = _(u"Archaeological files") - permissions = ( - ("view_own_file", ugettext(u"Can view own Archaelogical file")), - ("add_own_file", ugettext(u"Can add own Archaelogical file")), - ("change_own_file", ugettext(u"Can change own Archaelogical file")), - ("delete_own_file", ugettext(u"Can delete own Archaelogical file")), - ) - ordering = ['-year', '-numeric_reference'] - - @classmethod - def get_years(cls): - return [res['year'] for res in list(cls.objects.values('year').annotate( - Count("id")).order_by())] - - @classmethod - def get_by_year(cls, year): - return cls.objects.filter(year=year) - - @classmethod - def get_total_number(cls): - return cls.objects.count() - - def __unicode__(self): - items = [unicode(_('Intercommunal'))] - if self.towns.count() == 1: - items[0] = unicode(self.towns.all()[0]) - items.append("-".join((unicode(self.year), - unicode(self.numeric_reference)))) - items += [unicode(getattr(self, k))[:36] - for k in ['internal_reference',] if getattr(self, k)] - return JOINT.join(items) - - @classmethod - def get_query_owns(cls, user): - return Q(history_modifier=user) & Q(end_date__isnull=True) - - def is_active(self): - return not bool(self.end_date) - - def closing(self): - if self.is_active(): - return - for item in self.history.all(): - if not item.end_date: - break - return {'date':item.history_date, - 'user':IshtarUser.objects.get(pk=item.history_modifier_id)} - - def total_surface_ha(self): - if self.total_surface: - return self.total_surface/10000.0 - - def total_developed_surface_ha(self): - if self.total_developed_surface: - return self.total_developed_surface/10000.0 - - def operation_acts(self): - acts = [] - for ope in self.operations.all(): - for act in ope.administrative_act.all(): - acts.append(act) - return acts - - def is_preventive(self): - return FileType.is_preventive(self.file_type.pk) - -class FileByDepartment(models.Model): - ''' - Database view: don't forget to create it - - create view file_department (id, department_id, file_id) as - select town."id", town."departement_id", file_towns."file_id" - from ishtar_base_town town - inner join ishtar_base_file_towns file_towns on - file_towns."town_id"=town."id" order by town."departement_id"; - CREATE RULE file_department_delete - AS ON DELETE TO file_department DO INSTEAD(); - ''' - file = models.ForeignKey(File, verbose_name=_(u"File")) - department = models.ForeignKey(Departement, verbose_name=_(u"Department"), - blank=True, null=True) - class Meta: - managed = False - db_table = 'file_department' - -class OperationType(GeneralType): - class Meta: - verbose_name = _(u"Operation type") - verbose_name_plural = _(u"Operation types") - - @classmethod - def is_preventive(cls, ope_type_id, key=''): - key = key or 'prev_excavation' - try: - preventive = OperationType.objects.get(txt_idx=key).pk - return ope_type_id == preventive - except ObjectDoesNotExist: - return False - -class RemainType(GeneralType): - class Meta: - verbose_name = _(u"Remain type") - verbose_name_plural = _(u"Remain types") - -class Operation(BaseHistorizedItem, OwnPerms): - TABLE_COLS = ['year_index', 'operation_type', 'remains', 'towns', - 'associated_file', 'start_date', 'excavation_end_date'] - start_date = models.DateField(_(u"Start date"), null=True, blank=True) - excavation_end_date = models.DateField(_(u"Excavation end date"), null=True, - blank=True) - end_date = models.DateField(_(u"Closing date"), null=True, blank=True) - in_charge = models.ForeignKey('Person', related_name='+', null=True, - blank=True, verbose_name=_(u"In charge")) - year = models.IntegerField(_(u"Year")) - operation_code = models.IntegerField(_(u"Operation code")) - associated_file = models.ForeignKey(File, related_name='operations', - verbose_name=_(u"File"), blank=True, null=True) - operation_type = models.ForeignKey(OperationType, related_name='+', - verbose_name=_(u"Operation type")) - surface = models.IntegerField(_(u"Surface (m²)"), blank=True, null=True) - remains = models.ManyToManyField("RemainType", verbose_name=_(u'Remains')) - towns = models.ManyToManyField("Town", verbose_name=_(u"Towns")) - cost = models.IntegerField(_(u"Cost (€)"), blank=True, null=True) - periods = models.ManyToManyField('Period', verbose_name=_(u"Periods")) - scheduled_man_days = models.IntegerField(_(u"Scheduled man-days"), - blank=True, null=True) - optional_man_days = models.IntegerField(_(u"Optional man-days"), - blank=True, null=True) - effective_man_days = models.IntegerField(_(u"Effective man-days"), - blank=True, null=True) - if settings.COUNTRY == 'fr': - code_patriarche = models.IntegerField(u"Code PATRIARCHE", null=True, - blank=True) - TABLE_COLS = ['code_patriarche'] + TABLE_COLS - code_dracar = models.CharField(u"Code DRACAR", max_length=10, null=True, - blank=True) - fnap_financing = models.FloatField(u"Financement FNAP (%)", - blank=True, null=True) - fnap_cost = models.IntegerField(u"Financement FNAP (€)", - blank=True, null=True) - zoning_prescription = models.NullBooleanField( - _(u"Prescription on zoning"), blank=True, null=True) - large_area_prescription = models.NullBooleanField( - _(u"Prescription on large area"), blank=True, null=True) - geoarchaeological_context_prescription = models.NullBooleanField( - _(u"Prescription on geoarchaeological context"), blank=True, null=True) - operator_reference = models.CharField(_(u"Operator reference"), - max_length=20, null=True, blank=True) - common_name = models.CharField(_(u"Generic name"), max_length=120, null=True, - blank=True) - comment = models.TextField(_(u"Comment"), null=True, blank=True) - history = HistoricalRecords() - - class Meta: - verbose_name = _(u"Operation") - verbose_name_plural = _(u"Operations") - permissions = ( - ("view_own_operation", ugettext(u"Can view own Operation")), - ("add_own_operation", ugettext(u"Can add own Operation")), - ("change_own_operation", ugettext(u"Can change own Operation")), - ("delete_own_operation", ugettext(u"Can delete own Operation")), - ) - - def __unicode__(self): - items = [unicode(_('Intercommunal'))] - if self.towns.count() == 1: - items[0] = unicode(self.towns.all()[0]) - items.append("-".join((unicode(self.year), - unicode(self.operation_code)))) - return JOINT.join(items) - - @classmethod - def get_available_operation_code(cls, year=None): - if not year: - year = datetime.date.today().year - max_val = cls.objects.filter(year=year).aggregate( - Max('operation_code'))["operation_code__max"] - return (max_val + 1) if max_val else 1 - - @classmethod - def get_years(cls): - return [res['year'] for res in list(cls.objects.values('year').annotate( - Count("id")).order_by())] - - @classmethod - def get_by_year(cls, year): - return cls.objects.filter(year=year) - - @classmethod - def get_total_number(cls): - return cls.objects.count() - - year_index_lbl = _(u"Operation code") - @property - def year_index(self): - lbl = unicode(self.operation_code) - lbl = u"%d-%s%s" % (self.year, (3-len(lbl))*"0", lbl) - return lbl - - def clean(self): - objs = self.__class__.objects.filter(year=self.year, - operation_code=self.operation_code) - if self.pk: - objs = objs.exclude(pk=self.pk) - if objs.count(): - raise ValidationError(_(u"This operation code already exists for " - u"this year")) - - def is_own(self, person): - return False - - @property - def surface_ha(self): - if self.surface: - return self.surface/10000.0 - - @property - def cost_by_m2(self): - if not self.surface or not self.cost: - return - return round(float(self.cost)/self.surface, 2) - - @property - def cost_by_m2(self): - if not self.surface or not self.cost: - return - return round(float(self.cost)/self.surface, 2) - - @classmethod - def get_query_owns(cls, user): - return Q(in_charge=user.person)|Q(history_modifier=user)\ - & Q(end_date__isnull=True) - - def is_active(self): - return not bool(self.end_date) - - def closing(self): - if self.is_active(): - return - for item in self.history.all(): - if not item.end_date: - break - return {'date':item.history_date, - 'user':IshtarUser.objects.get(pk=item.history_modifier_id)} - -def operation_post_save(sender, **kwargs): - if not kwargs['instance']: - return - operation = kwargs['instance'] - if operation.fnap_financing and operation.cost: - fnap_cost = int(float(operation.cost)/100*operation.fnap_financing) - if not operation.fnap_cost or operation.fnap_cost != fnap_cost: - operation.fnap_cost = fnap_cost - operation.save() - elif operation.fnap_cost and operation.cost: - fnap_percent = float(operation.fnap_cost)*100/operation.cost - operation.fnap_financing = fnap_percent - operation.save() -post_save.connect(operation_post_save, sender=Operation) - -class OperationByDepartment(models.Model): - ''' - Database view: don't forget to create it - - create view operation_department (id, department_id, operation_id) as - select town."id", town."departement_id", operation_towns."operation_id" - from ishtar_base_town town - inner join ishtar_base_operation_towns operation_towns on - operation_towns."town_id"=town."id" order by town."departement_id"; - CREATE RULE operation_department_delete - AS ON DELETE TO operation_department DO INSTEAD(); - ''' - operation = models.ForeignKey(Operation, verbose_name=_(u"Operation")) - department = models.ForeignKey(Departement, verbose_name=_(u"Department"), - blank=True, null=True) - class Meta: - managed = False - db_table = 'operation_department' - -class OperationSource(Source): - class Meta: - verbose_name = _(u"Operation documentation") - verbose_name_plural = _(u"Operation documentations") - operation = models.ForeignKey(Operation, verbose_name=_(u"Operation"), - related_name="source") - index = models.IntegerField(verbose_name=_(u"Index")) - TABLE_COLS = ['operation.year', 'operation.operation_code'] + \ - Source.TABLE_COLS - -class Parcel(LightHistorizedItem): - associated_file = models.ForeignKey(File, related_name='parcels', - blank=True, null=True, verbose_name=_(u"File")) - operation = models.ForeignKey(Operation, related_name='parcels', blank=True, - null=True, verbose_name=_(u"Operation")) - year = models.IntegerField(_(u"Year"), blank=True, null=True) - town = models.ForeignKey("Town", related_name='parcels', - verbose_name=_(u"Town")) - section = models.CharField(_(u"Section"), max_length=4) - parcel_number = models.CharField(_(u"Parcel number"), max_length=6) - - class Meta: - verbose_name = _(u"Parcel") - verbose_name_plural = _(u"Parcels") - - def short_label(self): - return JOINT.join([unicode(item) for item in [self.section, - self.parcel_number] if item]) - - def __unicode__(self): - return self.short_label() - - def long_label(self): - items = [unicode(self.operation or self.associated_file)] - items += [unicode(item) for item in [self.section, self.parcel_number] - if item] - return JOINT.join(items) - -class Period(GeneralType) : - order = models.IntegerField(_(u"Order")) - start_date = models.IntegerField(_(u"Start date")) - end_date = models.IntegerField(_(u"End date")) - parent = models.ForeignKey("Period", verbose_name=_(u"Parent period"), - blank=True, null=True) - - class Meta: - verbose_name = _(u"Type Period") - verbose_name_plural = _(u"Types Period") - - def __unicode__(self): - return self.label - -class DatingType(GeneralType): - class Meta: - verbose_name = _(u"Dating type") - verbose_name_plural = _(u"Dating types") - -class DatingQuality(GeneralType): - class Meta: - verbose_name = _(u"Dating quality") - verbose_name_plural = _(u"Dating qualities") - -class Dating(models.Model): - period = models.ForeignKey(Period, verbose_name=_(u"Period")) - start_date = models.IntegerField(_(u"Start date"), blank=True, null=True) - end_date = models.IntegerField(_(u"End date"), blank=True, null=True) - dating_type = models.ForeignKey(DatingType, verbose_name=_(u"Dating type"), - blank=True, null=True) - quality = models.ForeignKey(DatingQuality, verbose_name=_(u"Quality"), - blank=True, null=True) - - class Meta: - verbose_name = _(u"Dating") - verbose_name_plural = _(u"Datings") - - def __unicode__(self): - start_date = self.start_date and unicode(self.start_date) or u"" - end_date = self.end_date and unicode(self.end_date) or u"" - if not start_date and not end_date: - return unicode(self.period) - return u"%s (%s-%s)" % (self.period, start_date, end_date) - -class Unit(GeneralType): - order = models.IntegerField(_(u"Order")) - parent = models.ForeignKey("Unit", verbose_name=_(u"Parent unit"), - blank=True, null=True) - - class Meta: - verbose_name = _(u"Type Unit") - verbose_name_plural = _(u"Types Unit") - - def __unicode__(self): - return self.label - -class ActivityType(GeneralType): - order = models.IntegerField(_(u"Order")) - - class Meta: - verbose_name = _(u"Type Activity") - verbose_name_plural = _(u"Types Activity") - - def __unicode__(self): - return self.label - -class IdentificationType(GeneralType): - order = models.IntegerField(_(u"Order")) - class Meta: - verbose_name = _(u"Type Identification") - verbose_name_plural = _(u"Types Identification") - - def __unicode__(self): - return self.label - -class ContextRecord(BaseHistorizedItem, OwnPerms): - TABLE_COLS = ['parcel.town', 'operation.year', - 'operation.operation_code', - 'label', 'unit'] - if settings.COUNTRY == 'fr': - TABLE_COLS.insert(1, 'parcel.operation.code_patriarche') - parcel = models.ForeignKey(Parcel, verbose_name=_(u"Parcel"), - related_name='context_record') - operation = models.ForeignKey(Operation, verbose_name=_(u"Operation"), - related_name='context_record') - label = models.CharField(_(u"ID"), max_length=200) - description = models.TextField(_(u"Description"), blank=True, null=True) - length = models.IntegerField(_(u"Length (cm)"), blank=True, null=True) - width = models.IntegerField(_(u"Width (cm)"), blank=True, null=True) - thickness = models.IntegerField(_(u"Thickness (cm)"), blank=True, null=True) - depth = models.IntegerField(_(u"Depth (cm)"), blank=True, null=True) - location = models.CharField(_(u"Location"), blank=True, null=True, - max_length=200, - help_text=_(u"A short description of the location of the context record")) - datings = models.ManyToManyField(Dating) - unit = models.ForeignKey(Unit, verbose_name=_(u"Unit"), related_name='+', - blank=True, null=True) - has_furniture = models.NullBooleanField(u"Has furniture?", blank=True, - null=True) - filling = models.TextField(_(u"Filling"), blank=True, null=True) - interpretation = models.TextField(_(u"Interpretation"), blank=True, - null=True) - taq = models.IntegerField(_(u"TAQ"), blank=True, null=True, - help_text=_(u"\"Terminus Ante Quem\" the context record can't have been " - "created after this date")) - taq_estimated = models.IntegerField(_(u"Estimated TAQ"), blank=True, - null=True, help_text=_(u"Estimation of a \"Terminus Ante Quem\"")) - tpq = models.IntegerField(_(u"TPQ"), blank=True, null=True, - help_text=_(u"\"Terminus Post Quem\" the context record can't have been " - " created before this date")) - tpq_estimated = models.IntegerField(_(u"Estimated TPQ"), blank=True, - null=True, help_text=_(u"Estimation of a \"Terminus Post Quem\"")) - identification = models.ForeignKey(IdentificationType, blank=True, - null=True, verbose_name=_(u"Identification"),) - activity = models.ForeignKey(ActivityType,blank=True, null=True, - verbose_name=_(u"Activity"),) - history = HistoricalRecords() - - class Meta: - verbose_name = _(u"Context Record") - verbose_name_plural = _(u"Context Record") - permissions = ( - ("view_own_contextrecord", ugettext(u"Can view own Context Record")), - ("add_own_contextrecord", ugettext(u"Can add own Context Record")), - ("change_own_contextrecord", ugettext(u"Can change own Context Record")), - ("delete_own_contextrecord", ugettext(u"Can delete own Context Record")), - ) - - def __unicode__(self): - return self.short_label() - - def short_label(self): - return JOINT.join([unicode(item) for item in [self.parcel, - self.label] if item]) - - def full_label(self): - if not self.parcel.operation: - return unicode(self) - return self._real_label() or self._temp_label() - - def _real_label(self): - if not self.parcel.operation.code_patriarche: - return - return JOINT.join((self.parcel.operation.code_patriarche, - self.label)) - - def _temp_label(self): - if self.parcel.operation.code_patriarche: - return - return JOINT.join([unicode(lbl) for lbl in [self.parcel.operation.year, - self.parcel.operation.operation_code, - self.label] if lbl]) - - @classmethod - def get_years(cls): - years = set() - for res in list(cls.objects.values('operation__start_date')): - yr = res['operation__start_date'].year - years.add(yr) - return list(years) - - @classmethod - def get_by_year(cls, year): - return cls.objects.filter(operation__start_date__year=year) - - @classmethod - def get_operations(cls): - return [dct['operation__pk'] - for dct in cls.objects.values('operation__pk').distinct()] - - @classmethod - def get_by_operation(cls, operation_id): - return cls.objects.filter(operation__pk=operation_id) - - @classmethod - def get_total_number(cls): - return cls.objects.filter(operation__start_date__isnull=False).count() - -class ContextRecordSource(Source): - class Meta: - verbose_name = _(u"Context record documentation") - verbose_name_plural = _(u"Context record documentations") - context_record = models.ForeignKey(ContextRecord, - verbose_name=_(u"Context record"), related_name="source") - -class MaterialType(GeneralType): - recommendation = models.TextField(_(u"Recommendation")) - parent = models.ForeignKey("MaterialType", blank=True, null=True, - verbose_name=_(u"Parent material")) - - class Meta: - verbose_name = _(u"Material type") - verbose_name_plural = _(u"Material types") - -class BaseItem(BaseHistorizedItem, OwnPerms): - label = models.CharField(_(u"ID"), max_length=60) - description = models.TextField(_(u"Description")) - context_record = models.ForeignKey(ContextRecord, - related_name='base_items', verbose_name=_(u"Context Record")) - is_isolated = models.NullBooleanField(_(u"Is isolated?"), blank=True, - null=True) - index = models.IntegerField(u"Index", default=0) - material_index = models.IntegerField(u"Material index", default=0) - history = HistoricalRecords() - - class Meta: - verbose_name = _(u"Base item") - verbose_name_plural = _(u"Base items") - permissions = ( - ("view_own_baseitem", ugettext(u"Can view own Base item")), - ("add_own_baseitem", ugettext(u"Can add own Base item")), - ("change_own_baseitem", ugettext(u"Can change own Base item")), - ("delete_own_baseitem", ugettext(u"Can delete own Base item")), - ) - - def __unicode__(self): - return self.label - - def get_last_item(self): - #TODO: manage virtuals - property(last_item) ? - items = self.item.filter().order_by("-order").all() - return items and items[0] - - def full_label(self): - return self._real_label() or self._temp_label() - - def material_type_label(self): - item = self.get_last_item() - items = [item and unicode(item.material_type) or ''] - ope = self.context_record.operation - items += [ope.code_patriarche or \ - (unicode(ope.year) + "-" + unicode(ope.operation_code))] - items += [self.context_record.label, unicode(self.material_index)] - return JOINT.join(items) - - def _real_label(self): - if not self.context_record.parcel.operation.code_patriarche: - return - item = self.get_last_item() - lbl = item.label or self.label - return JOINT.join([unicode(it) for it in ( - self.context_record.parcel.operation.code_patriarche, - self.context_record.label, - lbl) if it]) - - def _temp_label(self): - if self.context_record.parcel.operation.code_patriarche: - return - item = self.get_last_item() - lbl = item.label or self.label - return JOINT.join([unicode(it) for it in ( - self.context_record.parcel.year, - self.index, - self.context_record.label, - lbl) if it]) - -class Item(BaseHistorizedItem, OwnPerms): - TABLE_COLS = ['label', 'material_type', 'dating.period', - 'base_items.context_record.parcel.town', - 'base_items.context_record.parcel.operation.year', - 'base_items.context_record.parcel.operation.operation_code', - 'base_items.is_isolated'] - if settings.COUNTRY == 'fr': - TABLE_COLS.insert(6, - 'base_items.context_record.parcel.operation.code_patriarche') - base_items = models.ManyToManyField(BaseItem, verbose_name=_(u"Base item"), - related_name='item') - order = models.IntegerField(_(u"Order")) - label = models.CharField(_(u"ID"), max_length=60) - description = models.TextField(_(u"Description"), blank=True, null=True) - material_type = models.ForeignKey(MaterialType, - verbose_name = _(u"Material type")) - volume = models.FloatField(_(u"Volume (l)"), blank=True, null=True) - weight = models.FloatField(_(u"Weight (g)"), blank=True, null=True) - item_number = models.IntegerField(_("Item number"), blank=True, null=True) - upstream_treatment = models.ForeignKey("Treatment", blank=True, null=True, - related_name='downstream_treatment', verbose_name=_("Upstream treatment")) - downstream_treatment = models.ForeignKey("Treatment", blank=True, null=True, - related_name='upstream_treatment', verbose_name=_("Downstream treatment")) - dating = models.ForeignKey(Dating, verbose_name=_(u"Dating")) - container = models.ForeignKey('Container', verbose_name=_(u"Container"), - blank=True, null=True, related_name='items') - history = HistoricalRecords() - - @classmethod - def get_years(cls): - years = set() - items = cls.objects.filter(downstream_treatment__isnull=True) - for item in items: - bi = item.base_items.all() - if not bi: - continue - bi = bi[0] - yr = bi.context_record.operation.start_date.year - years.add(yr) - return list(years) - - @classmethod - def get_by_year(cls, year): - return cls.objects.filter(downstream_treatment__isnull=True, - base_items__context_record__operation__start_date__year=year) - - @classmethod - def get_operations(cls): - operations = set() - items = cls.objects.filter(downstream_treatment__isnull=True) - for item in items: - bi = item.base_items.all() - if not bi: - continue - bi = bi[0] - pk = bi.context_record.operation.pk - operations.add(pk) - return list(operations) - - @classmethod - def get_by_operation(cls, operation_id): - return cls.objects.filter(downstream_treatment__isnull=True, - base_items__context_record__operation__pk=operation_id) - - @classmethod - def get_total_number(cls): - return cls.objects.filter(downstream_treatment__isnull=True).count() - - def duplicate(self, user): - dct = dict([(attr, getattr(self, attr)) for attr in ('order', 'label', - 'description', 'material_type', 'volume', 'weight', - 'item_number', 'dating')]) - dct['order'] += 1 - dct['history_modifier'] = user - new = self.__class__(**dct) - new.save() - for base_item in self.base_items.all(): - new.base_items.add(base_item) - return new - - class Meta: - verbose_name = _(u"Item") - verbose_name_plural = _(u"Items") - permissions = ( - ("view_own_item", ugettext(u"Can view own Item")), - ("add_own_item", ugettext(u"Can add own Item")), - ("change_own_item", ugettext(u"Can change own Item")), - ("delete_own_item", ugettext(u"Can delete own Item")), - ) - - def __unicode__(self): - return self.label - - def save(self, *args, **kwargs): - if not self.pk: - super(Item, self).save(*args, **kwargs) - for base_item in self.base_items.all(): - if not base_item.index: - idx = BaseItem.objects.filter(context_record=\ - base_item.context_record).aggregate(Max('index')) - base_item.index = idx and idx['index__max'] + 1 or 1 - if not base_item.material_index: - idx = BaseItem.objects.filter(context_record=\ - base_item.context_record, - item__material_type=self.material_type).aggregate( - Max('material_index')) - base_item.material_index = idx and \ - idx['material_index__max'] + 1 or 1 - base_item.save() - super(Item, self).save(*args, **kwargs) - -class ItemSource(Source): - class Meta: - verbose_name = _(u"Item documentation") - verbose_name_plural = _(u"Item documentations") - item = models.ForeignKey(Item, verbose_name=_(u"Item"), - related_name="source") - -class ParcelOwner(LightHistorizedItem): - owner = models.ForeignKey(Person, verbose_name=_(u"Owner")) - parcel = models.ForeignKey(Parcel, verbose_name=_(u"Parcel")) - start_date = models.DateField(_(u"Start date")) - end_date = models.DateField(_(u"End date")) - - class Meta: - verbose_name = _(u"Parcel owner") - verbose_name_plural = _(u"Parcel owners") - - def __unicode__(self): - return self.owner + JOINT + self.parcel - -class WarehouseType(GeneralType): - class Meta: - verbose_name = _(u"Warehouse type") - verbose_name_plural = _(u"Warehouse types") - -class Warehouse(Address, OwnPerms): - name = models.CharField(_(u"Name"), max_length=40) - warehouse_type = models.ForeignKey(WarehouseType, - verbose_name=_(u"Warehouse type")) - person_in_charge = models.ForeignKey(Person, - verbose_name=_(u"Person in charge"), null=True, blank=True) - comment = models.TextField(_(u"Comment"), null=True, blank=True) - - class Meta: - verbose_name = _(u"Warehouse") - verbose_name_plural = _(u"Warehouses") - permissions = ( - ("view_own_warehouse", ugettext(u"Can view own Warehouse")), - ("add_own_warehouse", ugettext(u"Can add own Warehouse")), - ("change_own_warehouse", ugettext(u"Can change own Warehouse")), - ("delete_own_warehouse", ugettext(u"Can delete own Warehouse")), - ) - - def __unicode__(self): - return u"%s (%s)" % (self.name, unicode(self.warehouse_type)) - -class ActType(GeneralType): - TYPE = (('F', _(u'Archaelogical file')), - ('O', _(u'Operation')), - ) - intented_to = models.CharField(_(u"Intended to"), max_length=1, - choices=TYPE) - class Meta: - verbose_name = _(u"Act type") - verbose_name_plural = _(u"Act types") - -class AdministrativeAct(BaseHistorizedItem, OwnPerms): - TABLE_COLS = ['act_type', 'associated_file', 'operation', - 'associated_file.towns', 'operation.towns'] - TABLE_COLS_FILE = ['act_type', 'associated_file', 'associated_file.towns',] - TABLE_COLS_OPE = ['act_type', 'operation', 'operation.towns'] - act_type = models.ForeignKey(ActType, verbose_name=_(u"Act type")) - in_charge = models.ForeignKey(Person, blank=True, null=True, - related_name='+', verbose_name=_(u"Person in charge of the operation")) - operator = models.ForeignKey(Organization, blank=True, null=True, - verbose_name=_(u"Archaeological preventive operator")) - scientific = models.ForeignKey(Person, blank=True, null=True, -related_name='+', verbose_name=_(u"Person in charge of the scientific part")) - signatory = models.ForeignKey(Person, blank=True, null=True, - related_name='+', verbose_name=_(u"Signatory")) - operation = models.ForeignKey(Operation, blank=True, null=True, - related_name='administrative_act', verbose_name=_(u"Operation")) - associated_file = models.ForeignKey(File, blank=True, null=True, - related_name='administrative_act', verbose_name=_(u"Archaelogical file")) - signature_date = models.DateField(_(u"Signature date"), blank=True, - null=True) - act_object = models.CharField(_(u"Object"), max_length=200) - if settings.COUNTRY == 'fr': - ref_sra = models.CharField(u"Référence SRA", max_length=15) - history = HistoricalRecords() - - class Meta: - verbose_name = _(u"Administrative act") - verbose_name_plural = _(u"Administrative acts") - permissions = ( -("view_own_administrativeact", ugettext(u"Can view own Administrative act")), -("add_own_administrativeact", ugettext(u"Can add own Administrative act")), -("change_own_administrativeact", ugettext(u"Can change own Administrative act")), -("delete_own_administrativeact", ugettext(u"Can delete own Administrative act")), - ) - - def __unicode__(self): - return JOINT.join([unicode(item) - for item in [self.operation, self.associated_file, self.act_object] - if item]) - -class ContainerType(GeneralType): - length = models.IntegerField(_(u"Length (mm)"), blank=True, null=True) - width = models.IntegerField(_(u"Width (mm)"), blank=True, null=True) - height = models.IntegerField(_(u"Height (mm)"), blank=True, null=True) - volume = models.IntegerField(_(u"Volume (l)"), blank=True, null=True) - reference = models.CharField(_(u"Reference"), max_length=30) - - class Meta: - verbose_name = _(u"Container type") - verbose_name_plural = _(u"Container types") - -class Container(LightHistorizedItem): - TABLE_COLS = ['reference', 'container_type', 'location',] - location = models.ForeignKey(Warehouse, verbose_name=_(u"Warehouse")) - container_type = models.ForeignKey(ContainerType, - verbose_name=_("Container type")) - reference = models.CharField(_(u"Reference"), max_length=40) - comment = models.TextField(_(u"Comment")) - - class Meta: - verbose_name = _(u"Container") - verbose_name_plural = _(u"Containers") - - def __unicode__(self): - lbl = u" - ".join((self.reference, unicode(self.container_type), - unicode(self.location))) - return lbl - -if settings.COUNTRY == 'fr': - class Arrondissement(models.Model): - name = models.CharField(u"Nom", max_length=30) - department = models.ForeignKey(Departement, verbose_name=u"Département") - - def __unicode__(self): - return JOINT.join((self.name, unicode(self.department))) - - class Canton(models.Model): - name = models.CharField(u"Nom", max_length=30) - arrondissement = models.ForeignKey(Arrondissement, - verbose_name=u"Arrondissement") - def __unicode__(self): - return JOINT.join((self.name, unicode(self.arrondissement))) - -class Town(models.Model): - name = models.CharField(_(u"Name"), max_length=100) - surface = models.IntegerField(_(u"Surface (m²)"), blank=True, null=True) - center = models.PointField(_(u"Localisation"), srid=settings.SRID, - blank=True, null=True) - if settings.COUNTRY == 'fr': - numero_insee = models.CharField(u"Numéro INSEE", max_length=6, - unique=True) - departement = models.ForeignKey(Departement, verbose_name=u"Département", - null=True, blank=True) - canton = models.ForeignKey(Canton, verbose_name=u"Canton", null=True, - blank=True) - objects = models.GeoManager() - - class Meta: - verbose_name = _(u"Town") - verbose_name_plural = _(u"Towns") - if settings.COUNTRY == 'fr': - ordering = ['numero_insee'] - - def __unicode__(self): - if settings.COUNTRY == "fr": - return u"%s (%s)" % (self.name, self.numero_insee) - return self.name - -class TreatmentType(GeneralType): - virtual = models.BooleanField(_(u"Virtual")) - class Meta: - verbose_name = _(u"Treatment type") - verbose_name_plural = _(u"Treatment types") - -class Treatment(BaseHistorizedItem, OwnPerms): - container = models.ForeignKey(Container, verbose_name=_(u"Container"), - blank=True, null=True) - description = models.TextField(_(u"Description"), blank=True, null=True) - treatment_type = models.ForeignKey(TreatmentType, - verbose_name=_(u"Treatment type")) - location = models.ForeignKey(Warehouse, verbose_name=_(u"Location"), - blank=True, null=True) - person = models.ForeignKey(Person, verbose_name=_(u"Person"), - blank=True, null=True) - start_date = models.DateField(_(u"Start date"), blank=True, null=True) - end_date = models.DateField(_(u"End date"), blank=True, null=True) - history = HistoricalRecords() - - class Meta: - verbose_name = _(u"Treatment") - verbose_name_plural = _(u"Treatments") - permissions = ( - ("view_own_treatment", ugettext(u"Can view own Treatment")), - ("add_own_treatment", ugettext(u"Can add own Treatment")), - ("change_own_treatment", ugettext(u"Can change own Treatment")), - ("delete_own_treatment", ugettext(u"Can delete own Treatment")), - ) - - def __unicode__(self): - lbl = unicode(self.treatment_type) - if self.person: - lbl += u" %s %s" % (_(u"by"), unicode(self.person)) - return lbl - -class TreatmentSource(Source): - class Meta: - verbose_name = _(u"Treatment documentation") - verbose_name_plural = _(u"Treament documentations") - treatment = models.ForeignKey(Treatment, verbose_name=_(u"Treatment"), - related_name="source") - -class Property(LightHistorizedItem): - item = models.ForeignKey(Item, verbose_name=_(u"Item")) - administrative_act = models.ForeignKey(AdministrativeAct, - verbose_name=_(u"Administrative act")) - person = models.ForeignKey(Person, verbose_name=_(u"Person")) - start_date = models.DateField(_(u"Start date")) - end_date = models.DateField(_(u"End date")) - - class Meta: - verbose_name = _(u"Property") - verbose_name_plural = _(u"Properties") - - def __unicode__(self): - return self.person + JOINT + self.item - |
