#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ import copy import datetime import inspect from importlib import import_module from jinja2 import TemplateSyntaxError import json import logging import os import pyqrcode import re import shutil import tempfile import time from cStringIO import StringIO from subprocess import Popen, PIPE from PIL import Image from django import forms from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import JSONField from django.contrib.postgres.search import SearchVectorField, SearchVector from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files import File from django.core.serializers import serialize from django.core.urlresolvers import reverse, NoReverseMatch from django.core.validators import validate_slug from django.db import connection from django.db.models import Q, Max, Count from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template.defaultfilters import slugify from django.utils.functional import lazy from django.utils.safestring import SafeUnicode, mark_safe from django.utils.translation import ugettext_lazy as _, ugettext, \ pgettext_lazy, activate, deactivate from secretary import Renderer as SecretaryRenderer from simple_history.models import HistoricalRecords as BaseHistoricalRecords from unidecode import unidecode from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ ALTERNATE_CONFIGS_CHOICES from ishtar_common.data_importer import pre_importer_action from ishtar_common.model_managers import SlugModelManager, ExternalIdManager, \ TypeManager from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ImporterModel, ImporterType, \ ImporterDefault, ImporterDefaultValues, ImporterColumn, \ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ Import, TargetKeyGroup, ValueFormater from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \ get_all_field_names, merge_tsvectors, cached_label_changed, \ generate_relation_graph, max_size_help __all__ = [ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget', 'TargetKey', 'FormaterType', 'Import', 'TargetKeyGroup', 'ValueFormater' ] logger = logging.getLogger(__name__) def post_save_user(sender, **kwargs): user = kwargs['instance'] if kwargs["created"]: try: IshtarUser.create_from_user(user) except DatabaseError: # manage when db is not synced pass IshtarUser.set_superuser(user) post_save.connect(post_save_user, sender=User) class ValueGetter(object): _prefix = "" GET_VALUES_EXTRA = [] COL_LABELS = {} GET_VALUE_EXCLUDE_FIELDS = [ 'search_vector', 'id', 'multi_polygon', 'point_2d', 'point', 'history_m2m'] def get_values(self, prefix=''): if not prefix: prefix = self._prefix values = {} for field_name in get_all_field_names(self): if not hasattr(self, field_name) or \ field_name in self.GET_VALUE_EXCLUDE_FIELDS or \ field_name.endswith('_id'): continue value = getattr(self, field_name) if hasattr(value, 'get_values'): values.update(value.get_values(prefix + field_name + '_')) else: values[prefix + field_name] = value for extra_field in self.GET_VALUES_EXTRA: values[prefix + extra_field] = getattr(self, extra_field) or '' for key in values.keys(): val = values[key] if val is None: val = '' elif isinstance(val, (tuple, list, dict)): pass else: val = unicode(val) if val.endswith('.None'): val = '' values[key] = val if prefix: # do not provide KEYS and VALUES for sub-items return values values['KEYS'] = u'\n'.join(values.keys()) value_list = [] for key in values.keys(): if key in ('KEYS', 'VALUES'): continue value_list.append((key, unicode(values[key]))) values['VALUES'] = u'\n'.join( [u"%s: %s" % (k, v) for k, v in sorted(value_list, key=lambda x: x[0])]) for global_var in GlobalVar.objects.all(): values[global_var.slug] = global_var.value or "" return values @classmethod def get_empty_values(cls, prefix=''): if not prefix: prefix = cls._prefix values = {} for field_name in get_all_field_names(cls): values[prefix + field_name] = '' return values class HistoryModel(models.Model): class Meta: abstract = True def m2m_listing(self, key, create=False): if not self.history_m2m or key not in self.history_m2m: return models = self.__class__.__module__ if not models.endswith('.models'): models += ".models" models = import_module(models) model = getattr( models, self.__class__.__name__[len('Historical'):]) related_model = getattr(model, key).rel.model return related_model.history_decompress(self.history_m2m[key], create=create) class HistoricalRecords(BaseHistoricalRecords): def create_historical_record(self, instance, type): try: history_modifier = getattr(instance, 'history_modifier', None) assert history_modifier except (User.DoesNotExist, AssertionError): # on batch removing of users, user could have disappeared return force = getattr(instance, "_force_history", False) manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history \ .filter(history_modifier_id=history_modifier.pk) \ .order_by('-history_date', '-history_id') # instance.skip_history_when_saving = True if not q_history.count(): if force: delattr(instance, '_force_history') manager.create(history_type=type, history_date=datetime.datetime.now(), **attrs) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = datetime.datetime.now() \ - datetime.timedelta(seconds=5) q = q_history.filter(history_date__isnull=False, history_date__gt=min_history_date) \ .order_by('-history_date', '-history_id') if not force and q.count(): return if force: delattr(instance, '_force_history') if 'history_date' not in attrs or not attrs['history_date']: attrs['history_date'] = datetime.datetime.now() # record a new version only if data have been changed for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: manager.create(history_type=type, **attrs) return def valid_id(cls): # valid ID validator for models def func(value): try: cls.objects.get(pk=value) except ObjectDoesNotExist: raise ValidationError(_(u"Not a valid item.")) return func def valid_ids(cls): def func(value): if "," in value: value = value.split(",") if type(value) not in (list, tuple): value = [value] for v in value: try: cls.objects.get(pk=v) except ObjectDoesNotExist: raise ValidationError( _(u"A selected item is not a valid item.")) return func def is_unique(cls, field): # unique validator for models def func(value): query = {field: value} try: assert cls.objects.filter(**query).count() == 0 except AssertionError: raise ValidationError(_(u"This item already exists.")) return func class OwnPerms(object): """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, ishtaruser): """ Query object to get own items """ return None # implement for each object def can_view(self, request): if hasattr(self, "LONG_SLUG"): perm = "view_" + self.LONG_SLUG else: perm = "view_" + self.SLUG return self.can_do(request, perm) def can_do(self, request, action_name): """ Check permission availability for the current object. :param request: request object :param action_name: action name eg: "change_find" - "own" variation is checked :return: boolean """ if not getattr(request.user, 'ishtaruser', None): return False splited = action_name.split('_') action_own_name = splited[0] + '_own_' + '_'.join(splited[1:]) user = request.user return user.ishtaruser.has_right(action_name, request.session) or \ (user.ishtaruser.has_right(action_own_name, request.session) and self.is_own(user.ishtaruser)) def is_own(self, user, alt_query_own=None): """ Check if the current object is owned by the user """ if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, 'ishtaruser'): ishtaruser = user.ishtaruser else: return False if not alt_query_own: query = self.get_query_owns(ishtaruser) else: query = getattr(self, alt_query_own)(ishtaruser) if not query: return False query &= Q(pk=self.pk) return self.__class__.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, 'ishtaruser'): ishtaruser = user.ishtaruser else: return False query = cls.get_query_owns(ishtaruser) if not query: return False return cls.objects.filter(query).count() @classmethod def _return_get_owns(cls, owns, values, get_short_menu_class, label_key='cached_label'): if not owns: return [] sorted_values = [] if hasattr(cls, 'BASKET_MODEL'): owns_len = len(owns) for idx, item in enumerate(reversed(owns)): if get_short_menu_class: item = item[0] if type(item) == cls.BASKET_MODEL: basket = owns.pop(owns_len - idx - 1) sorted_values.append(basket) sorted_values = list(reversed(sorted_values)) if not values: if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: getattr(x, label_key))) return sorted_values + list( sorted(owns, key=lambda x: getattr(x[0], label_key))) if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: x[label_key])) return sorted_values + list( sorted(owns, key=lambda x: x[0][label_key])) @classmethod def get_owns(cls, user, replace_query=None, limit=None, values=None, get_short_menu_class=False, menu_filtr=None): """ Get Own items """ if not replace_query: replace_query = {} if hasattr(user, 'is_authenticated') and not user.is_authenticated(): returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if isinstance(user, User): try: ishtaruser = IshtarUser.objects.get(user_ptr=user) except IshtarUser.DoesNotExist: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned elif isinstance(user, IshtarUser): ishtaruser = user else: if values: return [] return cls.objects.filter(pk__isnull=True) items = [] if hasattr(cls, 'BASKET_MODEL'): items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) query = cls.get_query_owns(ishtaruser) if not query and not replace_query: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if query: q = cls.objects.filter(query) else: # replace_query q = cls.objects.filter(replace_query) if values: q = q.values(*values) if limit: items += list(q.order_by('-pk')[:limit]) else: items += list(q.order_by(*cls._meta.ordering).all()) if get_short_menu_class: if values: if 'id' not in values: raise NotImplementedError( "Call of get_owns with get_short_menu_class option and" " no 'id' in values is not implemented") my_items = [] for i in items: if hasattr(cls, 'BASKET_MODEL') and \ type(i) == cls.BASKET_MODEL: dct = dict([(k, getattr(i, k)) for k in values]) my_items.append( (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) else: my_items.append((i, cls.get_short_menu_class(i['id']))) items = my_items else: items = [(i, cls.get_short_menu_class(i.pk)) for i in items] return items @classmethod def _get_query_owns_dicts(cls, ishtaruser): """ List of query own dict to construct the query. Each dict are join with an AND operator, each dict key, values are joined with OR operator """ return [] @classmethod def _construct_query_own(cls, prefix, dct_list): q = None for subquery_dict in dct_list: subquery = None for k in subquery_dict: subsubquery = Q(**{prefix + k: subquery_dict[k]}) if subquery: subquery |= subsubquery else: subquery = subsubquery if not subquery: continue if q: q &= subquery else: q = subquery return q class CachedGen(object): @classmethod def refresh_cache(cls): raise NotImplementedError() @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) class Cached(CachedGen): slug_field = 'txt_idx' @classmethod def refresh_cache(cls): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if not current_keys: return for keys in current_keys: if len(keys) == 2 and keys[0] == '__slug': cls.get_cache(keys[1], force=True) elif keys[0] == '__get_types': default = None empty_first = True exclude = [] if len(keys) >= 2: default = keys.pop() if len(keys) > 1: empty_first = bool(keys.pop()) exclude = keys[1:] cls.get_types( exclude=exclude, empty_first=empty_first, default=default, force=True) elif keys[0] == '__get_help': cls.get_help(force=True) @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) @classmethod def get_cache(cls, slug, force=False): cache_key, value = get_cache(cls, ['__slug', slug]) if not force and value: return value try: k = {cls.slug_field: slug} obj = cls.objects.get(**k) cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj except cls.DoesNotExist: cache.set(cache_key, None, settings.CACHE_TIMEOUT) return None @disable_for_loaddata def post_save_cache(sender, **kwargs): sender.refresh_cache() class GeneralType(Cached, models.Model): """ Abstract class for "types" """ label = models.TextField(_(u"Label")) txt_idx = models.TextField( _(u"Textual ID"), validators=[validate_slug], unique=True, help_text=_( u"The slug is the standardized version of the name. It contains " u"only lowercase letters, numbers and hyphens. Each slug must " u"be unique.")) comment = models.TextField(_(u"Comment"), blank=True, null=True) available = models.BooleanField(_(u"Available"), default=True) HELP_TEXT = u"" objects = TypeManager() class Meta: abstract = True def __unicode__(self): return self.label def natural_key(self): return (self.txt_idx,) def history_compress(self): return self.txt_idx @classmethod def history_decompress(cls, value, create=False): if not value: return [] res = [] for txt_idx in value: try: res.append(cls.objects.get(txt_idx=txt_idx)) except cls.DoesNotExist: continue return res @property def explicit_label(self): return u"{} ({})".format(self.label, self._meta.verbose_name) @classmethod def create_default_for_test(cls): return [cls.objects.create(label='Test %d' % i) for i in range(5)] @property def short_label(self): return self.label @property def name(self): return self.label @classmethod def get_or_create(cls, slug, label=''): """ Get or create a new item. :param slug: textual id :param label: label for initialization if the item doesn't exist (not mandatory) :return: instancied item of the base class """ item = cls.get_cache(slug) if item: return item item, created = cls.objects.get_or_create( txt_idx=slug, defaults={'label': label}) return item @classmethod def get_or_create_pk(cls, slug): """ Get an id from a slug. Create the associated item if needed. :param slug: textual id :return: id of the item (string) """ return unicode(cls.get_or_create(slug).pk) @classmethod def get_or_create_pks(cls, slugs): """ Get and merge a list of ids from a slug list. Create the associated items if needed. :param slugs: textual ids :return: string with ids separated by "_" """ items = [] for slug in slugs: items.append(str(cls.get_or_create(slug).pk)) return u"_".join(items) @classmethod def get_help(cls, dct={}, exclude=[], force=False): keys = ['__get_help'] keys += [u"{}".format(ex) for ex in exclude] keys += [u'{}-{}'.format(unicode(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return mark_safe(value) help_text = cls.HELP_TEXT c_rank = -1 help_items = u"\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, '__iter__'): pk = item[0] item = cls.objects.get(pk=pk) item.rank = c_rank + 1 if hasattr(item, 'parent'): c_item = item parents = [] while c_item.parent: parents.append(c_item.parent.label) c_item = c_item.parent parents.reverse() parents.append(item.label) item.label = u" / ".join(parents) if not item.comment: continue if c_rank > item.rank: help_items += u"\n" elif c_rank < item.rank: help_items += u"
\n" c_rank = item.rank help_items += u"
%s
%s
" % ( item.label, u"
".join(item.comment.split('\n'))) c_rank += 1 if c_rank: help_items += c_rank * u"
" if help_text or help_items != u'\n': help_text = help_text + help_items else: help_text = u"" cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) return mark_safe(help_text) @classmethod def _get_initial_types(cls, initial, type_pks, instance=False): new_vals = [] if not initial: return [] if type(initial) not in (list, tuple): initial = [initial] for value in initial: try: pk = int(value) except (ValueError, TypeError): continue if pk in type_pks: continue try: extra_type = cls.objects.get(pk=pk) if instance: new_vals.append(extra_type) else: new_vals.append((extra_type.pk, unicode(extra_type))) except cls.DoesNotExist: continue return new_vals @classmethod def get_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None, initial=None, force=False): types = [] if not instances and empty_first and not default: types = [('', '--')] types += cls._pre_get_types(dct, instances, exclude, default, force) if not initial: return types new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) types += new_vals return types @classmethod def _pre_get_types(cls, dct={}, instances=False, exclude=[], default=None, force=False): # cache cache_key = None if not instances: keys = ['__get_types'] keys += [u"{}".format(ex) for ex in exclude] + \ [u"{}".format(default)] keys += [u'{}-{}'.format(unicode(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return value base_dct = dct.copy() if hasattr(cls, 'parent'): if not cache_key: return cls._get_parent_types( base_dct, instances, exclude=exclude, default=default) vals = [v for v in cls._get_parent_types( base_dct, instances, exclude=exclude, default=default)] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals if not cache_key: return cls._get_types(base_dct, instances, exclude=exclude, default=default) vals = [ v for v in cls._get_types(base_dct, instances, exclude=exclude, default=default) ] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals @classmethod def _get_types(cls, dct=None, instances=False, exclude=None, default=None): if not dct: dct = {} if not exclude: exclude = [] dct['available'] = True if default: try: default = cls.objects.get(txt_idx=default) yield (default.pk, _(unicode(default))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and default != "None": if hasattr(default, 'txt_idx'): exclude.append(default.txt_idx) else: exclude.append(default) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(unicode(item)) if item and unicode(item) else '') @classmethod def _get_childs_list(cls, dct=None, exclude=None, instances=False): if not dct: dct = {} if not exclude: exclude = [] if 'parent' in dct: dct.pop('parent') childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): childs = childs.order_by('order') res = {} if instances: for item in childs.all(): parent_id = item.parent_id or 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append(item) else: for item in childs.values("id", "parent_id", "label").all(): parent_id = item["parent_id"] or 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append((item["id"], item["label"])) return res PREFIX = "│ " PREFIX_EMPTY = "  " PREFIX_MEDIUM = "├ " PREFIX_LAST = "└ " PREFIX_CODES = [u"\u2502", u"\u251C", u"\u2514"] @classmethod def _get_childs(cls, item, child_list, prefix=0, instances=False, is_last=False, last_of=None): if not last_of: last_of = [] prefix += 1 current_child_lst = [] if item in child_list: current_child_lst = child_list[item] lst = [] total = len(current_child_lst) for idx, child in enumerate(current_child_lst): mylast_of = last_of[:] if instances: child.rank = prefix lst.append(child) else: p = '' cprefix = prefix while cprefix: cprefix -= 1 if not cprefix: if (idx + 1) == total: p += cls.PREFIX_LAST else: p += cls.PREFIX_MEDIUM elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: p += cls.PREFIX_EMPTY else: p += cls.PREFIX else: p += cls.PREFIX_EMPTY else: p += cls.PREFIX lst.append(( child[0], SafeUnicode(p + unicode(_(child[1]))) )) clast_of = last_of[:] clast_of.append(idx + 1 == total) if instances: child_id = child.id else: child_id = child[0] for sub_child in cls._get_childs( child_id, child_list, prefix, instances, is_last=((idx + 1) == total), last_of=clast_of): lst.append(sub_child) return lst @classmethod def _get_parent_types(cls, dct={}, instances=False, exclude=[], default=None): dct['available'] = True dct['parent'] = None items = cls.objects.filter(**dct) if exclude: items = items.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): items = items.order_by('order') child_list = cls._get_childs_list(dct, exclude, instances) if 0 in child_list: for item in child_list[0]: if instances: item.rank = 0 item_id = item.pk yield item else: item_id = item[0] yield item for child in cls._get_childs( item_id, child_list, instances=instances): yield child def save(self, *args, **kwargs): if not self.id and not self.label: self.label = u" ".join(u" ".join(self.txt_idx.split('-')) .split('_')).title() if not self.txt_idx: self.txt_idx = slugify(self.label)[:100] # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter( object_id=self.pk, key=slugify(old.label), content_type=content_type).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter( object_id=self.pk, key=old.txt_idx, content_type=content_type).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False, importer=None, group=None, user=None): content_type = ContentType.objects.get_for_model(self.__class__) if not importer and not force and ItemKey.objects.filter( key=key, content_type=content_type).count(): return filtr = {'key': key, 'content_type': content_type} if group: filtr['group'] = group elif user: filtr['user'] = user else: filtr['importer'] = importer if force: ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() filtr['object_id'] = self.pk ItemKey.objects.get_or_create(**filtr) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self, importer): keys = [self.txt_idx] content_type = ContentType.objects.get_for_model(self.__class__) base_q = Q(content_type=content_type, object_id=self.pk) subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True) subquery |= Q(user__isnull=True, group__isnull=True, importer=importer) if importer.user: subquery |= Q(user=importer.user, group__isnull=True, importer=importer) if importer.associated_group: subquery |= Q(user__isnull=True, group=importer.associated_group, importer=importer) q = ItemKey.objects.filter(base_q & subquery) for ik in q.exclude(key=self.txt_idx).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): # content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() class HierarchicalType(GeneralType): parent = models.ForeignKey('self', blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_(u"Parent")) class Meta: abstract = True def full_label(self): lbls = [self.label] item = self while item.parent: item = item.parent lbls.append(item.label) return u" > ".join(reversed(lbls)) class ItemKey(models.Model): key = models.TextField(_(u"Key")) content_type = models.ForeignKey(ContentType) object_id = models.PositiveIntegerField() content_object = GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey( Import, null=True, blank=True, help_text=_(u"Specific key to an import")) user = models.ForeignKey('IshtarUser', blank=True, null=True) group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) def __unicode__(self): return self.key def get_image_path(instance, filename): # when using migrations instance is not a real ImageModel instance if not hasattr(instance, '_get_image_path'): return u"upload/{}".format(filename) return instance._get_image_path(filename) class ImageContainerModel(object): def _get_image_path(self, filename): return u"{}/{}".format(self._get_base_image_path(), filename) def _get_base_image_path(self): return u"upload" class ImageModel(models.Model, ImageContainerModel): image = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) thumbnail = models.ImageField( upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE IMAGE_PREFIX = '' class Meta: abstract = True def has_changed(self, field): if not self.pk: return True manager = getattr(self.__class__, 'objects') old = getattr(manager.get(pk=self.pk), field) return not getattr(self, field) == old def create_thumb(self, image, size): """Returns the image resized to fit inside a box of the given size""" image.thumbnail(size, Image.ANTIALIAS) temp = StringIO() image.save(temp, 'jpeg') temp.seek(0) return SimpleUploadedFile('temp', temp.read()) def save(self, *args, **kwargs): if 'force_copy' in kwargs: kwargs.pop('force_copy') super(ImageModel, self).save(*args, **kwargs) return # manage images if not self.has_changed('image'): return super(ImageModel, self).save(*args, **kwargs) if not self.image: self.thumbnail = None return super(ImageModel, self).save(*args, **kwargs) # # generate thumbnail # convert to jpg filename = os.path.splitext(os.path.split(self.image.name)[-1])[0] old_path = self.image.path filename = "%s.jpg" % filename try: image = Image.open(self.image.file) # convert to RGB if image.mode not in ('L', 'RGB'): image = image.convert('RGB') # resize if necessary if self.IMAGE_MAX_SIZE: self.image.save(filename, self.create_thumb(image, self.IMAGE_MAX_SIZE), save=False) if old_path != self.image.path: try: os.remove(old_path) except OSError: # already clean pass # save the thumbnail thumb_filename = self._get_thumb_name(filename) self.thumbnail.save( thumb_filename, self.create_thumb(image, self.THUMB_MAX_SIZE), save=False) except IOError: pass return super(ImageModel, self).save(*args, **kwargs) def _get_thumb_name(self, filename): splited = filename.split('.') return u"{}-thumb.{}".format( u".".join(splited[:-1]), splited[-1] ) class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) PRIVATE_FIELDS = ('id', 'history_modifier', 'order') class BulkUpdatedItem(object): @classmethod def bulk_recursion(cls, transaction_id, extra_args): """ Prevent infinite recursion. Should not happen but wrong manipulation in the database or messy imports can generate circular relations :param transaction_id: current transaction ID (unix time) - if null a transaction ID is generated :param extra_args: arguments dealing with :return: (transaction ID, is a recursion) """ if not transaction_id: transaction_id = unicode(time.time()) args = ['cached_label_bulk_update', transaction_id] + extra_args key, val = get_cache(cls, args) if val: return transaction_id, True cache.set(key, 1, settings.CACHE_SMALLTIMEOUT) return transaction_id, False class RelationItem(models.Model): """ Items with relation between them """ relation_image = models.FileField( _(u"Generated relation image (SVG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) class Meta: abstract = True def generate_relation_image(self): generate_relation_graph(self) class JsonDataSection(models.Model): content_type = models.ForeignKey(ContentType) name = models.CharField(_(u"Name"), max_length=200) order = models.IntegerField(_(u"Order"), default=10) class Meta: verbose_name = _(u"Json data - Menu") verbose_name_plural = _(u"Json data - Menus") ordering = ['order', 'name'] def __unicode__(self): return u"{} - {}".format(self.content_type, self.name) JSON_VALUE_TYPES = ( ('T', _(u"Text")), ('LT', _(u"Long text")), ('I', _(u"Integer")), ('B', _(u"Boolean")), ('F', _(u"Float")), ('D', _(u"Date")), ('C', _(u"Choices")), ) class JsonDataField(models.Model): name = models.CharField(_(u"Name"), max_length=200) content_type = models.ForeignKey(ContentType) key = models.CharField( _(u"Key"), max_length=200, help_text=_(u"Value of the key in the JSON schema. For hierarchical " u"key use \"__\" to explain it. For instance for the key " u"'my_subkey' with data such as {'my_key': {'my_subkey': " u"'value'}}, its value will be reached with my_key__my_subkey.")) display = models.BooleanField(_(u"Display"), default=True) value_type = models.CharField(_(u"Type"), default="T", max_length=10, choices=JSON_VALUE_TYPES) order = models.IntegerField(_(u"Order"), default=10) search_index = models.BooleanField(_(u"Use in search indexes"), default=False) section = models.ForeignKey(JsonDataSection, blank=True, null=True, on_delete=models.SET_NULL) custom_forms = models.ManyToManyField( "CustomForm", blank=True, through="CustomFormJsonField") class Meta: verbose_name = _(u"Json data - Field") verbose_name_plural = _(u"Json data - Fields") ordering = ['order', 'name'] def __unicode__(self): return u"{} - {}".format(self.content_type, self.name) def clean(self): if not self.section: return if self.section.content_type != self.content_type: raise ValidationError( _(u"Content types of the field and of the menu do not match")) class JsonData(models.Model, CachedGen): data = JSONField(default={}, db_index=True, blank=True) class Meta: abstract = True def pre_save(self): if not self.data: self.data = {} @property def json_sections(self): sections = [] try: content_type = ContentType.objects.get_for_model(self) except ContentType.DoesNotExists: return sections fields = list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=True ).all()) # no section fields fields += list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=False ).order_by('section__order', 'order').all()) for field in fields: value = None data = self.data.copy() for key in field.key.split('__'): if key in data: value = copy.copy(data[key]) data = data[key] else: value = None break if value is None: continue if type(value) in (list, tuple): value = u" ; ".join([unicode(v) for v in value]) section_name = field.section.name if field.section else None if not sections or section_name != sections[-1][0]: # if section name is identical it is the same sections.append((section_name, [])) sections[-1][1].append((field.name, value)) return sections @classmethod def refresh_cache(cls): __, refreshed = get_cache(cls, ['cache_refreshed']) if refreshed and time.time() - refreshed < 1: return cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if not current_keys: return for keys in current_keys: if keys[0] == '__get_dynamic_choices': cls._get_dynamic_choices(keys[1], force=True) @classmethod def _get_dynamic_choices(cls, key, force=False): """ Get choice from existing values :param key: data key :param force: if set to True do not use cache :return: tuple of choices (id, value) """ cache_key, value = get_cache(cls, ['__get_dynamic_choices', key]) if not force and value: return value choices = set() splitted_key = key[len('data__'):].split('__') q = cls.objects.filter( data__has_key=key[len('data__'):]).values_list('data', flat=True) for value in q.all(): for k in splitted_key: value = value[k] choices.add(value) choices = [('', '')] + [(v, v) for v in sorted(list(choices))] cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) return choices class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s") class Meta: abstract = True class SearchAltName(object): def __init__(self, search_key, search_query, extra_query=None, distinct_query=False): self.search_key = search_key self.search_query = search_query self.extra_query = extra_query or {} self.distinct_query = distinct_query class DynamicRequest(object): def __init__(self, label, app_name, model_name, form_key, search_key, type_query, search_query): self.label = label self.form_key = form_key self.search_key = search_key self.app_name = app_name self.model_name = model_name self.type_query = type_query self.search_query = search_query def get_all_types(self): model = apps.get_app_config(self.app_name).get_model(self.model_name) return model.objects.filter(available=True) def get_form_fields(self): fields = {} for item in self.get_all_types().all(): fields[self.form_key + "-" + item.txt_idx] = forms.CharField( label=unicode(self.label) + u" " + unicode(item), required=False ) return fields def get_extra_query(self, slug): return { self.type_query: slug } def get_alt_names(self): alt_names = {} for item in self.get_all_types().all(): alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( self.search_key + "-" + item.txt_idx, self.search_query, self.get_extra_query(item.txt_idx), distinct_query=True ) return alt_names class SearchVectorConfig(object): def __init__(self, key, language=None, func=None): self.key = key if language: self.language = language if language == "local": self.language = settings.ISHTAR_SEARCH_LANGUAGE else: self.language = "simple" self.func = func def format(self, value): if not self.func: return [value] return self.func(value) class FullSearch(models.Model): search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")) EXTRA_REQUEST_KEYS = {} DYNAMIC_REQUESTS = {} ALT_NAMES = {} BASE_SEARCH_VECTORS = [] PROPERTY_SEARCH_VECTORS = [] INT_SEARCH_VECTORS = [] M2M_SEARCH_VECTORS = [] PARENT_SEARCH_VECTORS = [] # prevent circular dependency PARENT_ONLY_SEARCH_VECTORS = [] class Meta: abstract = True @classmethod def general_types(cls): for k in get_all_field_names(cls): field = cls._meta.get_field(k) if not hasattr(field, 'rel') or not field.rel: continue rel_model = field.rel.to if issubclass(rel_model, (GeneralType, HierarchicalType)): yield k @classmethod def get_alt_names(cls): alt_names = cls.ALT_NAMES.copy() for dr_k in cls.DYNAMIC_REQUESTS: alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) return alt_names @classmethod def get_query_parameters(cls): query_parameters = {} for v in cls.get_alt_names().values(): for language_code, language_lbl in settings.LANGUAGES: activate(language_code) query_parameters[unicode(v.search_key)] = v deactivate() return query_parameters def _update_search_field(self, search_vector_conf, search_vectors, data): for value in search_vector_conf.format(data): with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s, %s)", [ search_vector_conf.language, value]) row = cursor.fetchone() search_vectors.append(row[0]) def _update_search_number_field(self, search_vectors, val): search_vectors.append("'{}':1".format(val)) def update_search_vector(self, save=True, exclude_parent=False): """ Update the search vector :param save: True if you want to save the object immediately :return: True if modified """ if not hasattr(self, 'search_vector'): return if not self.pk: # logger.warning("Cannot update search vector before save or " # "after deletion.") return if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ and not self.INT_SEARCH_VECTORS \ and not self.PROPERTY_SEARCH_VECTORS \ and not self.PARENT_SEARCH_VECTORS: logger.warning("No search_vectors defined for {}".format( self.__class__)) return if getattr(self, '_search_updated', None): return self._search_updated = True old_search = "" if self.search_vector: old_search = self.search_vector[:] search_vectors = [] base_q = self.__class__.objects.filter(pk=self.pk) # many to many have to be queried one by one otherwise only one is fetch for m2m_search_vector in self.M2M_SEARCH_VECTORS: key = m2m_search_vector.key.split('__')[0] rel_key = getattr(self, key) for item in rel_key.values('pk').all(): query_dct = {key + "__pk": item['pk']} q = copy.copy(base_q).filter(**query_dct) q = q.annotate( search=SearchVector( m2m_search_vector.key, config=m2m_search_vector.language) ).values('search') search_vectors.append(q.all()[0]['search']) # int/float are not well managed by the SearchVector for int_search_vector in self.INT_SEARCH_VECTORS: q = base_q.values(int_search_vector.key) for val in int_search_vector.format( q.all()[0][int_search_vector.key]): self._update_search_number_field(search_vectors, val) if not exclude_parent: # copy parent vector fields for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: parent = getattr(self, PARENT_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append(p.search_vector) elif parent: search_vectors.append(parent.search_vector) for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append( p.update_search_vector(save=False, exclude_parent=True) ) elif parent: search_vectors.append( parent.update_search_vector(save=False, exclude_parent=True) ) if self.BASE_SEARCH_VECTORS: # query "simple" fields q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) res = q.all()[0] for base_search_vector in self.BASE_SEARCH_VECTORS: data = res[base_search_vector.key] data = unidecode(unicode(data)) self._update_search_field(base_search_vector, search_vectors, data) if self.PROPERTY_SEARCH_VECTORS: for property_search_vector in self.PROPERTY_SEARCH_VECTORS: data = getattr(self, property_search_vector.key) if callable(data): data = data() if not data: continue data = unicode(data) self._update_search_field(property_search_vector, search_vectors, data) if hasattr(self, 'data') and self.data: content_type = ContentType.objects.get_for_model(self) for json_field in JsonDataField.objects.filter( content_type=content_type, search_index=True).all(): data = copy.deepcopy(self.data) no_data = False for key in json_field.key.split('__'): if key not in data: no_data = True break data = data[key] if no_data or not data: continue if json_field.value_type == 'B': if data is True: data = json_field.name else: continue elif json_field.value_type in ('I', 'F'): self._update_search_number_field(search_vectors, data) continue elif json_field.value_type == 'D': # only index year self._update_search_number_field(search_vectors, data.year) continue for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s, %s)", [lang, data]) row = cursor.fetchone() search_vectors.append(row[0]) new_search_vector = merge_tsvectors(search_vectors) changed = old_search != new_search_vector if save and changed: self.search_vector = new_search_vector self.skip_history_when_saving = True self.save() elif not save: return new_search_vector return changed class FixAssociated(object): ASSOCIATED = {} def fix_associated(self): for key in self.ASSOCIATED: item = getattr(self, key) if not item: continue dct = self.ASSOCIATED[key] for dct_key in dct: subkey, ctype = dct_key expected_values = dct[dct_key] if not isinstance(expected_values, (list, tuple)): expected_values = [expected_values] if hasattr(ctype, "txt_idx"): try: expected_values = [ctype.objects.get(txt_idx=v) for v in expected_values] except ctype.DoesNotExist: # type not yet initialized return current_vals = getattr(item, subkey) is_many = False if hasattr(current_vals, "all"): is_many = True current_vals = current_vals.all() else: current_vals = [current_vals] is_ok = False for current_val in current_vals: if current_val in expected_values: is_ok = True break if is_ok: continue # the first value is used new_value = expected_values[0] if is_many: getattr(item, subkey).add(new_value) else: setattr(item, subkey, new_value) class QRCodeItem(models.Model, ImageContainerModel): HAS_QR_CODE = True qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255) class Meta: abstract = True def generate_qrcode(self, request=None, secure=True, tmpdir=None): url = self.get_absolute_url() site = Site.objects.get_current() if request: scheme = request.scheme else: if secure: scheme = "https" else: scheme = "http" url = scheme + "://" + site.domain + url qr = pyqrcode.create(url, version=settings.ISHTAR_QRCODE_VERSION) tmpdir_created = False if not tmpdir: tmpdir = tempfile.mkdtemp("-qrcode") tmpdir_created = True filename = tmpdir + os.sep + 'qrcode.png' qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) self.qrcode.save( "qrcode.png", File(open(filename, 'rb'))) self.skip_history_when_saving = True self._no_move = True self.save() if tmpdir_created: shutil.rmtree(tmpdir) class DocumentItem(object): @property def images(self): if not hasattr(self, 'documents'): return Document.objects.none() return self.documents.filter( image__isnull=False).exclude(image="").order_by("pk") @property def images_without_main_image(self): if not hasattr(self, 'main_image') or not hasattr(self, 'documents'): return self.images if not self.main_image: return self.documents.filter( image__isnull=False).exclude( image="").order_by("pk") return self.documents.filter( image__isnull=False).exclude( image="").exclude(pk=self.main_image.pk).order_by("pk") def get_extra_actions(self, request): """ For sheet template: return "Add document / image" action """ # url, base_text, icon, extra_text, extra css class, is a quick action actions = [] if not hasattr(self, 'SLUG'): return actions can_add_doc = self.can_do(request, 'add_document') if can_add_doc: actions = [ ( reverse("create-document") + "?{}={}".format( self.SLUG, self.pk), _(u"Add document/image"), "fa fa-plus", _(u"doc./image"), "", False ) ] return actions class SpatialReferenceSystem(GeneralType): order = models.IntegerField(_(u"Order"), default=10) auth_name = models.CharField( _(u"Authority name"), default=u'EPSG', max_length=256) srid = models.IntegerField(_(u"Authority SRID")) class Meta: verbose_name = _(u"Spatial reference system") verbose_name_plural = _(u"Spatial reference systems") ordering = ('label',) post_save.connect(post_save_cache, sender=SpatialReferenceSystem) post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) class GeoItem(models.Model): GEO_SOURCE = ( ('T', _(u"Town")), ('P', _(u"Precise")), ('M', _("Polygon")) ) # gis x = models.FloatField(_(u'X'), blank=True, null=True) y = models.FloatField(_(u'Y'), blank=True, null=True) z = models.FloatField(_(u'Z'), blank=True, null=True) estimated_error_x = models.FloatField(_(u'Estimated error for X'), blank=True, null=True) estimated_error_y = models.FloatField(_(u'Estimated error for Y'), blank=True, null=True) estimated_error_z = models.FloatField(_(u'Estimated error for Z'), blank=True, null=True) spatial_reference_system = models.ForeignKey( SpatialReferenceSystem, verbose_name=_(u"Spatial Reference System"), blank=True, null=True) point = models.PointField(_(u"Point"), blank=True, null=True, dim=3) point_2d = models.PointField(_(u"Point (2D)"), blank=True, null=True) point_source = models.CharField( _(u"Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True) point_source_item = models.CharField( _(u"Point source item"), max_length=100, blank=True, null=True) multi_polygon = models.MultiPolygonField(_(u"Multi polygon"), blank=True, null=True) multi_polygon_source = models.CharField( _(u"Multi-polygon source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True) multi_polygon_source_item = models.CharField( _(u"Multi polygon source item"), max_length=100, blank=True, null=True) GEO_LABEL = "" class Meta: abstract = True def get_town_centroid(self): raise NotImplementedError def get_town_polygons(self): raise NotImplementedError def get_precise_points(self): if self.point_source == 'P' and self.point_2d: return self.point_2d, self.point, self.point_source_item def get_precise_polygons(self): if self.multi_polygon_source == 'P' and self.multi_polygon: return self.multi_polygon, self.multi_polygon_source_item def most_precise_geo(self): if self.point_source == 'M': return 'multi_polygon' current_source = unicode(self.__class__._meta.verbose_name) if self.multi_polygon_source_item == current_source \ and (self.multi_polygon_source == "P" or self.point_source_item != current_source): return 'multi_polygon' if self.point_source_item == current_source\ and self.point_source == 'P': return 'point' if self.multi_polygon_source == 'P': return 'multi_polygon' if self.point_source == 'P': return 'point' if self.multi_polygon: return 'multi_polygon' if self.point_2d: return 'point' def geo_point_source(self): if not self.point_source: return "" src = u"{} - {}".format( dict(self.GEO_SOURCE)[self.point_source], self.point_source_item ) return src def geo_polygon_source(self): if not self.multi_polygon_source: return "" src = u"{} - {}".format( dict(self.GEO_SOURCE)[self.multi_polygon_source], self.multi_polygon_source_item ) return src def _geojson_serialize(self, geom_attr): if not hasattr(self, geom_attr): return "" cached_label_key = 'cached_label' if self.GEO_LABEL: cached_label_key = self.GEO_LABEL if getattr(self, "CACHED_LABELS", None): cached_label_key = self.CACHED_LABELS[-1] geojson = serialize( 'geojson', self.__class__.objects.filter(pk=self.pk), geometry_field=geom_attr, fields=(cached_label_key,)) geojson_dct = json.loads(geojson) features = geojson_dct.pop('features') for idx in range(len(features)): feature = features[idx] lbl = feature['properties'].pop(cached_label_key) feature['properties']['name'] = lbl feature['properties']['id'] = self.pk geojson_dct['features'] = features geojson_dct['link_template'] = simple_link_to_window(self).replace( '999999', '' ) geojson = json.dumps(geojson_dct) return geojson @property def point_2d_geojson(self): return self._geojson_serialize('point_2d') @property def multi_polygon_geojson(self): return self._geojson_serialize('multi_polygon') class BaseHistorizedItem(DocumentItem, FullSearch, Imported, JsonData, FixAssociated): """ Historized item with external ID management. All historized items are searchable and have a data json field. """ IS_BASKET = False SHOW_URL = None EXTERNAL_ID_KEY = '' EXTERNAL_ID_DEPENDENCIES = [] HISTORICAL_M2M = [] history_modifier = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_(u"Last editor"), blank=True, null=True) history_creator = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_(u"Creator"), blank=True, null=True) last_modified = models.DateTimeField(auto_now=True) history_m2m = JSONField(default={}, blank=True) class Meta: abstract = True @classmethod def get_verbose_name(cls): return cls._meta.verbose_name def merge(self, item, keep_old=False): merge_model_objects(self, item, keep_old=keep_old) def update_external_id(self, save=False): if not self.EXTERNAL_ID_KEY or ( self.external_id and not getattr(self, 'auto_external_id', False)): return external_id = get_external_id(self.EXTERNAL_ID_KEY, self) if external_id == self.external_id: return self.auto_external_id = True self.external_id = external_id self._cached_label_checked = False if save: self.skip_history_when_saving = True self.save() return external_id def get_last_history_date(self): q = self.history.values("history_date").order_by('-history_date') if not q.count(): return return q.all()[0]['history_date'] def get_previous(self, step=None, date=None, strict=True): """ Get a "step" previous state of the item """ assert step or date historized = self.history.all() item = None if step: if len(historized) <= step: # silently return the last step if too far in the history item = historized[len(historized) - 1] else: item = historized[step] else: for step, item in enumerate(historized): if item.history_date == date: break # ended with no match if item.history_date != date: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in get_all_field_names(model): field = model._meta.get_field(k) if hasattr(field, 'rel') and field.rel: if not hasattr(item, k + '_id'): setattr(item, k, getattr(self, k)) continue val = getattr(item, k + '_id') if not val: setattr(item, k, None) continue try: val = field.rel.to.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError(u"The class %s has no pk %d" % ( unicode(field.rel.to), val)) setattr(item, k, None) item.pk = self.pk return item @property def last_edition_date(self): try: return self.history.order_by('-history_date').all()[0].history_date except (AttributeError, IndexError): return @property def history_creation_date(self): try: return self.history.order_by('history_date').all()[0].history_date except (AttributeError, IndexError): return def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): if item.history_date == date: new_item = item break to_del.append(item) if not new_item: raise HistoryError(u"The date to rollback to doesn't exist.") try: field_keys = [f.name for f in self._meta.fields] for k in field_keys: if k != 'id' and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) try: self.history_modifier = User.objects.get( pk=new_item.history_modifier_id) except User.ObjectDoesNotExist: pass self.save() saved_m2m = new_item.history_m2m.copy() for hist_key in self.HISTORICAL_M2M: # after each association m2m is rewrite - force the original # to be reset new_item.history_m2m = saved_m2m values = new_item.m2m_listing(hist_key, create=True) or [] hist_field = getattr(self, hist_key) hist_field.clear() for val in values: hist_field.add(val) # force label regeneration self._cached_label_checked = False self.save() except ObjectDoesNotExist: raise HistoryError(u"The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def m2m_listing(self, key): return getattr(self, key).all() def values(self): values = {} for f in self._meta.fields: k = f.name if k != 'id': values[k] = getattr(self, k) return values def get_absolute_url(self): try: return reverse('display-item', args=[self.SLUG, self.pk]) except NoReverseMatch: return def get_show_url(self): show_url = self.SHOW_URL if not show_url: show_url = 'show-' + self.__class__.__name__.lower() try: return reverse(show_url, args=[self.pk, '']) except NoReverseMatch: return @property def associated_filename(self): if [True for attr in ('get_town_label', 'get_department', 'reference', 'short_class_name') if not hasattr(self, attr)]: return '' items = [slugify(self.get_department()), slugify(self.get_town_label()).upper(), slugify(self.short_class_name), slugify(self.reference), slugify(self.name or '').replace('-', '_').capitalize()] last_edition_date = self.last_edition_date if last_edition_date: items.append(last_edition_date.strftime('%Y%m%d')) else: items.append('00000000') return u"-".join([unicode(item) for item in items]) def save(self, *args, **kwargs): created = not self.pk if not getattr(self, 'skip_history_when_saving', False): assert hasattr(self, 'history_modifier') if created: self.history_creator = self.history_modifier # external ID can have related item not available before save external_id_updated = kwargs.pop('external_id_updated') \ if 'external_id_updated' in kwargs else False if not created and not external_id_updated: self.update_external_id() super(BaseHistorizedItem, self).save(*args, **kwargs) if created and self.update_external_id(): # force resave for external ID creation self.skip_history_when_saving = True self._updated_id = True return self.save(external_id_updated=True) for dep in self.EXTERNAL_ID_DEPENDENCIES: for obj in getattr(self, dep).all(): obj.update_external_id(save=True) self.fix_associated() return True LOGICAL_TYPES = ( ('above', _(u"Above")), ('bellow', _(u"Bellow")), ('equal', _(u"Equal")) ) class GeneralRelationType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) symmetrical = models.BooleanField(_(u"Symmetrical")) tiny_label = models.CharField(_(u"Tiny label"), max_length=50, blank=True, null=True) inverse_relation = models.ForeignKey( 'self', verbose_name=_(u"Inverse relation"), blank=True, null=True) logical_relation = models.CharField( verbose_name=_(u"Logical relation"), max_length=10, choices=LOGICAL_TYPES, blank=True, null=True) class Meta: abstract = True def clean(self): # cannot have symmetrical and an inverse_relation if self.symmetrical and self.inverse_relation: raise ValidationError( _(u"Cannot have symmetrical and an inverse_relation")) def get_tiny_label(self): return self.tiny_label or self.label or u"" def save(self, *args, **kwargs): obj = super(GeneralRelationType, self).save(*args, **kwargs) # after saving check that the inverse_relation of the inverse_relation # point to the saved object if self.inverse_relation \ and (not self.inverse_relation.inverse_relation or self.inverse_relation.inverse_relation != self): self.inverse_relation.inverse_relation = self self.inverse_relation.symmetrical = False self.inverse_relation.save() return obj class GeneralRecordRelations(object): @classmethod def general_types(cls): return ['relation_type'] def save(self, *args, **kwargs): super(GeneralRecordRelations, self).save(*args, **kwargs) # after saving create the symetric or the inverse relation sym_rel_type = self.relation_type if not self.relation_type.symmetrical: sym_rel_type = self.relation_type.inverse_relation # no symetric/inverse is defined if not sym_rel_type: return dct = {'right_record': self.left_record, 'left_record': self.right_record, 'relation_type': sym_rel_type} self.__class__.objects.get_or_create(**dct) return self def post_delete_record_relation(sender, instance, **kwargs): # delete symmetrical or inverse relation sym_rel_type = instance.relation_type if not instance.relation_type.symmetrical: sym_rel_type = instance.relation_type.inverse_relation # no symetric/inverse is defined if not sym_rel_type: return dct = {'right_record_id': instance.left_record_id, 'left_record_id': instance.right_record_id, 'relation_type': sym_rel_type} q = instance.__class__.objects.filter(**dct) if q.count(): q.delete() class SearchQuery(models.Model): label = models.TextField(_(u"Label"), blank=True) query = models.TextField(_(u"Query"), blank=True) content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content type")) profile = models.ForeignKey("UserProfile", verbose_name=_(u"Profile")) is_alert = models.BooleanField(_(u"Is an alert"), default=False) class Meta: verbose_name = _(u"Search query") verbose_name_plural = _(u"Search queries") ordering = ['label'] def __unicode__(self): return unicode(self.label) class ShortMenuItem(object): """ Item available in the short menu """ UP_MODEL_QUERY = {} @classmethod def get_short_menu_class(cls, pk): return '' @property def short_class_name(self): return "" class QuickAction(object): """ Quick action available from tables """ def __init__(self, url, icon_class='', text='', target=None, rights=None, module=None): self.url = url self.icon_class = icon_class self.text = text self.rights = rights self.target = target self.module = module assert self.target in ('one', 'many', None) def is_available(self, user, session=None, obj=None): if self.module and not getattr(get_current_profile(), self.module): return False if not self.rights: # no restriction return True if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser: return False user = user.ishtaruser for right in self.rights: if user.has_perm(right, session=session, obj=obj): return True return False @property def rendered_icon(self): if not self.icon_class: return "" return u"".format(self.icon_class) @property def base_url(self): if self.target is None: url = reverse(self.url) else: # put arbitrary pk for the target url = reverse(self.url, args=[0]) url = url[:-2] # all quick action url have to finish with the # pk of the selected item and a "/" return url class MainItem(ShortMenuItem): """ Item with quick actions available from tables """ QUICK_ACTIONS = [] @classmethod def get_quick_actions(cls, user, session=None, obj=None): """ Get a list of (url, title, icon, target) actions for an user """ qas = [] for action in cls.QUICK_ACTIONS: if not action.is_available(user, session=session, obj=obj): continue qas.append([action.base_url, mark_safe(action.text), mark_safe(action.rendered_icon), action.target or ""]) return qas class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=datetime.datetime.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return self PARSE_FORMULA = re.compile("{([^}]*)}") def _deduplicate(value): new_values = [] for v in value.split(u'-'): if v not in new_values: new_values.append(v) return u'-'.join(new_values) FORMULA_FILTERS = { 'upper': lambda x: x.upper(), 'lower': lambda x: x.lower(), 'capitalize': lambda x: x.capitalize(), 'slug': lambda x: slugify(x), 'deduplicate': _deduplicate } def get_external_id(key, item): profile = get_current_profile() if not hasattr(profile, key): return formula = getattr(profile, key) dct = {} for fkey in PARSE_FORMULA.findall(formula): filtered = fkey.split(u'|') initial_key = fkey[:] fkey = filtered[0] filters = [] for filtr in filtered[1:]: if filtr in FORMULA_FILTERS: filters.append(FORMULA_FILTERS[filtr]) if fkey.startswith('settings__'): dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' continue obj = item for k in fkey.split('__'): try: obj = getattr(obj, k) except ObjectDoesNotExist: obj = None if callable(obj): obj = obj() if obj is None: break if obj is None: dct[initial_key] = '' else: dct[initial_key] = unicode(obj) for filtr in filters: dct[initial_key] = filtr(dct[initial_key]) values = formula.format(**dct).split('||') value = values[0] for filtr in values[1:]: if filtr not in FORMULA_FILTERS: value += u'||' + filtr continue value = FORMULA_FILTERS[filtr](value) return value CURRENCY = ((u"€", _(u"Euro")), (u"$", _(u"US dollar"))) FIND_INDEX_SOURCE = ((u"O", _(u"Operations")), (u"CR", _(u"Context records"))) SITE_LABELS = [('site', _(u"Site")), ('entity', _(u"Archaeological entity"))] TRANSLATED_SITE_LABELS = { 'site': { 'search': _(u"Site search"), 'new': _(u"New site"), 'modification': _(u"Site modification"), 'deletion': _(u"Site deletion"), "attached-to-operation": _(u"Site (attached to the " u"operation)"), "name-attached-to-operation": _(u"Site name (attached " u"to the operation)"), "attached-to-cr": _(u"Site (attached to the context " u"record)"), "name-attached-to-cr": _(u"Site name (attached to the context record)"), }, 'entity': { 'search': _(u"Archaeological entity search"), 'new': _(u"New archaeological entity"), 'modification': _(u"Archaeological entity modification"), 'deletion': _(u"Archaeological entity deletion"), "attached-to-operation": _(u"Archaeological entity (attached to the " u"operation)"), "name-attached-to-operation": _(u"Archaeological entity name (attached " u"to the operation)"), "attached-to-cr": _(u"Archaeological entity (attached to the context " u"record)"), "name-attached-to-cr": _(u"Archaeological entity name (attached to the context record)"), }, } class IshtarSiteProfile(models.Model, Cached): slug_field = 'slug' label = models.TextField(_(u"Name")) slug = models.SlugField(_(u"Slug"), unique=True) active = models.BooleanField(_(u"Current active"), default=False) experimental_feature = models.BooleanField( _(u"Activate experimental feature"), default=False) description = models.TextField(_(u"Description"), null=True, blank=True) config = models.CharField( _(u"Alternate configuration"), max_length=200, choices=ALTERNATE_CONFIGS_CHOICES, help_text=_(u"Choose an alternate configuration for label, " u"index management"), null=True, blank=True ) files = models.BooleanField(_(u"Files module"), default=False) archaeological_site = models.BooleanField( _(u"Archaeological site module"), default=False) archaeological_site_label = models.CharField( _(u"Archaeological site type"), max_length=200, choices=SITE_LABELS, default='site' ) context_record = models.BooleanField(_(u"Context records module"), default=False) find = models.BooleanField(_(u"Finds module"), default=False, help_text=_(u"Need context records module")) find_index = models.CharField( _(u"Find index is based on"), default='O', max_length=2, choices=FIND_INDEX_SOURCE, help_text=_(u"To prevent irrelevant indexes, change this parameter " u"only if there is no find in the database")) warehouse = models.BooleanField( _(u"Warehouses module"), default=False, help_text=_(u"Need finds module")) preservation = models.BooleanField(_(u"Preservation module"), default=False) mapping = models.BooleanField(_(u"Mapping module"), default=False) locate_warehouses = models.BooleanField( _(u"Locate warehouse and containers"), default=False, help_text=_( u"Mapping module must be activated. With many containers and " u"background task not activated, activating this option may " u"consume many resources.") ) use_town_for_geo = models.BooleanField( _(u"Use town to locate when coordinates are missing"), default=True) underwater = models.BooleanField(_(u"Underwater module"), default=False) parcel_mandatory = models.BooleanField( _(u"Parcel are mandatory for context records"), default=True) homepage = models.TextField( _(u"Home page"), null=True, blank=True, help_text=_(u"Homepage of Ishtar - if not defined a default homepage " u"will appear. Use the markdown syntax. {random_image} " u"can be used to display a random image.")) operation_prefix = models.CharField( _(u"Main operation code prefix"), default=u'OA', null=True, blank=True, max_length=20 ) default_operation_prefix = models.CharField( _(u"Default operation code prefix"), default=u'OP', null=True, blank=True, max_length=20 ) operation_region_code = models.CharField( _(u"Operation region code"), null=True, blank=True, max_length=5 ) file_external_id = models.TextField( _(u"File external id"), default=u"{year}-{numeric_reference}", help_text=_(u"Formula to manage file external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) parcel_external_id = models.TextField( _(u"Parcel external id"), default=u"{associated_file__external_id}{operation__code_patriarche}-" u"{town__numero_insee}-{section}{parcel_number}", help_text=_(u"Formula to manage parcel external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) context_record_external_id = models.TextField( _(u"Context record external id"), default=u"{parcel__external_id}-{label}", help_text=_(u"Formula to manage context record external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) base_find_external_id = models.TextField( _(u"Base find external id"), default=u"{context_record__external_id}-{label}", help_text=_(u"Formula to manage base find external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) find_external_id = models.TextField( _(u"Find external id"), default=u"{get_first_base_find__context_record__external_id}-{label}", help_text=_(u"Formula to manage find external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) container_external_id = models.TextField( _(u"Container external id"), default=u"{responsible__external_id}-{index}", help_text=_(u"Formula to manage container external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) warehouse_external_id = models.TextField( _(u"Warehouse external id"), default=u"{name|slug}", help_text=_(u"Formula to manage warehouse external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) document_external_id = models.TextField( _(u"Document external id"), default=u"{index}", help_text=_(u"Formula to manage document external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) person_raw_name = models.TextField( _(u"Raw name for person"), default=u"{name|upper} {surname}", help_text=_(u"Formula to manage person raw_name. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) find_use_index = models.BooleanField(_(u"Use auto index for finds"), default=True) currency = models.CharField(_(u"Currency"), default=u"€", choices=CURRENCY, max_length=5) default_center = models.PointField( _(u"Maps - default center"), default='SRID=4326;POINT(2.4397 46.5528)') default_zoom = models.IntegerField( _(u"Maps - default zoom"), default=6) class Meta: verbose_name = _(u"Ishtar site profile") verbose_name_plural = _(u"Ishtar site profiles") ordering = ['label'] def __unicode__(self): return unicode(self.label) def has_overload(self, key): return self.config and self.config in ALTERNATE_CONFIGS and \ hasattr(ALTERNATE_CONFIGS[self.config], key) @classmethod def get_current_profile(cls, force=False): cache_key, value = get_cache(cls, ['is-current-profile']) if value and not force: return value q = cls.objects.filter(active=True) if not q.count(): obj = cls.objects.create( label="Default profile", slug='default', active=True) else: obj = q.all()[0] cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj @classmethod def get_default_site_label(cls, key=None): return cls.get_current_profile().get_site_label(key) def get_site_label(self, key=None): if not key: return unicode(dict(SITE_LABELS)[self.archaeological_site_label]) return unicode( TRANSLATED_SITE_LABELS[self.archaeological_site_label][key] ) def save(self, *args, **kwargs): raw = False if 'raw' in kwargs: raw = kwargs.pop('raw') super(IshtarSiteProfile, self).save(*args, **kwargs) obj = self if raw: return obj q = self.__class__.objects.filter(active=True).exclude(slug=self.slug) if obj.active and q.count(): for profile in q.all(): profile.active = False profile.save(raw=True) changed = False if not obj.active and not q.count(): obj.active = True changed = True if obj.warehouse and not obj.find: obj.find = True changed = True if obj.find and not obj.context_record: obj.context_record = True changed = True if changed: obj = obj.save(raw=True) return obj def get_current_profile(force=False): return IshtarSiteProfile.get_current_profile(force=force) def cached_site_changed(sender, **kwargs): get_current_profile(force=True) from ishtar_common.menus import Menu MAIN_MENU = Menu(None) MAIN_MENU.init() MAIN_MENU.reinit_menu_for_all_user() post_save.connect(cached_site_changed, sender=IshtarSiteProfile) post_delete.connect(cached_site_changed, sender=IshtarSiteProfile) class CustomForm(models.Model): name = models.CharField(_(u"Name"), max_length=250) form = models.CharField(_(u"Form"), max_length=250) available = models.BooleanField(_(u"Available"), default=True) enabled = models.BooleanField( _(u"Enable this form"), default=True, help_text=_(u"Disable with caution: disabling a form with mandatory " u"fields may lead to database errors.")) apply_to_all = models.BooleanField( _(u"Apply to all"), default=False, help_text=_(u"Apply this form to all users. If set to True, selecting " u"user and user type is useless.")) users = models.ManyToManyField('IshtarUser', blank=True) user_types = models.ManyToManyField('PersonType', blank=True) class Meta: verbose_name = _(u"Custom form") verbose_name_plural = _(u"Custom forms") ordering = ['name', 'form'] def __unicode__(self): return u"{} - {}".format(self.name, self.form) def users_lbl(self): users = [unicode(user) for user in self.users.all()] return " ; ".join(users) users_lbl.short_description = _(u"Users") def user_types_lbl(self): user_types = [unicode(u) for u in self.user_types.all()] return " ; ".join(user_types) user_types_lbl.short_description = _(u"User types") @classmethod def register(cls): if hasattr(cls, '_register') and hasattr(cls, '_register_fields'): return cls._register, cls._register_fields cache_key, value = get_cache(cls.__class__, ['dct-forms'], app_label='ishtar_common') cache_key_fields, value_fields = get_cache( cls.__class__, ['dct-fields'], app_label='ishtar_common') if value and value_fields: cls._register = value cls._register_fields = value_fields return cls._register, cls._register_fields cls._register, cls._register_fields = {}, {} # ideally should be improved but only used in admin from ishtar_common.admin import ISHTAR_FORMS from ishtar_common.forms import CustomForm as CustomFormForm for app_form in ISHTAR_FORMS: app_name = app_form.__package__ if app_name == "archaeological_files_pdl": app_name = "archaeological_files" for form in dir(app_form): if 'Form' not in form and 'Select' not in form: # not very clean... but do not treat inappropriate items continue form = getattr(app_form, form) if not inspect.isclass(form) \ or not issubclass(form, CustomFormForm) \ or not getattr(form, 'form_slug', None): continue model_name = form.form_slug.split('-')[0].replace('_', '') if app_name not in cls._register_fields: cls._register_fields[app_name] = [] if model_name not in cls._register_fields[app_name]: cls._register_fields[app_name].append(model_name) cls._register[form.form_slug] = form return cls._register, cls._register_fields def get_form_class(self): register, register_fields = self.register() if self.form not in self._register: return return register[self.form] def get_available_json_fields(self): register, register_fields = self.register() if self.form not in self._register: return [] current_form = register[self.form] app_name = current_form.__module__.split('.')[0] if app_name == "archaeological_files_pdl": app_name = "archaeological_files" if app_name not in register_fields: return [] res = [] for model_name in register_fields[app_name]: q = ContentType.objects.filter(app_label=app_name, model=model_name) if not q.count(): continue ct = q.all()[0] for json_field in JsonDataField.objects.filter( content_type=ct).all(): res.append((json_field.pk, u"{} ({})".format( json_field.name, dict(JSON_VALUE_TYPES)[json_field.value_type]))) return res class ExcludedField(models.Model): custom_form = models.ForeignKey(CustomForm, related_name='excluded_fields') field = models.CharField(_(u"Field"), max_length=250) class Meta: verbose_name = _(u"Excluded field") verbose_name_plural = _(u"Excluded fields") class CustomFormJsonField(models.Model): custom_form = models.ForeignKey(CustomForm, related_name='json_fields') json_field = models.ForeignKey(JsonDataField, related_name='custom_form_details') label = models.CharField(_(u"Label"), max_length=200, blank=True, default='') order = models.IntegerField(verbose_name=_(u"Order"), default=1) help_text = models.TextField(_(u"Help"), blank=True, null=True) class Meta: verbose_name = _(u"Custom form - Json data field") verbose_name_plural = _(u"Custom form - Json data fields") class GlobalVar(models.Model, Cached): slug = models.SlugField(_(u"Variable name"), unique=True) description = models.TextField(_(u"Description of the variable"), null=True, blank=True) value = models.TextField(_(u"Value"), null=True, blank=True) class Meta: verbose_name = _(u"Global variable") verbose_name_plural = _(u"Global variables") ordering = ['slug'] def __unicode__(self): return unicode(self.slug) def cached_globalvar_changed(sender, **kwargs): if not kwargs['instance']: return var = kwargs['instance'] cache_key, value = get_cache(GlobalVar, var.slug) cache.set(cache_key, var.value, settings.CACHE_TIMEOUT) post_save.connect(cached_globalvar_changed, sender=GlobalVar) class UserDashboard: def __init__(self): types = IshtarUser.objects.values('person__person_types', 'person__person_types__label') self.types = types.annotate(number=Count('pk')) \ .order_by('person__person_types') class DashboardFormItem(object): """ Provide methods to manage statistics """ def _get_or_set_stats(self, funcname, update, timeout=settings.CACHE_TIMEOUT): key, val = get_cache(self.__class__, [funcname, self.pk]) if not update and val is not None: return val val = getattr(self, funcname)() cache.set(key, val, timeout) return val @classmethod def get_periods(cls, slice='month', fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) if slice == 'year': return [res[date_var].year for res in list(q.values(date_var) .annotate( Count("id")).order_by())] elif slice == 'month': return [(res[date_var].year, res[date_var].month) for res in list(q.values(date_var) .annotate(Count("id")).order_by())] return [] @classmethod def get_by_year(cls, year, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) return q.filter( **{date_var + '__year': year}).order_by('pk').distinct('pk') @classmethod def get_by_month(cls, year, month, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) q = q.filter( **{date_var + '__year': year, date_var + '__month': month}) return q.order_by('pk').distinct('pk') @classmethod def get_total_number(cls, fltr=None): q = cls.objects if fltr: q = q.filter(**fltr) return q.order_by('pk').distinct('pk').count() class Dashboard(object): def __init__(self, model, slice='year', date_source=None, show_detail=None, fltr=None): if not fltr: fltr = {} # don't provide date_source if it is not relevant self.model = model self.total_number = model.get_total_number(fltr) self.show_detail = show_detail history_model = self.model.history.model # last edited - created self.recents, self.lasts = [], [] for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')): last_ids = history_model.objects.values('id') \ .annotate(hd=Max('history_date')) last_ids = last_ids.filter(history_type=modif_type) from archaeological_finds.models import Find if self.model == Find: last_ids = last_ids.filter( downstream_treatment_id__isnull=True) if modif_type == '+': last_ids = last_ids.filter( upstream_treatment_id__isnull=True) last_ids = last_ids.order_by('-hd').distinct().all()[:5] for idx in last_ids: try: obj = self.model.objects.get(pk=idx['id']) except: # deleted object are always referenced in history continue obj.history_date = idx['hd'] last_lst.append(obj) # years base_kwargs = {'fltr': fltr.copy()} if date_source: base_kwargs['date_source'] = date_source periods_kwargs = copy.deepcopy(base_kwargs) periods_kwargs['slice'] = slice self.periods = model.get_periods(**periods_kwargs) self.periods = list(set(self.periods)) self.periods.sort() if not self.total_number or not self.periods: return kwargs_num = copy.deepcopy(base_kwargs) self.serie_labels = [_(u"Total")] # numbers if slice == 'year': self.values = [('year', "", list(reversed(self.periods)))] self.numbers = [model.get_by_year(year, **kwargs_num).count() for year in self.periods] self.values += [('number', _(u"Number"), list(reversed(self.numbers)))] if slice == 'month': periods = list(reversed(self.periods)) self.periods = ["%d-%s-01" % (p[0], ('0' + str(p[1])) if len(str(p[1])) == 1 else p[1]) for p in periods] self.values = [('month', "", self.periods)] if show_detail: for dpt, lbl in settings.ISHTAR_DPTS: self.serie_labels.append(unicode(dpt)) idx = 'number_' + unicode(dpt) kwargs_num['fltr']["towns__numero_insee__startswith"] = \ unicode(dpt) numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [(idx, dpt, list(numbers))] # put "Total" at the end self.serie_labels.append(self.serie_labels.pop(0)) kwargs_num = base_kwargs.copy() self.numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [('number', _(u"Total"), list(self.numbers))] # calculate self.average = self.get_average() self.variance = self.get_variance() self.standard_deviation = self.get_standard_deviation() self.median = self.get_median() self.mode = self.get_mode() # by operation if not hasattr(model, 'get_by_operation'): return operations = model.get_operations() operation_numbers = [model.get_by_operation(op).count() for op in operations] # calculate self.operation_average = self.get_average(operation_numbers) self.operation_variance = self.get_variance(operation_numbers) self.operation_standard_deviation = self.get_standard_deviation( operation_numbers) self.operation_median = self.get_median(operation_numbers) operation_mode_pk = self.get_mode(dict(zip(operations, operation_numbers))) if operation_mode_pk: from archaeological_operations.models import Operation self.operation_mode = unicode(Operation.objects .get(pk=operation_mode_pk)) def get_average(self, vals=[]): if not vals: vals = self.numbers[:] return sum(vals) / len(vals) def get_variance(self, vals=[]): if not vals: vals = self.numbers[:] avrg = self.get_average(vals) return self.get_average([(x - avrg) ** 2 for x in vals]) def get_standard_deviation(self, vals=[]): if not vals: vals = self.numbers[:] return round(self.get_variance(vals) ** 0.5, 3) def get_median(self, vals=[]): if not vals: vals = self.numbers[:] len_vals = len(vals) vals.sort() if (len_vals % 2) == 1: return vals[len_vals / 2] else: return (vals[len_vals / 2 - 1] + vals[len_vals / 2]) / 2.0 def get_mode(self, vals={}): if not vals: vals = dict(zip(self.periods, self.numbers[:])) mx = max(vals.values()) for v in vals: if vals[v] == mx: return v class DocumentTemplate(models.Model): CLASSNAMES = (('archaeological_operations.models.AdministrativeAct', _(u"Administrative Act")),) name = models.CharField(_(u"Name"), max_length=100) slug = models.SlugField(_(u"Slug"), blank=True, null=True, max_length=100, unique=True) template = models.FileField( _(u"Template"), upload_to="templates/%Y/", help_text=max_size_help()) associated_object_name = models.CharField( _(u"Associated object"), max_length=100, choices=CLASSNAMES) available = models.BooleanField(_(u"Available"), default=True) objects = SlugModelManager() class Meta: verbose_name = _(u"Document template") verbose_name_plural = _(u"Document templates") ordering = ['associated_object_name', 'name'] def __unicode__(self): return self.name def natural_key(self): return (self.slug,) def save(self, *args, **kwargs): if not self.slug: self.slug = create_slug(DocumentTemplate, self.name) return super(DocumentTemplate, self).save(*args, **kwargs) @classmethod def get_tuples(cls, dct={}, empty_first=True): dct['available'] = True if empty_first: yield ('', '----------') items = cls.objects.filter(**dct) for item in items.distinct().order_by(*cls._meta.ordering).all(): yield (item.pk, _(unicode(item))) def publish(self, c_object): tempdir = tempfile.mkdtemp("-ishtardocs") output_name = tempdir + os.path.sep + \ slugify(self.name.replace(' ', '_').lower()) + u'-' + \ datetime.date.today().strftime('%Y-%m-%d') + \ u"." + self.template.name.split('.')[-1] values = c_object.get_values() engine = SecretaryRenderer() try: result = engine.render(self.template, **values) except TemplateSyntaxError as e: raise TemplateSyntaxError(e.message, e.lineno) output = open(output_name, 'wb') output.write(result) return output_name def convert_from_v1(self): """ Convert the current template from v1 to v2. """ from old.ooo_replace import ooo_replace from archaeological_operations.models import AdministrativeAct old_dir = settings.MEDIA_ROOT + "/templates/v1/" if not os.path.exists(old_dir): os.makedirs(old_dir) shutil.copy(settings.MEDIA_ROOT + self.template.name, old_dir) tempdir = tempfile.mkdtemp("-ishtardocs") output_name = tempdir + os.path.sep + self.template.name.split( os.sep)[-1] objects = [] filters = [ {'operation__isnull': False}, {'associated_file__isnull': False}, {'treatment_file__isnull': False}, {'treatment__isnull': False}, ] for filtr in filters: q = AdministrativeAct.objects.filter(**filtr) if q.count(): objects.append(q.all()[0]) if not objects: return values = {} for obj in objects: values.update(obj.get_values()) for key in values: values[key] = "{{ " + key + " }}" ooo_replace(self.template, output_name, values) shutil.move(output_name, settings.MEDIA_ROOT + self.template.name) return output_name class NumberManager(models.Manager): def get_by_natural_key(self, number): return self.get(number=number) class State(models.Model): label = models.CharField(_(u"Label"), max_length=30) number = models.CharField(_(u"Number"), unique=True, max_length=3) objects = NumberManager() class Meta: verbose_name = _(u"State") ordering = ['number'] def __unicode__(self): return self.label def natural_key(self): return (self.number,) class Department(models.Model): label = models.CharField(_(u"Label"), max_length=30) number = models.CharField(_(u"Number"), unique=True, max_length=3) state = models.ForeignKey( 'State', verbose_name=_(u"State"), blank=True, null=True, on_delete=models.SET_NULL, ) objects = NumberManager() class Meta: verbose_name = _(u"Department") verbose_name_plural = _(u"Departments") ordering = ['number'] def __unicode__(self): return self.label def natural_key(self): return (self.number,) def history_compress(self): return self.number @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append(cls.objects.get(number=value)) except cls.DoesNotExist: continue return res class Arrondissement(models.Model): name = models.CharField(u"Nom", max_length=30) department = models.ForeignKey(Department, verbose_name=u"Département") def __unicode__(self): return settings.JOINT.join((self.name, unicode(self.department))) class Canton(models.Model): name = models.CharField(u"Nom", max_length=30) arrondissement = models.ForeignKey(Arrondissement, verbose_name=u"Arrondissement") def __unicode__(self): return settings.JOINT.join( (self.name, unicode(self.arrondissement))) class TownManager(models.GeoManager): def get_by_natural_key(self, numero_insee, year): return self.get(numero_insee=numero_insee, year=year) class Town(Imported, models.Model): name = models.CharField(_(u"Name"), max_length=100) surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True) center = models.PointField(_(u"Localisation"), srid=settings.SRID, blank=True, null=True) limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True) numero_insee = models.CharField(u"Code commune (numéro INSEE)", max_length=120) departement = models.ForeignKey( Department, verbose_name=_(u"Department"), on_delete=models.SET_NULL, null=True, blank=True) year = models.IntegerField( _("Year of creation"), null=True, blank=True, help_text=_(u"Filling this field is relevant to distinguish old towns " u"from new towns.")) children = models.ManyToManyField( 'Town', verbose_name=_(u"Town children"), blank=True, related_name='parents') cached_label = models.CharField(_(u"Cached name"), max_length=500, null=True, blank=True, db_index=True) objects = TownManager() class Meta: verbose_name = _(u"Town") verbose_name_plural = _(u"Towns") if settings.COUNTRY == 'fr': ordering = ['numero_insee'] unique_together = (('numero_insee', 'year'),) def natural_key(self): return (self.numero_insee, self.year) def history_compress(self): values = {'numero_insee': self.numero_insee, 'year': self.year or ""} return values @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append( cls.objects.get(numero_insee=value['numero_insee'], year=value['year'] or None)) except cls.DoesNotExist: continue return res def __unicode__(self): if self.cached_label: return self.cached_label self.save() return self.cached_label @property def label_with_areas(self): label = [self.name] if self.numero_insee: label.append(u"({})".format(self.numero_insee)) for area in self.areas.all(): label.append(u" - ") label.append(area.full_label) return u" ".join(label) def generate_geo(self, force=False): force = self.generate_limit(force=force) self.generate_center(force=force) self.generate_area(force=force) def generate_limit(self, force=False): if not force and self.limit: return parents = None if not self.parents.count(): return for parent in self.parents.all(): if not parent.limit: return if not parents: parents = parent.limit else: parents = parents.union(parent.limit) # if union is a simple polygon make it a multi if 'MULTI' not in parents.wkt: parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" if not parents: return self.limit = parents self.save() return True def generate_center(self, force=False): if not force and (self.center or not self.limit): return self.center = self.limit.centroid if not self.center: return False self.save() return True def generate_area(self, force=False): if not force and (self.surface or not self.limit): return surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area if surface > 214748364 or not surface: return False self.surface = surface self.save() return True def update_town_code(self): if not self.numero_insee or not self.children.count() or not self.year: return old_num = self.numero_insee[:] numero = old_num.split('-')[0] self.numero_insee = u"{}-{}".format(numero, self.year) if self.numero_insee != old_num: return True def _generate_cached_label(self): cached_label = self.name if settings.COUNTRY == "fr" and self.numero_insee: dpt_len = 2 if self.numero_insee.startswith('97') or \ self.numero_insee.startswith('98') or \ self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'): dpt_len = 3 cached_label = u"%s - %s" % (self.name, self.numero_insee[:dpt_len]) if self.year and self.children.count(): cached_label += u" ({})".format(self.year) return cached_label def post_save_town(sender, **kwargs): cached_label_changed(sender, **kwargs) town = kwargs['instance'] town.generate_geo() if town.update_town_code(): town.save() post_save.connect(post_save_town, sender=Town) def town_child_changed(sender, **kwargs): town = kwargs['instance'] if town.update_town_code(): town.save() m2m_changed.connect(town_child_changed, sender=Town.children.through) class Area(HierarchicalType): towns = models.ManyToManyField(Town, verbose_name=_(u"Towns"), blank=True, related_name='areas') reference = models.CharField(_(u"Reference"), max_length=200, blank=True, null=True) parent = models.ForeignKey( 'self', blank=True, null=True, verbose_name=_(u"Parent"), help_text=_(u"Only four level of parent are managed."), related_name='children', on_delete=models.SET_NULL ) class Meta: verbose_name = _(u"Area") verbose_name_plural = _(u"Areas") ordering = ('label',) def __unicode__(self): if not self.reference: return self.label return u"{} ({})".format(self.label, self.reference) @property def full_label(self): label = [unicode(self)] if self.parent: label.append(self.parent.full_label) return u" / ".join(label) class Address(BaseHistorizedItem): address = models.TextField(_(u"Address"), null=True, blank=True) address_complement = models.TextField(_(u"Address complement"), null=True, blank=True) postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True, blank=True) town = models.CharField(_(u"Town (freeform)"), max_length=150, null=True, blank=True) precise_town = models.ForeignKey( Town, verbose_name=_(u"Town (precise)"), null=True, blank=True) country = models.CharField(_(u"Country"), max_length=30, null=True, blank=True) alt_address = models.TextField(_(u"Other address: address"), null=True, blank=True) alt_address_complement = models.TextField( _(u"Other address: address complement"), null=True, blank=True) alt_postal_code = models.CharField(_(u"Other address: postal code"), max_length=10, null=True, blank=True) alt_town = models.CharField(_(u"Other address: town"), max_length=70, null=True, blank=True) alt_country = models.CharField(_(u"Other address: country"), max_length=30, null=True, blank=True) phone = models.CharField(_(u"Phone"), max_length=18, null=True, blank=True) phone_desc = models.CharField(_(u"Phone description"), max_length=300, null=True, blank=True) phone2 = models.CharField(_(u"Phone description 2"), max_length=18, null=True, blank=True) phone_desc2 = models.CharField(_(u"Phone description 2"), max_length=300, null=True, blank=True) phone3 = models.CharField(_(u"Phone 3"), max_length=18, null=True, blank=True) phone_desc3 = models.CharField(_(u"Phone description 3"), max_length=300, null=True, blank=True) raw_phone = models.TextField(_(u"Raw phone"), blank=True, null=True) mobile_phone = models.CharField(_(u"Mobile phone"), max_length=18, null=True, blank=True) email = models.EmailField( _(u"Email"), max_length=300, blank=True, null=True) alt_address_is_prefered = models.BooleanField( _(u"Alternative address is prefered"), default=False) history = HistoricalRecords() class Meta: abstract = True def get_town_centroid(self): if self.precise_town: return self.precise_town.center, self._meta.verbose_name def get_town_polygons(self): if self.precise_town: return self.precise_town.limit, self._meta.verbose_name def simple_lbl(self): return unicode(self) def full_address(self): lbl = self.simple_lbl() if lbl: lbl += u"\n" lbl += self.address_lbl() return lbl def address_lbl(self): lbl = u'' prefix = '' if self.alt_address_is_prefered: prefix = 'alt_' if getattr(self, prefix + 'address'): lbl += getattr(self, prefix + 'address') if getattr(self, prefix + 'address_complement'): if lbl: lbl += "\n" lbl += getattr(self, prefix + 'address_complement') postal_code = getattr(self, prefix + 'postal_code') town = getattr(self, prefix + 'town') if postal_code or town: if lbl: lbl += "\n" lbl += u"{}{}{}".format( postal_code or '', " " if postal_code and town else '', town or '') if self.phone: if lbl: lbl += u"\n" lbl += u"{} {}".format(unicode(_("Tel: ")), self.phone) if self.mobile_phone: if lbl: lbl += u"\n" lbl += u"{} {}".format(unicode(_("Mobile: ")), self.mobile_phone) if self.email: if lbl: lbl += u"\n" lbl += u"{} {}".format(unicode(_("Email: ")), self.email) return lbl class Merge(models.Model): merge_key = models.TextField(_("Merge key"), blank=True, null=True) merge_candidate = models.ManyToManyField("self", blank=True) merge_exclusion = models.ManyToManyField("self", blank=True) archived = models.NullBooleanField(default=False, blank=True, null=True) # 1 for one word similarity, 2 for two word similarity, etc. MERGE_CLEMENCY = None EMPTY_MERGE_KEY = '--' class Meta: abstract = True def generate_merge_key(self): if self.archived: return self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY self.merge_key = self.merge_key def generate_merge_candidate(self): if self.archived: return if not self.merge_key: self.generate_merge_key() self.save(merge_key_generated=True) if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: return q = self.__class__.objects \ .exclude(pk=self.pk) \ .exclude(merge_exclusion=self) \ .exclude(merge_candidate=self) \ .exclude(archived=True) if not self.MERGE_CLEMENCY: q = q.filter(merge_key=self.merge_key) else: subkeys_front = u"-".join( self.merge_key.split('-')[:self.MERGE_CLEMENCY]) subkeys_back = u"-".join( self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) q = q.filter(Q(merge_key__istartswith=subkeys_front) | Q(merge_key__iendswith=subkeys_back)) for item in q.all(): self.merge_candidate.add(item) def save(self, *args, **kwargs): # prevent circular save merge_key_generated = False if 'merge_key_generated' in kwargs: merge_key_generated = kwargs.pop('merge_key_generated') self.generate_merge_key() item = super(Merge, self).save(*args, **kwargs) if not merge_key_generated: self.generate_merge_candidate() return item def archive(self): self.archived = True self.save() for m in self.merge_candidate.all(): m.delete() for m in self.merge_exclusion.all(): m.delete() def merge(self, item): merge_model_objects(self, item) self.generate_merge_candidate() class OrganizationType(GeneralType): class Meta: verbose_name = _(u"Organization type") verbose_name_plural = _(u"Organization types") ordering = ('label',) post_save.connect(post_save_cache, sender=OrganizationType) post_delete.connect(post_save_cache, sender=OrganizationType) organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, unicode) organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, unicode) class OrganizationManager(models.Manager): def get_by_natural_key(self, name, organization_type): return self.get(name=name, organization_type__txt_idx=organization_type) class Organization(Address, Merge, OwnPerms, ValueGetter): TABLE_COLS = ('name', 'organization_type', 'town') SLUG = "organization" SHOW_URL = 'show-organization' # search parameters EXTRA_REQUEST_KEYS = {} BASE_SEARCH_VECTORS = [SearchVectorConfig('name'), SearchVectorConfig('town')] # alternative names of fields for searches ALT_NAMES = { 'name': SearchAltName( pgettext_lazy("key for text search", u"name"), 'name__iexact' ), 'organization_type': SearchAltName( pgettext_lazy("key for text search", u"type"), 'organization_type__label__iexact' ), } objects = OrganizationManager() # fields name = models.CharField(_(u"Name"), max_length=500) organization_type = models.ForeignKey(OrganizationType, verbose_name=_(u"Type")) cached_label = models.TextField(_(u"Cached name"), null=True, blank=True, db_index=True) history = HistoricalRecords() class Meta: verbose_name = _(u"Organization") verbose_name_plural = _(u"Organizations") permissions = ( ("view_organization", u"Can view all Organizations"), ("view_own_organization", u"Can view own Organization"), ("add_own_organization", u"Can add own Organization"), ("change_own_organization", u"Can change own Organization"), ("delete_own_organization", u"Can delete own Organization"), ) def simple_lbl(self): if self.name: return self.name return u"{} - {}".format(self.organization_type, self.town or "") def natural_key(self): return (self.name, self.organization_type.txt_idx) def __unicode__(self): if self.cached_label: return self.cached_label self.save() return self.cached_label def _generate_cached_label(self): if self.name: return self.name attrs = ["organization_type", "address", "town"] items = [unicode(getattr(self, attr)) for attr in attrs if getattr(self, attr)] if not items: items = [unicode(_(u"unknown organization"))] return u" - ".join(items) def generate_merge_key(self): self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY if self.town: self.merge_key += "-" + slugify(self.town or '') if self.address: self.merge_key += "-" + slugify(self.address or '') @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('organization_type', 'name') if getattr(self, attr)] return slugify(u"-".join(values)) post_save.connect(cached_label_changed, sender=Organization) class PersonType(GeneralType): class Meta: verbose_name = _(u"Person type") verbose_name_plural = _(u"Person types") ordering = ('label',) post_save.connect(post_save_cache, sender=PersonType) post_delete.connect(post_save_cache, sender=PersonType) person_type_pk_lazy = lazy(PersonType.get_or_create_pk, unicode) person_type_pks_lazy = lazy(PersonType.get_or_create_pks, unicode) class TitleType(GeneralType): class Meta: verbose_name = _(u"Title type") verbose_name_plural = _(u"Title types") ordering = ('label',) post_save.connect(post_save_cache, sender=TitleType) post_delete.connect(post_save_cache, sender=TitleType) class PersonManager(models.Manager): def get_by_natural_key( self, name, surname, attached_to_name, attached_to_organization_type): q = {"name": name, "surname": surname} if attached_to_name: q['attached_to__name'] = attached_to_name if attached_to_organization_type: q['attached_to__organization_type__txt_idx'] = \ attached_to_organization_type return self.get(**q) class Person(Address, Merge, OwnPerms, ValueGetter): SLUG = "person" _prefix = 'person_' TYPE = ( ('Mr', _(u'Mr')), ('Ms', _(u'Miss')), ('Mr and Miss', _(u'Mr and Mrs')), ('Md', _(u'Mrs')), ('Dr', _(u'Doctor')), ) TABLE_COLS = ('name', 'surname', 'raw_name', 'email', 'person_types_list', 'attached_to', 'town') TABLE_COLS_ACCOUNT = ('name', 'surname', 'raw_name', 'email', 'profiles_list', 'attached_to', 'town') SHOW_URL = 'show-person' MODIFY_URL = 'person_modify' BASE_SEARCH_VECTORS = [ SearchVectorConfig('name'), SearchVectorConfig('surname'), SearchVectorConfig('raw_name'), SearchVectorConfig('town'), SearchVectorConfig('attached_to__name'), SearchVectorConfig('email')] # search parameters REVERSED_BOOL_FIELDS = ['ishtaruser__isnull'] EXTRA_REQUEST_KEYS = { 'ishtaruser__isnull': 'ishtaruser__isnull', 'attached_to': 'attached_to', } COL_LABELS = { 'attached_to': _(u"Organization") } # alternative names of fields for searches ALT_NAMES = { 'name': SearchAltName( pgettext_lazy("key for text search", u"name"), 'name__iexact' ), 'surname': SearchAltName( pgettext_lazy("key for text search", u"surname"), 'surname__iexact' ), 'email': SearchAltName( pgettext_lazy("key for text search", u"email"), 'email__iexact' ), 'person_types': SearchAltName( pgettext_lazy("key for text search", u"type"), 'person_types__label__iexact' ), 'attached_to': SearchAltName( pgettext_lazy("key for text search", u"organization"), 'attached_to__cached_label__iexact' ), 'ishtaruser__isnull': SearchAltName( pgettext_lazy("key for text search", u"has-account"), 'ishtaruser__isnull' ), } objects = PersonManager() # fields old_title = models.CharField(_(u"Title"), max_length=100, choices=TYPE, blank=True, null=True) title = models.ForeignKey(TitleType, verbose_name=_(u"Title"), on_delete=models.SET_NULL, blank=True, null=True) salutation = models.CharField(_(u"Salutation"), max_length=200, blank=True, null=True) surname = models.CharField(_(u"Surname"), max_length=50, blank=True, null=True) name = models.CharField(_(u"Name"), max_length=200, blank=True, null=True) raw_name = models.CharField(_(u"Raw name"), max_length=300, blank=True, null=True) contact_type = models.CharField(_(u"Contact type"), max_length=300, blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) person_types = models.ManyToManyField(PersonType, verbose_name=_(u"Types")) attached_to = models.ForeignKey( 'Organization', related_name='members', on_delete=models.SET_NULL, verbose_name=_(u"Is attached to"), blank=True, null=True) cached_label = models.TextField(_(u"Cached name"), null=True, blank=True, db_index=True) history = HistoricalRecords() class Meta: verbose_name = _(u"Person") verbose_name_plural = _(u"Persons") permissions = ( ("view_person", u"Can view all Persons"), ("view_own_person", u"Can view own Person"), ("add_own_person", u"Can add own Person"), ("change_own_person", u"Can change own Person"), ("delete_own_person", u"Can delete own Person"), ) def natural_key(self): if not self.attached_to: return (self.name, self.surname, '', '') return (self.name, self.surname, self.attached_to.name, self.attached_to.organization_type.txt_idx) @property def full_title(self): return u" ".join( [unicode(getattr(self, attr)) for attr in ['title', 'salutation'] if getattr(self, attr)]) @property def current_profile(self): q = self.profiles.filter(current=True) if q.count(): return q.all()[0] q = self.profiles nb = q.count() if not nb: return # arbitrary set the first one as the current profile = q.all()[0] profile.current = True profile.save() return profile def simple_lbl(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] return u" ".join(values) def __unicode__(self): if self.cached_label: return self.cached_label self.save() return self.cached_label def _generate_cached_label(self): lbl = get_external_id('person_raw_name', self) if not lbl: return u"-" if self.attached_to: attached_to = unicode(self.attached_to) lbl += u" ({})".format(attached_to) return lbl def fancy_str(self): values = [""] values += [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values += [self.raw_name] values += [""] if self.attached_to: attached_to = unicode(self.attached_to) if values: values.append(u'-') values.append(attached_to) return u" ".join(values) def get_values(self, prefix=''): values = super(Person, self).get_values(prefix=prefix) if not self.attached_to: values.update( Person.get_empty_values(prefix=prefix + 'attached_to_')) return values person_types_list_lbl = _(u"Types") @property def person_types_list(self): return u", ".join([unicode(pt) for pt in self.person_types.all()]) profiles_list_lbl = _(u"Profiles") @property def profiles_list(self): return u", ".join([unicode(p) for p in self.profiles.all()]) def generate_merge_key(self): if self.name and self.name.strip(): self.merge_key = slugify(self.name.strip()) + \ ((u'-' + slugify(self.surname.strip())) if self.surname else u'') elif self.raw_name and self.raw_name.strip(): self.merge_key = slugify(self.raw_name.strip()) elif self.attached_to: self.merge_key = self.attached_to.merge_key else: self.merge_key = self.EMPTY_MERGE_KEY if self.merge_key != self.EMPTY_MERGE_KEY and self.attached_to: self.merge_key += "-" + self.attached_to.merge_key def is_natural(self): return not self.attached_to def has_right(self, right_name, session=None, obj=None): if '.' in right_name: right_name = right_name.split('.')[-1] res, cache_key = "", "" if session: cache_key = 'session-{}-{}'.format(session.session_key, right_name) res = cache.get(cache_key) if res in (True, False): return res # list all cache key in order to clean them on profile change cache_key_list = 'sessionlist-{}'.format(session.session_key) key_list = cache.get(cache_key_list, []) key_list.append(cache_key) cache.set(cache_key_list, key_list, settings.CACHE_TIMEOUT) if type(right_name) in (list, tuple): res = bool(self.profiles.filter( current=True, profile_type__txt_idx__in=right_name).count()) or \ bool(self.profiles.filter( current=True, profile_type__groups__permissions__codename__in=right_name ).count()) or \ bool(self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=right_name ).count()) or \ bool(self.ishtaruser.user_ptr.user_permissions.filter( codename__in=right_name).count()) else: res = bool( self.profiles.filter( current=True, profile_type__txt_idx=right_name).count()) or \ bool(self.profiles.filter( current=True, profile_type__groups__permissions__codename=right_name ).count()) or \ bool(self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=[right_name] ).count()) or \ bool(self.ishtaruser.user_ptr.user_permissions.filter( codename__in=[right_name]).count()) if session: cache.set(cache_key, res, settings.CACHE_TIMEOUT) return res def full_label(self): values = [] if self.title: values = [self.title.label] values += [unicode(getattr(self, attr)) for attr in ('salutation', 'surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] if self.attached_to: values.append(u"- " + unicode(self.attached_to)) return u" ".join(values) @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name', 'attached_to') if getattr(self, attr)] return slugify(u"-".join(values)) def docs_q(self): return Document.objects.filter(authors__person=self) def operation_docs_q(self): return Document.objects.filter( authors__person=self, operations__pk__isnull=False) def contextrecord_docs_q(self): return Document.objects.filter( authors__person=self, context_records__pk__isnull=False) def find_docs_q(self): return Document.objects.filter( authors__person=self, finds__pk__isnull=False) def save(self, *args, **kwargs): super(Person, self).save(*args, **kwargs) raw_name = get_external_id('person_raw_name', self) if raw_name and self.raw_name != raw_name: self.raw_name = raw_name self.save() if hasattr(self, 'responsible_town_planning_service'): for fle in self.responsible_town_planning_service.all(): fle.save() # force update of raw_town_planning_service if hasattr(self, 'general_contractor'): for fle in self.general_contractor.all(): fle.save() # force update of raw_general_contractor @classmethod def get_query_owns(cls, ishtaruser): return \ Q(operation_scientist_responsability__collaborators__ishtaruser =ishtaruser) | \ Q(operation_scientist_responsability__scientist__ishtaruser =ishtaruser) | \ Q(operation_collaborator__collaborators__ishtaruser =ishtaruser) | \ Q(operation_collaborator__scientist__ishtaruser=ishtaruser) post_save.connect(cached_label_changed, sender=Person) class ProfileType(GeneralType): groups = models.ManyToManyField(Group, verbose_name=_(u"Groups"), blank=True) class Meta: verbose_name = _(u"Profile type") verbose_name_plural = _(u"Profile types") ordering = ('label',) post_save.connect(post_save_cache, sender=ProfileType) post_delete.connect(post_save_cache, sender=ProfileType) class ProfileTypeSummary(ProfileType): class Meta: proxy = True verbose_name = _(u"Profile type summary") verbose_name_plural = _(u"Profile types summary") class UserProfile(models.Model): name = models.CharField(_(u"Name"), blank=True, default=u"", max_length=100) profile_type = models.ForeignKey( ProfileType, verbose_name=_(u"Profile type")) areas = models.ManyToManyField("Area", verbose_name=_(u"Areas"), blank=True, related_name='profiles') current = models.BooleanField(_(u"Current profile"), default=False) show_field_number = models.BooleanField( _(u"Show field number"), default=False) auto_pin = models.BooleanField(_(u"Automatically pin"), default=False) display_pin_menu = models.BooleanField(_(u"Display pin menu"), default=False) person = models.ForeignKey( Person, verbose_name=_(u"Person"), related_name='profiles') class Meta: verbose_name = _(u"User profile") verbose_name_plural = _(u"User profiles") unique_together = (('name', 'profile_type', 'person'),) def __unicode__(self): lbl = self.name or unicode(self.profile_type) if not self.areas.count(): return lbl return u"{} ({})".format(lbl, u", ".join( [unicode(area) for area in self.areas.all()])) @property def query_towns(self): return Town.objects.filter( Q(areas__profiles=self) | Q(areas__parent__profiles=self) | Q(areas__parent__parent__profiles=self) | Q(areas__parent__parent__parent__profiles=self) | Q(areas__parent__parent__parent__parent__profiles=self) ) @property def area_labels(self): return u", ".join([unicode(area) for area in self.areas.all()]) def duplicate(self, **kwargs): areas = [area for area in self.areas.all()] new_item = self new_item.pk = None name = self.name for key in kwargs: if key == 'name': name = kwargs[key] setattr(new_item, key, kwargs[key]) while UserProfile.objects.filter( name=name, profile_type=self.profile_type, person=self.person).count(): name += unicode(_(u" - duplicate")) new_item.name = name new_item.save() for area in areas: new_item.areas.add(area) return new_item def save(self, force_insert=False, force_update=False, using=None, update_fields=None): super(UserProfile, self).save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields) # only one current profile per user if not self.current: return q = UserProfile.objects.filter( person=self.person, current=True).exclude(pk=self.pk) if not q.count(): return for p in q.all(): p.current = False p.save() def post_save_userprofile(sender, **kwargs): if not kwargs.get('instance'): return instance = kwargs.get('instance') try: instance.person.ishtaruser.show_field_number(update=True) except IshtarUser.DoesNotExist: return post_save.connect(post_save_userprofile, sender=UserProfile) class IshtarUser(FullSearch): SLUG = "ishtaruser" TABLE_COLS = ('username', 'person__name', 'person__surname', 'person__email', 'person__person_types_list', 'person__attached_to__name') BASE_SEARCH_VECTORS = [ SearchVectorConfig('user_ptr__username'), SearchVectorConfig('person__name'), SearchVectorConfig('person__surname'), SearchVectorConfig('person__email'), SearchVectorConfig('person__town'), SearchVectorConfig('person__attached_to__name')] # search parameters EXTRA_REQUEST_KEYS = { 'person__person_types_list': 'person__person_types__label' } COL_LABELS = { 'person__attached_to__name': _(u"Organization"), 'username': _(u"Username") } # alternative names of fields for searches ALT_NAMES = { 'username': SearchAltName( pgettext_lazy("key for text search", u"username"), 'user_ptr__username__iexact' ), 'name': SearchAltName( pgettext_lazy("key for text search", u"name"), 'person__name__iexact' ), 'surname': SearchAltName( pgettext_lazy("key for text search", u"surname"), 'person__surname__iexact' ), 'email': SearchAltName( pgettext_lazy("key for text search", u"email"), 'person__email__iexact' ), 'person_types': SearchAltName( pgettext_lazy("key for text search", u"type"), 'person__person_types__label__iexact' ), 'attached_to': SearchAltName( pgettext_lazy("key for text search", u"organization"), 'person__attached_to__cached_label__iexact' ), } # fields user_ptr = models.OneToOneField(User, primary_key=True, related_name='ishtaruser') person = models.OneToOneField(Person, verbose_name=_(u"Person"), related_name='ishtaruser') advanced_shortcut_menu = models.BooleanField( _(u"Advanced shortcut menu"), default=False) class Meta: verbose_name = _(u"Ishtar user") verbose_name_plural = _(u"Ishtar users") def __unicode__(self): return unicode(self.person) def show_field_number(self, update=False): cache_key, value = get_cache(self.__class__, ['show_field_number']) if not update and value is not None: return value value = False if self.current_profile: value = self.current_profile.show_field_number cache.set(cache_key, value, settings.CACHE_TIMEOUT) return value @property def current_profile_name(self): q = UserProfile.objects.filter(current=True, person__ishtaruser=self) if q.count(): vals = q.values('profile_type__label', 'name').all()[0] return vals['name'] or vals['profile_type__label'] profile = self.person.current_profile if not profile: return u"" return unicode(profile) @property def current_profile(self): return self.person.current_profile @classmethod def set_superuser(cls, user): q = cls.objects.filter(user_ptr=user) if not q.count(): return ishtaruser = q.all()[0] person = ishtaruser.person admin, created = ProfileType.objects.get_or_create( txt_idx='administrator') if user.is_superuser: if UserProfile.objects.filter( profile_type=admin, person=person).count(): return UserProfile.objects.get_or_create( profile_type=admin, person=person, defaults={'current': True}) @classmethod def create_from_user(cls, user): default = user.username surname = user.first_name or default name = user.last_name or default email = user.email person = Person.objects.create(surname=surname, name=name, email=email, history_modifier=user) isht_user = cls.objects.create(user_ptr=user, person=person) return isht_user def has_right(self, right_name, session=None): return self.person.has_right(right_name, session=session) def has_perm(self, perm, model=None, session=None, obj=None): return self.person.has_right(perm, session=session, obj=None) def full_label(self): return self.person.full_label() class Basket(FullSearch, OwnPerms): """ Abstract class for a basket Subclass must be defined with an "items" ManyToManyField """ IS_BASKET = True label = models.CharField(_(u"Label"), max_length=1000) comment = models.TextField(_(u"Comment"), blank=True, null=True) user = models.ForeignKey( IshtarUser, blank=True, null=True, related_name='%(class)ss', on_delete=models.SET_NULL, verbose_name=_(u"Owner")) available = models.BooleanField(_(u"Available"), default=True) shared_with = models.ManyToManyField( IshtarUser, verbose_name=_(u"Shared (read) with"), blank=True, related_name='shared_%(class)ss' ) shared_write_with = models.ManyToManyField( IshtarUser, verbose_name=_(u"Shared (read/edit) with"), blank=True, related_name='shared_write_%(class)ss' ) TABLE_COLS = ['label', 'user'] BASE_SEARCH_VECTORS = [ SearchVectorConfig('label'), SearchVectorConfig('comment', 'local')] M2M_SEARCH_VECTORS = [SearchVectorConfig('items')] class Meta: abstract = True ordering = ('label', ) unique_together = (('label', 'user'),) def __unicode__(self): return self.label @classmethod def BASE_REQUEST(cls, request): if not request.user or not getattr(request.user, 'ishtaruser', None): return Q(pk=None) ishtaruser = request.user.ishtaruser return Q(user=ishtaruser) | Q(shared_with=ishtaruser) | Q( shared_write_with=ishtaruser) @property def cached_label(self): return unicode(self) @property def full_label(self): return u"{} - {} ({})".format(self.label, self.user, self.items.count()) @classmethod def get_short_menu_class(cls, pk): return 'basket' @property def associated_filename(self): return "{}-{}".format(datetime.date.today().strftime( "%Y-%m-%d"), slugify(self.label)) @classmethod def get_query_owns(cls, ishtaruser): return Q(user=ishtaruser) | Q(shared_with=ishtaruser) | Q( shared_write_with=ishtaruser) @classmethod def get_write_query_owns(cls, ishtaruser): return Q(user=ishtaruser) def duplicate(self, label=None, ishtaruser=None): """ Duplicate the basket. Items in basket are copied but not shared users :param label: if provided use the name :param ishtaruser: if provided an alternate user is used :return: the new basket """ items = list(self.items.all()) new_item = self new_item.pk = None if ishtaruser: new_item.user = ishtaruser if not label: label = new_item.label while self.__class__.objects.filter( label=label, user=new_item.user).count(): label += unicode(_(u" - duplicate")) new_item.label = label new_item.save() for item in items: new_item.items.add(item) return new_item class AuthorType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) class Meta: verbose_name = _(u"Author type") verbose_name_plural = _(u"Author types") ordering = ['order', 'label'] post_save.connect(post_save_cache, sender=AuthorType) post_delete.connect(post_save_cache, sender=AuthorType) class AuthorManager(models.Manager): def get_by_natural_key( self, name, surname, attached_to_name, attached_to_organization_type, author_type): q = {"person__name": name, "person__surname": surname, "author_type__txt_idx": author_type} if attached_to_name: q['person__attached_to__name'] = attached_to_name if attached_to_organization_type: q['person__attached_to__organization_type__txt_idx'] = \ attached_to_organization_type return self.get(**q) class Author(FullSearch): PARENT_SEARCH_VECTORS = ['person'] SLUG = "author" person = models.ForeignKey(Person, verbose_name=_(u"Person"), related_name='author') author_type = models.ForeignKey(AuthorType, verbose_name=_(u"Author type")) cached_label = models.TextField(_(u"Cached name"), null=True, blank=True, db_index=True) objects = AuthorManager() class Meta: verbose_name = _(u"Author") verbose_name_plural = _(u"Authors") ordering = ('author_type__order', 'person__name') permissions = ( ("view_author", u"Can view all Authors"), ("view_own_author", u"Can view own Author"), ("add_own_author", u"Can add own Author"), ("change_own_author", u"Can change own Author"), ("delete_own_author", u"Can delete own Author"), ) def __unicode__(self): if self.cached_label: return self.cached_label self.save() return self.cached_label def natural_key(self): return self.person.natural_key() + (self.author_type.txt_idx,) def _generate_cached_label(self): return unicode(self.person) + settings.JOINT + \ unicode(self.author_type) def fancy_str(self): return self.person.fancy_str() + settings.JOINT + \ unicode(self.author_type) def related_sources(self): return list(self.treatmentsource_related.all()) + \ list(self.operationsource_related.all()) + \ list(self.findsource_related.all()) + \ list(self.contextrecordsource_related.all()) post_save.connect(cached_label_changed, sender=Author) class SourceType(HierarchicalType): class Meta: verbose_name = _(u"Source type") verbose_name_plural = _(u"Source types") ordering = ['label'] post_save.connect(post_save_cache, sender=SourceType) post_delete.connect(post_save_cache, sender=SourceType) class SupportType(GeneralType): class Meta: verbose_name = _(u"Support type") verbose_name_plural = _(u"Support types") post_save.connect(post_save_cache, sender=SupportType) post_delete.connect(post_save_cache, sender=SupportType) class Format(GeneralType): class Meta: verbose_name = _(u"Format type") verbose_name_plural = _(u"Format types") ordering = ['label'] post_save.connect(post_save_cache, sender=Format) post_delete.connect(post_save_cache, sender=Format) class LicenseType(GeneralType): url = models.URLField(_(u"URL"), blank=True, null=True) class Meta: verbose_name = _(u"License type") verbose_name_plural = _(u"License types") ordering = ('label',) post_save.connect(post_save_cache, sender=LicenseType) post_delete.connect(post_save_cache, sender=LicenseType) class Document(BaseHistorizedItem, OwnPerms, ImageModel): EXTERNAL_ID_KEY = 'document_external_id' # order is important: put the image in the first match found # other will be symbolic links RELATED_MODELS = [ 'treatment_files', 'treatments', 'finds', 'context_records', 'operations', 'sites', 'warehouses', 'containers', 'files' ] # same fields but in order for forms RELATED_MODELS_ALT = [ 'finds', 'context_records', 'operations', 'sites', 'files', 'warehouses', 'containers', 'treatments', 'treatment_files', ] SLUG = 'document' LINK_SPLIT = u"<||>" TABLE_COLS = ['title', 'source_type', 'cache_related_label', 'authors__cached_label', 'associated_url'] COL_LINK = ['associated_url'] BASE_SEARCH_VECTORS = [ SearchVectorConfig("title"), SearchVectorConfig("source_type__label"), SearchVectorConfig("external_id"), SearchVectorConfig("reference"), SearchVectorConfig("description", "local"), SearchVectorConfig("comment", "local"), SearchVectorConfig("additional_information", "local"), ] PARENT_SEARCH_VECTORS = ['authors', ] BOOL_FIELDS = ['duplicate'] COL_LABELS = {"authors__cached_label": _(u"Authors")} CACHED_LABELS = ['cache_related_label'] EXTRA_REQUEST_KEYS = { "operations": "operations__pk", "context_records": "context_records__pk", "context_records__operation": "context_records__operation__pk", "finds": "finds__pk", "finds__base_finds__context_record": "finds__base_finds__context_record__pk", "finds__base_finds__context_record__operation": "finds__base_finds__context_record__operation__pk", 'authors__cached_label': 'authors__cached_label', 'authors__person__pk': 'authors__person__pk', } # alternative names of fields for searches ALT_NAMES = { 'authors': SearchAltName( pgettext_lazy("key for text search", u"author"), 'authors__cached_label__iexact' ), 'title': SearchAltName( pgettext_lazy("key for text search", u"title"), 'title__iexact' ), 'source_type': SearchAltName( pgettext_lazy("key for text search", u"type"), 'source_type__label__iexact' ), 'reference': SearchAltName( pgettext_lazy("key for text search", u"reference"), 'reference__iexact' ), 'internal_reference': SearchAltName( pgettext_lazy("key for text search", u"internal-reference"), 'internal_reference__iexact' ), 'description': SearchAltName( pgettext_lazy("key for text search", u"description"), 'description__iexact' ), 'comment': SearchAltName( pgettext_lazy("key for text search", u"comment"), 'comment__iexact' ), 'additional_information': SearchAltName( pgettext_lazy("key for text search", u"additional-information"), 'additional_information__iexact' ), 'duplicate': SearchAltName( pgettext_lazy("key for text search", u"has-duplicate"), 'duplicate' ), 'operation': SearchAltName( pgettext_lazy("key for text search", u"operation"), 'operations__cached_label__iexact' ), 'context_record': SearchAltName( pgettext_lazy("key for text search", u"context-record"), 'context_records__cached_label__iexact' ), 'find': SearchAltName( pgettext_lazy("key for text search", u"find"), 'finds__cached_label__iexact' ), 'find__denomination': SearchAltName( pgettext_lazy("key for text search", u"find-denomination"), 'finds__denomination__iexact' ), 'file': SearchAltName( pgettext_lazy("key for text search", u"file"), 'files__cached_label__iexact' ), 'container': SearchAltName( pgettext_lazy("key for text search", u"container"), 'containers__cached_label__iexact' ), 'site': SearchAltName( pgettext_lazy("key for text search", u"site"), 'sites__cached_label__iexact' ), 'warehouse': SearchAltName( pgettext_lazy("key for text search", u"warehouse"), 'warehouses__name__iexact' ), } objects = ExternalIdManager() RELATIVE_SESSION_NAMES = [ ('find', 'finds__pk'), ('contextrecord', 'context_records__pk'), ('operation', 'operations__pk'), ('site', 'sites__pk'), ('file', 'files__pk'), ('warehouse', 'warehouses__pk'), ('treatment', 'treatments__pk'), ('treatmentfile', 'treatment_files__pk'), ] UP_MODEL_QUERY = { "operation": (pgettext_lazy("key for text search", u"operation"), 'cached_label'), "contextrecord": (pgettext_lazy("key for text search", u"context-record"), 'cached_label'), "file": (pgettext_lazy("key for text search", u"file"), 'cached_label'), "find": (pgettext_lazy("key for text search", u"find"), 'cached_label'), "site": (pgettext_lazy("key for text search", u"site"), 'cached_label'), "warehouse": (pgettext_lazy("key for text search", u"warehouse"), 'cached_label'), "treatment": (pgettext_lazy("key for text search", u"treatment"), 'cached_label'), "treatmentfile": (pgettext_lazy("key for text search", u"treatment-file"), 'cached_label'), } title = models.TextField(_(u"Title"), blank=True, default='') associated_file = models.FileField( upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) index = models.IntegerField(verbose_name=_(u"Index"), blank=True, null=True) external_id = models.TextField(_(u"External ID"), null=True, blank=True) reference = models.TextField(_(u"Ref."), null=True, blank=True) internal_reference = models.TextField(_(u"Internal ref."), null=True, blank=True) source_type = models.ForeignKey(SourceType, verbose_name=_(u"Type"), on_delete=models.SET_NULL, null=True, blank=True) licenses = models.ManyToManyField(LicenseType, verbose_name=_(u"License"), blank=True) support_type = models.ForeignKey(SupportType, verbose_name=_(u"Support"), on_delete=models.SET_NULL, blank=True, null=True, ) format_type = models.ForeignKey(Format, verbose_name=_(u"Format"), on_delete=models.SET_NULL, blank=True, null=True) scale = models.CharField(_(u"Scale"), max_length=30, null=True, blank=True) authors = models.ManyToManyField(Author, verbose_name=_(u"Authors"), related_name="documents") authors_raw = models.CharField(verbose_name=_(u"Authors (raw)"), blank=True, null=True, max_length=250) associated_url = models.URLField( blank=True, null=True, max_length=1000, verbose_name=_(u"Numerical ressource (web address)")) receipt_date = models.DateField(blank=True, null=True, verbose_name=_(u"Receipt date")) creation_date = models.DateField(blank=True, null=True, verbose_name=_(u"Creation date")) receipt_date_in_documentation = models.DateField( blank=True, null=True, verbose_name=_(u"Receipt date in documentation")) item_number = models.IntegerField(_(u"Number of items"), default=1) description = models.TextField(_(u"Description"), blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) additional_information = models.TextField(_(u"Additional information"), blank=True, null=True) duplicate = models.NullBooleanField(_(u"Has a duplicate"), blank=True, null=True) associated_links = models.TextField(_(u"Symbolic links"), blank=True, null=True) cache_related_label = models.TextField( _(u"Related"), blank=True, null=True, db_index=True, help_text=_(u"Cached value - do not edit")) class Meta: verbose_name = _(u"Document") verbose_name_plural = _(u"Documents") ordering = ('title',) permissions = ( ("view_document", ugettext(u"Can view all Documents")), ("view_own_document", ugettext(u"Can view own Document")), ("add_own_document", ugettext(u"Can add own Document")), ("change_own_document", ugettext(u"Can change own Document")), ("delete_own_document", ugettext(u"Can delete own Document")), ) def __unicode__(self): return self.title def natural_key(self): return (self.external_id,) """ @property def code(self): if not self.index: return u"{}-".format(self.operation.code_patriarche or '') return u"{}-{:04d}".format(self.operation.code_patriarche or '', self.index) """ @property def images_without_main_image(self): return [] @property def associated_file_name(self): if not self.associated_file: return "" return os.path.basename(self.associated_file.name) @property def images(self): # mimic a queryset pointing to himself return Document.objects.filter( pk=self.pk, image__isnull=False).exclude(image='') @property def main_image(self): if self.images.count(): return self.images.all()[0] @property def has_related(self): for rel in self.RELATED_MODELS: if getattr(self, rel).count(): return True return False @classmethod def get_query_owns(cls, ishtaruser): Operation = cls.operations.rel.related_model ArchaeologicalSite = cls.sites.rel.related_model query_own_list = ( ('operations__', Operation._get_query_owns_dicts(ishtaruser)), ('sites__', ArchaeologicalSite._get_query_owns_dicts(ishtaruser)), ) q = None for prefix, owns in query_own_list: subq = cls._construct_query_own(prefix, owns) if subq: if not q: q = subq else: q |= subq return q def get_associated_operation(self): raise NotImplementedError() @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('source_type', 'title') if getattr(self, attr)] return slugify(u"-".join(values)) def _get_base_image_paths(self): if self.pk: # m2m not available if not created... for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] yield item._get_base_image_path() def _get_base_image_path(self): for path in self._get_base_image_paths(): return path return u"upload" def _get_available_filename(self, path, test_link=None): """ Get a filename not used If name already used - generate a name with schema: [base]-[current_number + 1].[suffix] :param path: base path :param test_link: test if an existing path match with this link :return: if test_link is not None, (new_path, link_match) otherwise the new_path """ file_split = path.split('.') suffix, base = "", "" if len(file_split) > 1: base = u".".join(file_split[0:-1]) suffix = file_split[-1] else: base = path base_split = base.split('-') current_nb = 0 if len(base_split) > 1: try: current_nb = int(base_split[-1]) base = u"-".join(base_split[0:-1]) + u"-" except ValueError: pass while os.path.exists(path): if test_link and os.path.islink(path) \ and os.readlink(path) == test_link: return path, True current_nb += 1 path = u"{}-{}.{}".format(base, current_nb, suffix) if test_link: return path, False return path def _move_image(self): """ Move to the relevant path and create appropriate symbolic links :return: list of associated links """ if getattr(self, "_no_move", False): return reference_path = None initial_path = self.image.path filename = os.path.basename(initial_path) links = [] for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] base_path = item._get_base_image_path() new_path = base_path + u"/" + filename if not reference_path: reference_path = settings.MEDIA_ROOT + new_path # correct path if initial_path == reference_path: continue if not os.path.exists(os.path.dirname(reference_path)): os.makedirs(os.path.dirname(reference_path)) reference_path = self._get_available_filename( reference_path) try: os.rename(initial_path, reference_path) os.rename(self.thumbnail.path, self._get_thumb_name(reference_path)) self.image.name = reference_path[ len(settings.MEDIA_ROOT):] self._no_move = True self.save(no_path_change=True) except OSError: # file probably not on HDD - will be cleaned pass continue # create a link new_path = settings.MEDIA_ROOT + new_path if not os.path.exists(os.path.dirname(new_path)): os.makedirs(os.path.dirname(new_path)) new_path, match = self._get_available_filename( new_path, test_link=reference_path) links.append(new_path) if match: # the current link is correct continue os.symlink(reference_path, new_path) return links def related_label(self): items = [] for rel_attr in reversed(self.RELATED_MODELS): for item in getattr(self, rel_attr).all(): items.append(unicode(item)) return u" ; ".join(items) def _generate_cache_related_label(self): return self.related_label() @classmethod def get_next_index(cls): q = cls.objects.values('index').filter( index__isnull=False).order_by("-index") if not q.count(): return 1 cid = q.all()[0]['index'] if not cid: cid = 0 return cid + 1 def set_index(self): if self.index: return self.index = self.get_next_index() @classmethod @pre_importer_action def import_get_next_index(cls, context, value): context["index"] = cls.get_next_index() def save(self, *args, **kwargs): no_path_change = 'no_path_change' in kwargs \ and kwargs.pop('no_path_change') self.set_index() super(Document, self).save(*args, **kwargs) if self.image and not no_path_change and \ not getattr(self, '_no_path_change', False): links = self._move_image() if not links: return links = self.LINK_SPLIT.join(links) if links != self.associated_links: self.associated_links = links self.save(no_path_change=True) def document_attached_changed(sender, **kwargs): # associate a default main image instance = kwargs.get("instance", None) model = kwargs.get("model", None) pk_set = kwargs.get("pk_set", None) if not instance or not model: return if hasattr(instance, "documents"): items = [instance] else: if not pk_set: return try: items = [model.objects.get(pk=pk) for pk in pk_set] except model.DoesNotExist: return for item in items: q = item.documents.filter( image__isnull=False).exclude(image='') if item.main_image: if q.filter(pk=item.main_image.pk).count(): return # the association has disappear not the main image anymore item.main_image = None item.skip_history_when_saving = True item.save() if not q.count(): return # by default get the lowest pk item.main_image = q.order_by('pk').all()[0] item.skip_history_when_saving = True item.save() post_save.connect(cached_label_changed, sender=Document) class OperationType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) preventive = models.BooleanField(_(u"Is preventive"), default=True) judiciary = models.BooleanField(_(u"Is judiciary"), default=False) class Meta: verbose_name = _(u"Operation type") verbose_name_plural = _(u"Operation types") ordering = ['judiciary', '-preventive', 'order', 'label'] @classmethod def get_types(cls, dct=None, instances=False, exclude=None, empty_first=True, default=None, initial=None): dct = dct or {} exclude = exclude or [] initial = initial or [] tuples = [] dct['available'] = True if not instances and empty_first and not default: tuples.append(('', '--')) if default and not instances: try: default = cls.objects.get(txt_idx=default) tuples.append((default.pk, _(unicode(default)))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and not instances: exclude.append(default.txt_idx) if exclude: items = items.exclude(txt_idx__in=exclude) current_preventive, current_judiciary, current_lst = None, None, None item_list = list(items.order_by(*cls._meta.ordering).all()) new_vals = cls._get_initial_types(initial, [i.pk for i in item_list], instance=True) item_list += new_vals for item in item_list: item.rank = 0 if instances: return item_list for item in item_list: if not current_lst or item.preventive != current_preventive \ or item.judiciary != current_judiciary: if current_lst: tuples.append(current_lst) if item.judiciary: gp_lbl = _(u"Judiciary") elif item.preventive: gp_lbl = _(u"Preventive") else: gp_lbl = _(u"Research") current_lst = [gp_lbl, []] current_preventive = item.preventive current_judiciary = item.judiciary current_lst[1].append((item.pk, _(unicode(item)))) if current_lst: tuples.append(current_lst) return tuples @classmethod def is_preventive(cls, ope_type_id, key=''): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False if not key: return op_type.preventive return key == op_type.txt_idx @classmethod def is_judiciary(cls, ope_type_id): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False return op_type.judiciary post_save.connect(post_save_cache, sender=OperationType) post_delete.connect(post_save_cache, sender=OperationType) class AdministrationScript(models.Model): path = models.CharField(_(u"Filename"), max_length=30) name = models.TextField(_(u"Name"), null=True, blank=True) class Meta: verbose_name = _(u"Administration script") verbose_name_plural = _(u"Administration scripts") ordering = ['name'] def __unicode__(self): return unicode(self.name) SCRIPT_STATE = (("S", _(u"Scheduled")), ("P", _(u"In progress")), ("FE", _(u"Finished with errors")), ("F", _(u"Finished")), ) SCRIPT_STATE_DCT = dict(SCRIPT_STATE) class AdministrationTask(models.Model): script = models.ForeignKey(AdministrationScript) state = models.CharField(_(u"State"), max_length=2, choices=SCRIPT_STATE, default='S') creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) result = models.TextField(_(u"Result"), null=True, blank=True) class Meta: verbose_name = _(u"Administration task") verbose_name_plural = _(u"Administration tasks") ordering = ['script'] def __unicode__(self): state = _(u"Unknown") if self.state in SCRIPT_STATE_DCT: state = unicode(SCRIPT_STATE_DCT[self.state]) return u"{} - {} - {}".format(self.script, self.creation_date, state) def execute(self): if self.state != 'S': return self.launch_date = datetime.datetime.now() script_dir = settings.ISHTAR_SCRIPT_DIR if not script_dir: self.result = unicode( _(u"ISHTAR_SCRIPT_DIR is not set in your " u"local_settings. Contact your administrator.")) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return if '..' in script_dir: self.result = unicode( _(u"Your ISHTAR_SCRIPT_DIR is containing " u"dots \"..\". As it can refer to relative " u"paths, it can be a security issue and this is " u"not allowed. Only put a full path.")) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return if not os.path.isdir(script_dir): self.result = unicode( _(u"Your ISHTAR_SCRIPT_DIR: \"{}\" is not a valid directory.") ).format(script_dir) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return script_name = None # only script inside the script directory can be executed for name in os.listdir(script_dir): if name == self.script.path: if os.path.isfile(os.path.join(script_dir, name)): script_name = os.path.join(script_dir, name) break if not script_name: self.result = unicode( _(u"Script \"{}\" is not available in your script directory. " u"Check your configuration.") ).format(self.script.path) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return self.state = 'P' self.save() self.finished_date = datetime.datetime.now() try: session = Popen([script_name], stdout=PIPE, stderr=PIPE) stdout, stderr = session.communicate() except OSError as e: self.state = 'FE' self.result = u"Error executing \"{}\" script: {}".format( self.script.path, e) self.save() return self.finished_date = datetime.datetime.now() if stderr: self.state = 'FE' self.result = u"Error: {}".format(stderr.decode('utf-8')) else: self.state = 'F' self.result = u"{}".format(stdout.decode('utf-8')) self.save()