#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ from cStringIO import StringIO import copy import datetime import inspect from PIL import Image import logging import os from os.path import isfile, join import re from secretary import Renderer as SecretaryRenderer import shutil from subprocess import Popen, PIPE import tempfile import time from unidecode import unidecode from django.conf import settings from django.contrib.postgres.fields import JSONField from django.contrib.postgres.search import SearchVectorField, SearchVector from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.files.uploadedfile import SimpleUploadedFile from django.core.validators import validate_slug from django.core.urlresolvers import reverse, NoReverseMatch from django.db.utils import DatabaseError from django.db.models import Q, Max, Count from django.db.models.signals import post_save, post_delete, m2m_changed from django.utils.functional import lazy from django.utils.translation import ugettext_lazy as _ from django.utils.safestring import SafeUnicode, mark_safe from django.template.defaultfilters import slugify from django.contrib.auth.models import User, Group from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.gis.db import models from simple_history.models import HistoricalRecords as BaseHistoricalRecords from ishtar_common.model_merging import merge_model_objects from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug,\ get_all_field_names, merge_tsvectors, cached_label_changed from ishtar_common.models_imports import ImporterModel, ImporterType, \ ImporterDefault, ImporterDefaultValues, ImporterColumn, \ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ Import, TargetKeyGroup __all__ = [ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget', 'TargetKey', 'FormaterType', 'Import', 'TargetKeyGroup' ] logger = logging.getLogger(__name__) def post_save_user(sender, **kwargs): user = kwargs['instance'] if kwargs["created"]: try: IshtarUser.create_from_user(user) except DatabaseError: # manage when db is not synced pass IshtarUser.set_superuser(user) post_save.connect(post_save_user, sender=User) def check_model_access_control(request, model, available_perms=None): """ Check access control to a model for a specific request :param request: the current request :param model: the concerned model :param available_perms: specific permissions to check if not specified "view" and "view_own" will be checked :return: (allowed, own) tuple """ own = True # more restrictive by default allowed = False if not request.user.is_authenticated(): return allowed, own if not available_perms: available_perms = ['view_' + model.__name__.lower(), 'view_own_' + model.__name__.lower()] if request.user.ishtaruser.has_right('administrator', session=request.session): allowed = True own = False return allowed, own for perm, lbl in model._meta.permissions: if perm not in available_perms: continue cperm = model._meta.app_label + '.' + perm if request.user.has_perm(cperm) \ or cperm in request.user.get_all_permissions() \ or request.user.ishtaruser.has_right( perm, session=request.session): allowed = True if "_own_" not in perm: own = False break # max right reach return allowed, own class ValueGetter(object): _prefix = "" GET_VALUES_EXTRA = [] COL_LABELS = {} GET_VALUE_EXCLUDE_FIELDS = ['search_vector', 'id'] def get_values(self, prefix=''): if not prefix: prefix = self._prefix values = {} for field_name in get_all_field_names(self): if not hasattr(self, field_name) or \ field_name in self.GET_VALUE_EXCLUDE_FIELDS or \ field_name.endswith('_id'): continue value = getattr(self, field_name) if hasattr(value, 'get_values'): values.update(value.get_values(prefix + field_name + '_')) else: values[prefix + field_name] = value for extra_field in self.GET_VALUES_EXTRA: values[prefix + extra_field] = getattr(self, extra_field) or '' for key in values.keys(): val = values[key] if val is None: val = '' else: val = unicode(val) if val.endswith('.None'): val = '' values[key] = val values['KEYS'] = u'\n'.join(values.keys()) value_list = [] for key in values.keys(): if key in ('KEYS', 'VALUES'): continue value_list.append((key, unicode(values[key]))) values['VALUES'] = u'\n'.join( [u"%s: %s" % (k, v) for k, v in sorted(value_list, key=lambda x:x[0])]) for global_var in GlobalVar.objects.all(): values[global_var.slug] = global_var.value or "" return values @classmethod def get_empty_values(cls, prefix=''): if not prefix: prefix = cls._prefix values = {} for field_name in get_all_field_names(cls): values[prefix + field_name] = '' return values class HistoricalRecords(BaseHistoricalRecords): def create_historical_record(self, instance, type): try: history_modifier = getattr(instance, 'history_modifier', None) assert history_modifier except (User.DoesNotExist, AssertionError): # on batch removing of users, user could have disappeared return manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history\ .filter(history_modifier_id=history_modifier.pk)\ .order_by('-history_date', '-history_id') if not q_history.count(): manager.create(history_type=type, history_date=datetime.datetime.now(), **attrs) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = datetime.datetime.now() \ - datetime.timedelta(seconds=5) q = q_history.filter(history_date__isnull=False, history_date__gt=min_history_date)\ .order_by('-history_date', '-history_id') if q.count(): return if 'history_date' not in attrs or not attrs['history_date']: attrs['history_date'] = datetime.datetime.now() # record a new version only if data have been changed for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: manager.create(history_type=type, **attrs) return def valid_id(cls): # valid ID validator for models def func(value): try: cls.objects.get(pk=value) except ObjectDoesNotExist: raise ValidationError(_(u"Not a valid item.")) return func def valid_ids(cls): def func(value): if "," in value: value = value.split(",") if type(value) not in (list, tuple): value = [value] for v in value: try: cls.objects.get(pk=v) except ObjectDoesNotExist: raise ValidationError( _(u"A selected item is not a valid item.")) return func def is_unique(cls, field): # unique validator for models def func(value): query = {field: value} try: assert cls.objects.filter(**query).count() == 0 except AssertionError: raise ValidationError(_(u"This item already exists.")) return func class OwnPerms(object): """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, user): """ Query object to get own items """ return None # implement for each object def is_own(self, user): """ Check if the current object is owned by the user """ query = self.get_query_owns(user) if not query: return False query &= Q(pk=self.pk) return self.__class__.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ query = cls.get_query_owns(user) if not query: return False return cls.objects.filter(query).count() @classmethod def _return_get_owns(cls, owns, values, get_short_menu_class, label_key='cached_label'): sorted_values = [] if hasattr(cls, 'BASKET_MODEL'): owns_len = len(owns) for idx, item in enumerate(reversed(owns)): if get_short_menu_class: item = item[0] if type(item) == cls.BASKET_MODEL: basket = owns.pop(owns_len - idx - 1) sorted_values.append(basket) sorted_values = list(reversed(sorted_values)) if not values: if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: getattr(x, label_key))) return sorted_values + list( sorted(owns, key=lambda x: getattr(x[0], label_key))) if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: x[label_key])) return sorted_values + list( sorted(owns, key=lambda x: x[0][label_key])) @classmethod def get_owns(cls, user, replace_query={}, limit=None, values=None, get_short_menu_class=False): """ Get Own items """ if hasattr(user, 'is_authenticated') and not user.is_authenticated(): returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if isinstance(user, User): user = IshtarUser.objects.get(user_ptr=user) items = [] if hasattr(cls, 'BASKET_MODEL'): items = list(cls.BASKET_MODEL.objects.filter(user=user).all()) query = cls.get_query_owns(user) if not query and not replace_query: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if query: q = cls.objects.filter(query) else: # replace_query q = cls.objects.filter(replace_query) if values: q = q.values(*values) if limit: items += list(q.order_by('-pk')[:limit]) else: items += list(q.order_by(*cls._meta.ordering).all()) if get_short_menu_class: if values: if 'id' not in values: raise NotImplementedError( "Call of get_owns with get_short_menu_class option and" " no 'id' in values is not implemented") my_items = [] for i in items: if hasattr(cls, 'BASKET_MODEL') and \ type(i) == cls.BASKET_MODEL: dct = dict([(k, getattr(i, k)) for k in values]) my_items.append( (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) else: my_items.append((i, cls.get_short_menu_class(i['id']))) items = my_items else: items = [(i, cls.get_short_menu_class(i.pk)) for i in items] return items class Cached(object): slug_field = 'txt_idx' @classmethod def refresh_cache(cls): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if not current_keys: return for keys in current_keys: if len(keys) == 2 and keys[0] == '__slug': cls.get_cache(keys[1], force=True) elif keys[0] == '__get_types': default = None empty_first = True exclude = [] if len(keys) >= 2: default = keys.pop() if len(keys) > 1: empty_first = bool(keys.pop()) exclude = keys[1:] cls.get_types( exclude=exclude, empty_first=empty_first, default=default, force=True) elif keys[0] == '__get_help': cls.get_help(force=True) @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) @classmethod def get_cache(cls, slug, force=False): cache_key, value = get_cache(cls, ['__slug', slug]) if not force and value: return value try: k = {cls.slug_field: slug} obj = cls.objects.get(**k) cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj except cls.DoesNotExist: cache.set(cache_key, None, settings.CACHE_TIMEOUT) return None @disable_for_loaddata def post_save_cache(sender, **kwargs): sender.refresh_cache() class SlugModelManager(models.Manager): def get_by_natural_key(self, slug): return self.get(slug=slug) class TypeManager(models.Manager): def get_by_natural_key(self, txt_idx): return self.get(txt_idx=txt_idx) class GeneralType(Cached, models.Model): """ Abstract class for "types" """ label = models.CharField(_(u"Label"), max_length=100) txt_idx = models.CharField( _(u"Textual ID"), validators=[validate_slug], max_length=100, unique=True, help_text=_( u"The slug is the standardized version of the name. It contains " u"only lowercase letters, numbers and hyphens. Each slug must " u"be unique.")) comment = models.TextField(_(u"Comment"), blank=True, null=True) available = models.BooleanField(_(u"Available"), default=True) HELP_TEXT = u"" objects = TypeManager() class Meta: abstract = True def __unicode__(self): return self.label def natural_key(self): return (self.txt_idx, ) @classmethod def create_default_for_test(cls): return [cls.objects.create(label='Test %d' % i) for i in range(5)] @property def short_label(self): return self.label @property def name(self): return self.label @classmethod def get_or_create(cls, slug, label=''): """ Get or create a new item. :param slug: textual id :param label: label for initialization if the item doesn't exist (not mandatory) :return: instancied item of the base class """ item = cls.get_cache(slug) if item: return item item, created = cls.objects.get_or_create( txt_idx=slug, defaults={'label': label}) return item @classmethod def get_or_create_pk(cls, slug): """ Get an id from a slug. Create the associated item if needed. :param slug: textual id :return: id of the item (string) """ return unicode(cls.get_or_create(slug).pk) @classmethod def get_or_create_pks(cls, slugs): """ Get and merge a list of ids from a slug list. Create the associated items if needed. :param slugs: textual ids :return: string with ids separated by "_" """ items = [] for slug in slugs: items.append(str(cls.get_or_create(slug).pk)) return u"_".join(items) @classmethod def get_help(cls, dct={}, exclude=[], force=False): keys = ['__get_help'] keys += [u"{}".format(ex) for ex in exclude] keys += [u'{}-{}'.format(unicode(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return mark_safe(value) help_text = cls.HELP_TEXT c_rank = -1 help_items = u"\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, '__iter__'): pk = item[0] item = cls.objects.get(pk=pk) item.rank = c_rank + 1 if hasattr(item, 'parent'): c_item = item parents = [] while c_item.parent: parents.append(c_item.parent.label) c_item = c_item.parent parents.reverse() parents.append(item.label) item.label = u" / ".join(parents) if not item.comment: continue if c_rank > item.rank: help_items += u"\n" elif c_rank < item.rank: help_items += u"
\n" c_rank = item.rank help_items += u"
%s
%s
" % ( item.label, u"
".join(item.comment.split('\n'))) c_rank += 1 if c_rank: help_items += c_rank * u"
" if help_text or help_items != u'\n': help_text = help_text + help_items else: help_text = u"" cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) return mark_safe(help_text) @classmethod def _get_initial_types(cls, initial, type_pks, instance=False): new_vals = [] if not initial: return [] if type(initial) not in (list, tuple): initial = [initial] for value in initial: try: pk = int(value) except (ValueError, TypeError): continue if pk in type_pks: continue try: extra_type = cls.objects.get(pk=pk) if instance: new_vals.append(extra_type) else: new_vals.append((extra_type.pk, unicode(extra_type))) except cls.DoesNotExist: continue return new_vals @classmethod def get_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None, initial=None, force=False): types = [] if not instances and empty_first and not default: types = [('', '--')] types += cls._pre_get_types(dct, instances, exclude, default, force) if not initial: return types new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) types += new_vals return types @classmethod def _pre_get_types(cls, dct={}, instances=False, exclude=[], default=None, force=False): # cache cache_key = None if not instances: keys = ['__get_types'] keys += [u"{}".format(ex) for ex in exclude] + \ [u"{}".format(default)] keys += [u'{}-{}'.format(unicode(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return value base_dct = dct.copy() if hasattr(cls, 'parent'): if not cache_key: return cls._get_parent_types( base_dct, instances, exclude=exclude, default=default) vals = [v for v in cls._get_parent_types( base_dct, instances, exclude=exclude, default=default)] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals if not cache_key: return cls._get_types(base_dct, instances, exclude=exclude, default=default) vals = [v for v in cls._get_types( base_dct, instances, exclude=exclude, default=default)] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals @classmethod def _get_types(cls, dct={}, instances=False, exclude=[], default=None): dct['available'] = True if default: try: default = cls.objects.get(txt_idx=default) yield(default.pk, _(unicode(default))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and default != "None": if hasattr(default, 'txt_idx'): exclude.append(default.txt_idx) else: exclude.append(default) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(unicode(item)) if item and unicode(item) else '') PREFIX = "│ " PREFIX_EMPTY = "  " PREFIX_MEDIUM = "├ " PREFIX_LAST = "└ " @classmethod def _get_childs(cls, item, dct, prefix=0, instances=False, exclude=[], is_last=False, last_of=[]): prefix += 1 dct['parent'] = item childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): childs = childs.order_by('order') lst = [] child_lst = childs.all() total = len(child_lst) for idx, child in enumerate(child_lst): mylast_of = last_of[:] if instances: child.rank = prefix lst.append(child) else: p = '' cprefix = prefix while cprefix: cprefix -= 1 if not cprefix: if (idx + 1) == total: p += cls.PREFIX_LAST else: p += cls.PREFIX_MEDIUM elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: p += cls.PREFIX_EMPTY else: p += cls.PREFIX else: p += cls.PREFIX_EMPTY else: p += cls.PREFIX lst.append(( child.pk, SafeUnicode(p + unicode(_(unicode(child)))) )) clast_of = last_of[:] clast_of.append(idx + 1 == total) for sub_child in cls._get_childs( child, dct, prefix, instances, exclude=exclude, is_last=((idx + 1) == total), last_of=clast_of): lst.append(sub_child) return lst @classmethod def _get_parent_types(cls, dct={}, instances=False, exclude=[], default=None): dct['available'] = True dct['parent'] = None items = cls.objects.filter(**dct) if exclude: items = items.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): items = items.order_by('order') for item in items.all(): if instances: item.rank = 0 yield item else: yield (item.pk, unicode(item)) for child in cls._get_childs(item, dct, instances, exclude=exclude): yield child def save(self, *args, **kwargs): if not self.id and not self.label: self.label = u" ".join(u" ".join(self.txt_idx.split('-')) .split('_')).title() if not self.txt_idx: self.txt_idx = slugify(self.label)[:100] # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter( object_id=self.pk, key=slugify(old.label), content_type=content_type).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter( object_id=self.pk, key=old.txt_idx, content_type=content_type).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False, importer=None, group=None, user=None): content_type = ContentType.objects.get_for_model(self.__class__) if not importer and not force and ItemKey.objects.filter( key=key, content_type=content_type).count(): return filtr = {'key': key, 'content_type': content_type} if group: filtr['group'] = group elif user: filtr['user'] = user else: filtr['importer'] = importer if force: ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() filtr['object_id'] = self.pk ItemKey.objects.get_or_create(**filtr) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self, importer): keys = [self.txt_idx] content_type = ContentType.objects.get_for_model(self.__class__) base_q = Q(content_type=content_type, object_id=self.pk) subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True) subquery |= Q(user__isnull=True, group__isnull=True, importer=importer) if importer.user: subquery |= Q(user=importer.user, group__isnull=True, importer=importer) if importer.associated_group: subquery |= Q(user__isnull=True, group=importer.associated_group, importer=importer) q = ItemKey.objects.filter(base_q & subquery) for ik in q.exclude(key=self.txt_idx).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): # content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() class HierarchicalType(GeneralType): parent = models.ForeignKey('self', blank=True, null=True, verbose_name=_(u"Parent")) class Meta: abstract = True def full_label(self): lbls = [self.label] item = self while item.parent: item = item.parent lbls.append(item.label) return u" > ".join(reversed(lbls)) class ItemKey(models.Model): key = models.CharField(_(u"Key"), max_length=100) content_type = models.ForeignKey(ContentType) object_id = models.PositiveIntegerField() content_object = GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey( Import, null=True, blank=True, help_text=_(u"Specific key to an import")) user = models.ForeignKey('IshtarUser', blank=True, null=True) group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) def __unicode__(self): return self.key def get_image_path(instance, filename): return instance._get_image_path(filename) class ImageModel(models.Model): image = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255) thumbnail = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255) IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE IMAGE_PREFIX = '' class Meta: abstract = True def _get_image_path(self, filename): return u"{}/{}".format(self._get_base_image_path(), filename) def _get_base_image_path(self): return u"upload" def has_changed(self, field): if not self.pk: return True manager = getattr(self.__class__, 'objects') old = getattr(manager.get(pk=self.pk), field) return not getattr(self, field) == old def create_thumb(self, image, size): """Returns the image resized to fit inside a box of the given size""" image.thumbnail(size, Image.ANTIALIAS) temp = StringIO() image.save(temp, 'jpeg') temp.seek(0) return SimpleUploadedFile('temp', temp.read()) def save(self, *args, **kwargs): if 'force_copy' in kwargs: kwargs.pop('force_copy') super(ImageModel, self).save(*args, **kwargs) return # manage images if not self.has_changed('image'): return super(ImageModel, self).save(*args, **kwargs) if not self.image: self.thumbnail = None return super(ImageModel, self).save(*args, **kwargs) # # generate thumbnail # convert to jpg filename = os.path.splitext(os.path.split(self.image.name)[-1])[0] old_path = self.image.path filename = "%s.jpg" % filename try: image = Image.open(self.image.file) # convert to RGB if image.mode not in ('L', 'RGB'): image = image.convert('RGB') # resize if necessary if self.IMAGE_MAX_SIZE: self.image.save(filename, self.create_thumb(image, self.IMAGE_MAX_SIZE), save=False) if old_path != self.image.path: try: os.remove(old_path) except OSError: # already clean pass # save the thumbnail splited = filename.split('.') thumb_filename = u"{}-thumb.{}".format( u".".join(splited[:-1]), splited[-1] ) self.thumbnail.save( thumb_filename, self.create_thumb(image, self.THUMB_MAX_SIZE), save=False) except IOError: pass return super(ImageModel, self).save(*args, **kwargs) class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) PRIVATE_FIELDS = ('id', 'history_modifier', 'order') class BulkUpdatedItem(object): @classmethod def bulk_recursion(cls, transaction_id, extra_args): """ Prevent infinite recursion. Should not happen but wrong manipulation in the database or messy imports can generate circular relations :param transaction_id: current transaction ID (unix time) - if null a transaction ID is generated :param extra_args: arguments dealing with :return: (transaction ID, is a recursion) """ if not transaction_id: transaction_id = unicode(time.time()) args = ['cached_label_bulk_update', transaction_id] + extra_args key, val = get_cache(cls, args) if val: return transaction_id, True cache.set(key, 1, settings.CACHE_SMALLTIMEOUT) return transaction_id, False class JsonDataSection(models.Model): content_type = models.ForeignKey(ContentType) name = models.CharField(_(u"Name"), max_length=200) order = models.IntegerField(_(u"Order"), default=10) class Meta: verbose_name = _(u"Json data - Menu") verbose_name_plural = _(u"Json data - Menus") ordering = ['order', 'name'] def __unicode__(self): return u"{} - {}".format(self.content_type, self.name) class JsonDataField(models.Model): name = models.CharField(_(u"Name"), max_length=200) content_type = models.ForeignKey(ContentType) key = models.CharField( _(u"Key"), max_length=200, help_text=_(u"Value of the key in the JSON schema. For hierarchical " u"key use \"__\" to explain it. For instance for the key " u"'my_subkey' with data such as {'my_key': {'my_subkey': " u"'value'}}, its value will be reached with my_key__my_subkey.")) display = models.BooleanField(_(u"Display"), default=True) order = models.IntegerField(_(u"Order"), default=10) section = models.ForeignKey(JsonDataSection, blank=True, null=True) class Meta: verbose_name = _(u"Json data - Field") verbose_name_plural = _(u"Json data - Fields") ordering = ['order', 'name'] def __unicode__(self): return u"{} - {}".format(self.content_type, self.name) def clean(self): if not self.section: return if self.section.content_type != self.content_type: raise ValidationError( _(u"Content types of the field and of the menu do not match")) class JsonData(models.Model): data = JSONField(default={}, db_index=True, blank=True) class Meta: abstract = True def pre_save(self): if not self.data: self.data = {} @property def json_sections(self): sections = [] try: content_type = ContentType.objects.get_for_model(self) except ContentType.DoesNotExists: return sections fields = list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=True ).all()) # no section fields fields += list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=False ).order_by('section__order', 'order').all()) for field in fields: value = None data = self.data.copy() for key in field.key.split('__'): if key in data: value = copy.copy(data[key]) data = data[key] else: value = None break if not value: continue if type(value) in (list, tuple): value = u" ; ".join([unicode(v) for v in value]) section_name = field.section.name if field.section else None if not sections or section_name != sections[-1][0]: # if section name is identical it is the same sections.append((section_name, [])) sections[-1][1].append((field.name, value)) return sections class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s") class Meta: abstract = True class FullSearch(models.Model): search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")) BASE_SEARCH_VECTORS = [] INT_SEARCH_VECTORS = [] M2M_SEARCH_VECTORS = [] PARENT_SEARCH_VECTORS = [] class Meta: abstract = True def update_search_vector(self, save=True): """ Update the search vector :param save: True if you want to save the object immediately :return: True if modified """ if not self.pk: logger.warning("Cannot update search vector before save or " "after deletion.") return if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ and not self.INT_SEARCH_VECTORS \ and not self.PARENT_SEARCH_VECTORS: logger.warning("No search_vectors defined for {}".format( self.__class__)) return if getattr(self, '_search_updated', None): return self._search_updated = True old_search = "" if self.search_vector: old_search = self.search_vector[:] search_vectors = [] base_q = self.__class__.objects.filter(pk=self.pk) # many to many have to be queried one by one otherwise only one is fetch for M2M_SEARCH_VECTOR in self.M2M_SEARCH_VECTORS: key = M2M_SEARCH_VECTOR.split('__')[0] rel_key = getattr(self, key) for item in rel_key.values('pk').all(): query_dct = {key + "__pk": item['pk']} q = copy.copy(base_q).filter(**query_dct) q = q.annotate( search=SearchVector( M2M_SEARCH_VECTOR, config=settings.ISHTAR_SEARCH_LANGUAGE) ).values('search') search_vectors.append(q.all()[0]['search']) # int/float are not well managed by the SearchVector for INT_SEARCH_VECTOR in self.INT_SEARCH_VECTORS: q = base_q.values(INT_SEARCH_VECTOR) search_vectors.append( "'{}':1".format(q.all()[0][INT_SEARCH_VECTOR])) # copy parent vector fields for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: parent = getattr(self, PARENT_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append(p.search_vector) elif parent: search_vectors.append(parent.search_vector) if self.BASE_SEARCH_VECTORS: # query "simple" fields q = base_q.annotate( search=SearchVector( *self.BASE_SEARCH_VECTORS, config=settings.ISHTAR_SEARCH_LANGUAGE )).values('search') search_vectors.append( unidecode( q.all()[0]['search'].decode('utf-8') ) ) self.search_vector = merge_tsvectors(search_vectors) changed = old_search != self.search_vector if save and changed: self.skip_history_when_saving = True self.save() return changed class FixAssociated(object): ASSOCIATED = {} def fix_associated(self): for key in self.ASSOCIATED: item = getattr(self, key) if not item: continue dct = self.ASSOCIATED[key] for dct_key in dct: subkey, ctype = dct_key expected_values = dct[dct_key] if not isinstance(expected_values, (list, tuple)): expected_values = [expected_values] if hasattr(ctype, "txt_idx"): try: expected_values = [ctype.objects.get(txt_idx=v) for v in expected_values] except ctype.DoesNotExist: # type not yet initialized return current_vals = getattr(item, subkey) is_many = False if hasattr(current_vals, "all"): is_many = True current_vals = current_vals.all() else: current_vals = [current_vals] is_ok = False for current_val in current_vals: if current_val in expected_values: is_ok = True break if is_ok: continue # the first value is used new_value = expected_values[0] if is_many: getattr(item, subkey).add(new_value) else: setattr(item, subkey, new_value) class BaseHistorizedItem(FullSearch, Imported, JsonData, FixAssociated): """ Historized item with external ID management. All historized items are searcheable and have a data json field """ IS_BASKET = False EXTERNAL_ID_KEY = '' EXTERNAL_ID_DEPENDENCIES = [] history_modifier = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_(u"Last editor"), blank=True, null=True) history_creator = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_(u"Creator"), blank=True, null=True) class Meta: abstract = True @classmethod def get_verbose_name(cls): return cls._meta.verbose_name def update_external_id(self, save=False): if not self.EXTERNAL_ID_KEY or ( self.external_id and not self.auto_external_id): return external_id = get_external_id(self.EXTERNAL_ID_KEY, self) if external_id == self.external_id: return self.auto_external_id = True self.external_id = external_id self._cached_label_checked = False if save: self.skip_history_when_saving = True self.save() return external_id def get_previous(self, step=None, date=None, strict=True): """ Get a "step" previous state of the item """ assert step or date historized = self.history.all() item = None if step: if len(historized) <= step: # silently return the last step if too far in the history item = historized[len(historized) - 1] else: item = historized[step] else: for step, item in enumerate(historized): if item.history_date == date: break # ended with no match if item.history_date != date: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in get_all_field_names(model): field = model._meta.get_field(k) if hasattr(field, 'rel') and field.rel: if not hasattr(item, k + '_id'): setattr(item, k, getattr(self, k)) continue val = getattr(item, k + '_id') if not val: setattr(item, k, None) continue try: val = field.rel.to.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError(u"The class %s has no pk %d" % ( unicode(field.rel.to), val)) setattr(item, k, None) item.pk = self.pk return item @property def last_edition_date(self): try: return self.history.order_by('-history_date').all()[0].history_date except (AttributeError, IndexError): return @property def history_creation_date(self): try: return self.history.order_by('history_date').all()[0].history_date except (AttributeError, IndexError): return def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): if item.history_date == date: new_item = item break to_del.append(item) if not new_item: raise HistoryError(u"The date to rollback to doesn't exist.") try: for f in self._meta.fields: k = f.name if k != 'id' and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) try: self.history_modifier = User.objects.get( pk=new_item.history_modifier_id) except User.ObjectDoesNotExist: pass # force label regeneration self._cached_label_checked = False self.save() except: raise HistoryError(u"The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def values(self): values = {} for f in self._meta.fields: k = f.name if k != 'id': values[k] = getattr(self, k) return values def get_show_url(self): try: return reverse('show-' + self.__class__.__name__.lower(), args=[self.pk, '']) except NoReverseMatch: return @property def associated_filename(self): if [True for attr in ('get_town_label', 'get_department', 'reference', 'short_class_name') if not hasattr(self, attr)]: return '' items = [slugify(self.get_department()), slugify(self.get_town_label()).upper(), slugify(self.short_class_name), slugify(self.reference), slugify(self.name or '').replace('-', '_').capitalize()] last_edition_date = self.last_edition_date if last_edition_date: items.append(last_edition_date.strftime('%Y%m%d')) else: items.append('00000000') return u"-".join([unicode(item) for item in items]) def save(self, *args, **kwargs): created = not self.pk if not getattr(self, 'skip_history_when_saving', False): assert hasattr(self, 'history_modifier') if created: self.history_creator = self.history_modifier # external ID can have related item not available before save external_id_updated = kwargs.pop('external_id_updated') \ if 'external_id_updated' in kwargs else False if not created and not external_id_updated: self.update_external_id() super(BaseHistorizedItem, self).save(*args, **kwargs) if created and self.update_external_id(): # force resave for external ID creation self.skip_history_when_saving = True self._updated_id = True return self.save(external_id_updated=True) for dep in self.EXTERNAL_ID_DEPENDENCIES: for obj in getattr(self, dep).all(): obj.update_external_id(save=True) self.fix_associated() return True class GeneralRelationType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) symmetrical = models.BooleanField(_(u"Symmetrical")) tiny_label = models.CharField(_(u"Tiny label"), max_length=50, blank=True, null=True) inverse_relation = models.ForeignKey( 'self', verbose_name=_(u"Inverse relation"), blank=True, null=True) class Meta: abstract = True def clean(self): # cannot have symmetrical and an inverse_relation if self.symmetrical and self.inverse_relation: raise ValidationError( _(u"Cannot have symmetrical and an inverse_relation")) def get_tiny_label(self): return self.tiny_label or self.label or u"" def save(self, *args, **kwargs): obj = super(GeneralRelationType, self).save(*args, **kwargs) # after saving check that the inverse_relation of the inverse_relation # point to the saved object if self.inverse_relation \ and (not self.inverse_relation.inverse_relation or self.inverse_relation.inverse_relation != self): self.inverse_relation.inverse_relation = self self.inverse_relation.symmetrical = False self.inverse_relation.save() return obj class GeneralRecordRelations(object): def save(self, *args, **kwargs): super(GeneralRecordRelations, self).save(*args, **kwargs) # after saving create the symetric or the inverse relation sym_rel_type = self.relation_type if not self.relation_type.symmetrical: sym_rel_type = self.relation_type.inverse_relation # no symetric/inverse is defined if not sym_rel_type: return dct = {'right_record': self.left_record, 'left_record': self.right_record, 'relation_type': sym_rel_type} self.__class__.objects.get_or_create(**dct) return self def post_delete_record_relation(sender, instance, **kwargs): # delete symmetrical or inverse relation sym_rel_type = instance.relation_type if not instance.relation_type.symmetrical: sym_rel_type = instance.relation_type.inverse_relation # no symetric/inverse is defined if not sym_rel_type: return dct = {'right_record_id': instance.left_record_id, 'left_record_id': instance.right_record_id, 'relation_type': sym_rel_type} q = instance.__class__.objects.filter(**dct) if q.count(): q.delete() class ShortMenuItem(object): @classmethod def get_short_menu_class(cls, pk): return '' class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=datetime.datetime.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return True PARSE_FORMULA = re.compile("{([^}]*)}") FORMULA_FILTERS = { 'upper': lambda x: x.upper(), 'lower': lambda x: x.lower(), 'capitalize': lambda x: x.capitalize(), 'slug': lambda x: slugify(x) } def get_external_id(key, item): profile = get_current_profile() if not hasattr(profile, key): return formula = getattr(profile, key) dct = {} for fkey in PARSE_FORMULA.findall(formula): filtered = fkey.split(u'|') initial_key = fkey[:] fkey = filtered[0] filters = [] for filtr in filtered[1:]: if filtr in FORMULA_FILTERS: filters.append(FORMULA_FILTERS[filtr]) if fkey.startswith('settings__'): dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' continue obj = item for k in fkey.split('__'): try: obj = getattr(obj, k) except ObjectDoesNotExist: obj = None if callable(obj): obj = obj() if obj is None: break if obj is None: dct[initial_key] = '' else: dct[initial_key] = unicode(obj) for filtr in filters: dct[initial_key] = filtr(dct[initial_key]) return formula.format(**dct) CURRENCY = ((u"€", _(u"Euro")), (u"$", _(u"US dollar"))) FIND_INDEX_SOURCE = ((u"O", _(u"Operations")), (u"CR", _(u"Context records"))) SITE_LABELS = [('site', _(u"Site")), ('entity', _(u"Archaeological entity"))] TRANSLATED_SITE_LABELS = { 'site': { 'search': _(u"Site search"), 'new': _(u"New site"), 'modification': _(u"Site modification"), }, 'entity': { 'search': _(u"Archaeological entity search"), 'new': _(u"New archaeological entity"), 'modification': _(u"Archaeological entity modification"), }, } class IshtarSiteProfile(models.Model, Cached): slug_field = 'slug' label = models.TextField(_(u"Name")) slug = models.SlugField(_(u"Slug"), unique=True) description = models.TextField(_(u"Description"), null=True, blank=True) files = models.BooleanField(_(u"Files module"), default=False) archaeological_site = models.BooleanField( _(u"Archaeological site module"), default=False) archaeological_site_label = models.CharField( _(u"Archaeological site type"), max_length=200, choices=SITE_LABELS, default='site' ) context_record = models.BooleanField(_(u"Context records module"), default=False) find = models.BooleanField(_(u"Finds module"), default=False, help_text=_(u"Need context records module")) find_index = models.CharField( _(u"Find index is based on"), default='O', max_length=2, choices=FIND_INDEX_SOURCE, help_text=_(u"To prevent irrelevant indexes, change this parameter " u"only if there is no find in the database")) warehouse = models.BooleanField( _(u"Warehouses module"), default=False, help_text=_(u"Need finds module")) preservation = models.BooleanField(_(u"Preservation module"), default=False) mapping = models.BooleanField(_(u"Mapping module"), default=False) underwater = models.BooleanField(_(u"Underwater module"), default=False) parcel_mandatory = models.BooleanField( _(u"Parcel are mandatory for context records"), default=True) homepage = models.TextField( _(u"Home page"), null=True, blank=True, help_text=_(u"Homepage of Ishtar - if not defined a default homepage " u"will appear. Use the markdown syntax. {random_image} " u"can be used to display a random image.")) file_external_id = models.TextField( _(u"File external id"), default=u"{year}-{numeric_reference}", help_text=_(u"Formula to manage file external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) parcel_external_id = models.TextField( _(u"Parcel external id"), default=u"{associated_file__external_id}{operation__code_patriarche}-" u"{town__numero_insee}-{section}{parcel_number}", help_text=_(u"Formula to manage parcel external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) context_record_external_id = models.TextField( _(u"Context record external id"), default=u"{parcel__external_id}-{label}", help_text=_(u"Formula to manage context record external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) base_find_external_id = models.TextField( _(u"Base find external id"), default=u"{context_record__external_id}-{label}", help_text=_(u"Formula to manage base find external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) find_external_id = models.TextField( _(u"Find external id"), default=u"{get_first_base_find__context_record__external_id}-{label}", help_text=_(u"Formula to manage find external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) container_external_id = models.TextField( _(u"Container external id"), default=u"{responsible__external_id}-{index}", help_text=_(u"Formula to manage container external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) warehouse_external_id = models.TextField( _(u"Warehouse external id"), default=u"{name|slug}", help_text=_(u"Formula to manage warehouse external ID. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) person_raw_name = models.TextField( _(u"Raw name for person"), default=u"{name|upper} {surname}", help_text=_(u"Formula to manage person raw_name. " u"Change this with care. With incorrect formula, the " u"application might be unusable and import of external " u"data can be destructive.")) active = models.BooleanField(_(u"Current active"), default=False) currency = models.CharField(_(u"Currency"), default=u"€", choices=CURRENCY, max_length=5) class Meta: verbose_name = _(u"Ishtar site profile") verbose_name_plural = _(u"Ishtar site profiles") ordering = ['label'] def __unicode__(self): return unicode(self.label) @classmethod def get_current_profile(cls, force=False): cache_key, value = get_cache(cls, ['is-current-profile']) if value and not force: return value q = cls.objects.filter(active=True) if not q.count(): obj = cls.objects.create( label="Default profile", slug='default', active=True) else: obj = q.all()[0] cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj @classmethod def get_default_site_label(cls, key=None): return cls.get_current_profile().get_site_label(key) def get_site_label(self, key=None): if not key: return unicode(dict(SITE_LABELS)[self.archaeological_site_label]) return unicode( TRANSLATED_SITE_LABELS[self.archaeological_site_label][key] ) def save(self, *args, **kwargs): raw = False if 'raw' in kwargs: raw = kwargs.pop('raw') super(IshtarSiteProfile, self).save(*args, **kwargs) obj = self if raw: return obj q = self.__class__.objects.filter(active=True).exclude(slug=self.slug) if obj.active and q.count(): for profile in q.all(): profile.active = False profile.save(raw=True) changed = False if not obj.active and not q.count(): obj.active = True changed = True if obj.warehouse and not obj.find: obj.find = True changed = True if obj.find and not obj.context_record: obj.context_record = True changed = True if changed: obj = obj.save(raw=True) return obj def get_current_profile(force=False): return IshtarSiteProfile.get_current_profile(force=force) def cached_site_changed(sender, **kwargs): get_current_profile(force=True) from ishtar_common.menus import menu menu.reinit_menu_for_all_user() post_save.connect(cached_site_changed, sender=IshtarSiteProfile) post_delete.connect(cached_site_changed, sender=IshtarSiteProfile) class CustomForm(models.Model): name = models.CharField(_(u"Name"), max_length=250) form = models.CharField(_(u"Form"), max_length=250) available = models.BooleanField(_(u"Available"), default=True) enabled = models.BooleanField( _(u"Enable this form"), default=True, help_text=_(u"Disable with caution: disabling a form with mandatory " u"fields may lead to database errors.")) apply_to_all = models.BooleanField( _(u"Apply to all"), default=False, help_text=_(u"Apply this form to all users. If set to True, selecting " u"user and user type is useless.")) users = models.ManyToManyField('IshtarUser', blank=True) user_types = models.ManyToManyField('PersonType', blank=True) class Meta: verbose_name = _(u"Custom form") verbose_name_plural = _(u"Custom forms") ordering = ['name', 'form'] def __unicode__(self): return u"{} - {}".format(self.name, self.form) def users_lbl(self): users = [unicode(user) for user in self.users.all()] return " ; ".join(users) users_lbl.short_description = _(u"Users") def user_types_lbl(self): user_types = [unicode(u) for u in self.user_types.all()] return " ; ".join(user_types) user_types_lbl.short_description = _(u"User types") @classmethod def register(cls): if hasattr(cls, '_register'): return cls._register cache_key, value = get_cache(cls.__class__, ['dct-forms'], app_label='ishtar_common') if value: cls._register = value return cls._register cls._register = {} # ideally should be improved but only used in admin from ishtar_common.admin import ISHTAR_FORMS from ishtar_common.forms import CustomForm for app_form in ISHTAR_FORMS: for form in dir(app_form): if 'Form' not in form: # not very clean... but do not treat inappropriate items continue form = getattr(app_form, form) if not inspect.isclass(form) \ or not issubclass(form, CustomForm) \ or not getattr(form, 'form_slug', None): continue cls._register[form.form_slug] = form return cls._register def get_form_class(self): register = self.register() if self.form not in self._register: return return register[self.form] class ExcludedField(models.Model): custom_form = models.ForeignKey(CustomForm, related_name='excluded_fields') field = models.CharField(_(u"Field"), max_length=250) class Meta: verbose_name = _(u"Excluded field") verbose_name_plural = _(u"Excluded fields") class GlobalVar(models.Model, Cached): slug = models.SlugField(_(u"Variable name"), unique=True) description = models.TextField(_(u"Description of the variable"), null=True, blank=True) value = models.TextField(_(u"Value"), null=True, blank=True) class Meta: verbose_name = _(u"Global variable") verbose_name_plural = _(u"Global variables") ordering = ['slug'] def __unicode__(self): return unicode(self.slug) def cached_globalvar_changed(sender, **kwargs): if not kwargs['instance']: return var = kwargs['instance'] cache_key, value = get_cache(GlobalVar, var.slug) cache.set(cache_key, var.value, settings.CACHE_TIMEOUT) post_save.connect(cached_globalvar_changed, sender=GlobalVar) class UserDashboard: def __init__(self): types = IshtarUser.objects.values('person__person_types', 'person__person_types__label') self.types = types.annotate(number=Count('pk'))\ .order_by('person__person_types') class DashboardFormItem(object): """ Provide methods to manage statistics """ def _get_or_set_stats(self, funcname, update, timeout=settings.CACHE_TIMEOUT): key, val = get_cache(self.__class__, [funcname, self.pk]) if not update and val is not None: return val val = getattr(self, funcname)() cache.set(key, val, timeout) return val @classmethod def get_periods(cls, slice='month', fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) if slice == 'year': return [res[date_var].year for res in list(q.values(date_var) .annotate(Count("id")).order_by())] elif slice == 'month': return [(res[date_var].year, res[date_var].month) for res in list(q.values(date_var) .annotate(Count("id")).order_by())] return [] @classmethod def get_by_year(cls, year, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) return q.filter( **{date_var + '__year': year}).order_by('pk').distinct('pk') @classmethod def get_by_month(cls, year, month, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) q = q.filter( **{date_var + '__year': year, date_var + '__month': month}) return q.order_by('pk').distinct('pk') @classmethod def get_total_number(cls, fltr=None): q = cls.objects if fltr: q = q.filter(**fltr) return q.order_by('pk').distinct('pk').count() class Dashboard(object): def __init__(self, model, slice='year', date_source=None, show_detail=None, fltr=None): if not fltr: fltr = {} # don't provide date_source if it is not relevant self.model = model self.total_number = model.get_total_number(fltr) self.show_detail = show_detail history_model = self.model.history.model # last edited - created self.recents, self.lasts = [], [] for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')): last_ids = history_model.objects.values('id')\ .annotate(hd=Max('history_date')) last_ids = last_ids.filter(history_type=modif_type) from archaeological_finds.models import Find if self.model == Find: last_ids = last_ids.filter( downstream_treatment_id__isnull=True) if modif_type == '+': last_ids = last_ids.filter( upstream_treatment_id__isnull=True) last_ids = last_ids.order_by('-hd').distinct().all()[:5] for idx in last_ids: try: obj = self.model.objects.get(pk=idx['id']) except: # deleted object are always referenced in history continue obj.history_date = idx['hd'] last_lst.append(obj) # years base_kwargs = {'fltr': fltr.copy()} if date_source: base_kwargs['date_source'] = date_source periods_kwargs = copy.deepcopy(base_kwargs) periods_kwargs['slice'] = slice self.periods = model.get_periods(**periods_kwargs) self.periods = list(set(self.periods)) self.periods.sort() if not self.total_number or not self.periods: return kwargs_num = copy.deepcopy(base_kwargs) self.serie_labels = [_(u"Total")] # numbers if slice == 'year': self.values = [('year', "", list(reversed(self.periods)))] self.numbers = [model.get_by_year(year, **kwargs_num).count() for year in self.periods] self.values += [('number', _(u"Number"), list(reversed(self.numbers)))] if slice == 'month': periods = list(reversed(self.periods)) self.periods = ["%d-%s-01" % (p[0], ('0' + str(p[1])) if len(str(p[1])) == 1 else p[1]) for p in periods] self.values = [('month', "", self.periods)] if show_detail: for dpt, lbl in settings.ISHTAR_DPTS: self.serie_labels.append(unicode(dpt)) idx = 'number_' + unicode(dpt) kwargs_num['fltr']["towns__numero_insee__startswith"] = \ unicode(dpt) numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [(idx, dpt, list(numbers))] # put "Total" at the end self.serie_labels.append(self.serie_labels.pop(0)) kwargs_num = base_kwargs.copy() self.numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [('number', _(u"Total"), list(self.numbers))] # calculate self.average = self.get_average() self.variance = self.get_variance() self.standard_deviation = self.get_standard_deviation() self.median = self.get_median() self.mode = self.get_mode() # by operation if not hasattr(model, 'get_by_operation'): return operations = model.get_operations() operation_numbers = [model.get_by_operation(op).count() for op in operations] # calculate self.operation_average = self.get_average(operation_numbers) self.operation_variance = self.get_variance(operation_numbers) self.operation_standard_deviation = self.get_standard_deviation( operation_numbers) self.operation_median = self.get_median(operation_numbers) operation_mode_pk = self.get_mode(dict(zip(operations, operation_numbers))) if operation_mode_pk: from archaeological_operations.models import Operation self.operation_mode = unicode(Operation.objects .get(pk=operation_mode_pk)) def get_average(self, vals=[]): if not vals: vals = self.numbers[:] return sum(vals) / len(vals) def get_variance(self, vals=[]): if not vals: vals = self.numbers[:] avrg = self.get_average(vals) return self.get_average([(x - avrg) ** 2 for x in vals]) def get_standard_deviation(self, vals=[]): if not vals: vals = self.numbers[:] return round(self.get_variance(vals) ** 0.5, 3) def get_median(self, vals=[]): if not vals: vals = self.numbers[:] len_vals = len(vals) vals.sort() if (len_vals % 2) == 1: return vals[len_vals / 2] else: return (vals[len_vals / 2 - 1] + vals[len_vals / 2]) / 2.0 def get_mode(self, vals={}): if not vals: vals = dict(zip(self.periods, self.numbers[:])) mx = max(vals.values()) for v in vals: if vals[v] == mx: return v class DocumentTemplate(models.Model): CLASSNAMES = (('archaeological_operations.models.AdministrativeAct', _(u"Administrative Act")),) name = models.CharField(_(u"Name"), max_length=100) slug = models.SlugField(_(u"Slug"), blank=True, null=True, max_length=100, unique=True) template = models.FileField( _(u"Template"), upload_to="templates/%Y/") associated_object_name = models.CharField( _(u"Associated object"), max_length=100, choices=CLASSNAMES) available = models.BooleanField(_(u"Available"), default=True) objects = SlugModelManager() class Meta: verbose_name = _(u"Document template") verbose_name_plural = _(u"Document templates") ordering = ['associated_object_name', 'name'] def __unicode__(self): return self.name def natural_key(self): return (self.slug, ) def save(self, *args, **kwargs): if not self.slug: self.slug = create_slug(DocumentTemplate, self.name) return super(DocumentTemplate, self).save(*args, **kwargs) @classmethod def get_tuples(cls, dct={}, empty_first=True): dct['available'] = True if empty_first: yield ('', '----------') items = cls.objects.filter(**dct) for item in items.distinct().order_by(*cls._meta.ordering).all(): yield (item.pk, _(unicode(item))) def publish(self, c_object): tempdir = tempfile.mkdtemp("-ishtardocs") output_name = tempdir + os.path.sep + \ slugify(self.name.replace(' ', '_').lower()) + u'-' +\ datetime.date.today().strftime('%Y-%m-%d') +\ u"." + self.template.name.split('.')[-1] values = c_object.get_values() engine = SecretaryRenderer() result = engine.render(self.template, **values) output = open(output_name, 'wb') output.write(result) return output_name def convert_from_v1(self): """ Convert the current template from v1 to v2. """ from old.ooo_replace import ooo_replace from archaeological_operations.models import AdministrativeAct old_dir = settings.MEDIA_ROOT + "/templates/v1/" if not os.path.exists(old_dir): os.makedirs(old_dir) shutil.copy(settings.MEDIA_ROOT + self.template.name, old_dir) tempdir = tempfile.mkdtemp("-ishtardocs") output_name = tempdir + os.path.sep + self.template.name.split( os.sep)[-1] objects = [] filters = [ {'operation__isnull': False}, {'associated_file__isnull': False}, {'treatment_file__isnull': False}, {'treatment__isnull': False}, ] for filtr in filters: q = AdministrativeAct.objects.filter(**filtr) if q.count(): objects.append(q.all()[0]) if not objects: return values = {} for obj in objects: values.update(obj.get_values()) for key in values: values[key] = "{{ " + key + " }}" ooo_replace(self.template, output_name, values) shutil.move(output_name, settings.MEDIA_ROOT + self.template.name) return output_name class NumberManager(models.Manager): def get_by_natural_key(self, number): return self.get(number=number) class State(models.Model): label = models.CharField(_(u"Label"), max_length=30) number = models.CharField(_(u"Number"), unique=True, max_length=3) objects = NumberManager() class Meta: verbose_name = _(u"State") ordering = ['number'] def __unicode__(self): return self.label def natural_key(self): return (self.number, ) class Department(models.Model): label = models.CharField(_(u"Label"), max_length=30) number = models.CharField(_(u"Number"), unique=True, max_length=3) state = models.ForeignKey('State', verbose_name=_(u"State"), blank=True, null=True) objects = NumberManager() class Meta: verbose_name = _(u"Department") verbose_name_plural = _(u"Departments") ordering = ['number'] def __unicode__(self): return self.label def natural_key(self): return (self.number, ) class Address(BaseHistorizedItem): address = models.TextField(_(u"Address"), null=True, blank=True) address_complement = models.TextField(_(u"Address complement"), null=True, blank=True) postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True, blank=True) town = models.CharField(_(u"Town"), max_length=70, null=True, blank=True) country = models.CharField(_(u"Country"), max_length=30, null=True, blank=True) alt_address = models.TextField(_(u"Other address: address"), null=True, blank=True) alt_address_complement = models.TextField( _(u"Other address: address complement"), null=True, blank=True) alt_postal_code = models.CharField(_(u"Other address: postal code"), max_length=10, null=True, blank=True) alt_town = models.CharField(_(u"Other address: town"), max_length=70, null=True, blank=True) alt_country = models.CharField(_(u"Other address: country"), max_length=30, null=True, blank=True) phone = models.CharField(_(u"Phone"), max_length=18, null=True, blank=True) phone_desc = models.CharField(_(u"Phone description"), max_length=300, null=True, blank=True) phone2 = models.CharField(_(u"Phone description 2"), max_length=18, null=True, blank=True) phone_desc2 = models.CharField(_(u"Phone description 2"), max_length=300, null=True, blank=True) phone3 = models.CharField(_(u"Phone 3"), max_length=18, null=True, blank=True) phone_desc3 = models.CharField(_(u"Phone description 3"), max_length=300, null=True, blank=True) raw_phone = models.TextField(_(u"Raw phone"), blank=True, null=True) mobile_phone = models.CharField(_(u"Mobile phone"), max_length=18, null=True, blank=True) email = models.EmailField( _(u"Email"), max_length=300, blank=True, null=True) alt_address_is_prefered = models.BooleanField( _(u"Alternative address is prefered"), default=False) history = HistoricalRecords() class Meta: abstract = True def simple_lbl(self): return unicode(self) def full_address(self): lbl = self.simple_lbl() if lbl: lbl += u"\n" lbl += self.address_lbl() return lbl def address_lbl(self): lbl = u'' prefix = '' if self.alt_address_is_prefered: prefix = 'alt_' if getattr(self, prefix + 'address'): lbl += getattr(self, prefix + 'address') if getattr(self, prefix + 'address_complement'): if lbl: lbl += "\n" lbl += getattr(self, prefix + 'address_complement') postal_code = getattr(self, prefix + 'postal_code') town = getattr(self, prefix + 'town') if postal_code or town: if lbl: lbl += "\n" lbl += u"{}{}{}".format( postal_code or '', " " if postal_code and town else '', town or '') if self.phone: if lbl: lbl += u"\n" lbl += u"{} {}".format(unicode(_("Tel: ")), self.phone) if self.mobile_phone: if lbl: lbl += u"\n" lbl += u"{} {}".format(unicode(_("Mobile: ")), self.mobile_phone) if self.email: if lbl: lbl += u"\n" lbl += u"{} {}".format(unicode(_("Email: ")), self.email) return lbl class Merge(models.Model): merge_key = models.TextField(_("Merge key"), blank=True, null=True) merge_candidate = models.ManyToManyField("self", blank=True) merge_exclusion = models.ManyToManyField("self", blank=True) archived = models.NullBooleanField(default=False, blank=True, null=True) # 1 for one word similarity, 2 for two word similarity, etc. MERGE_CLEMENCY = None EMPTY_MERGE_KEY = '--' class Meta: abstract = True def generate_merge_key(self): if self.archived: return self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY self.merge_key = self.merge_key def generate_merge_candidate(self): if self.archived: return if not self.merge_key: self.generate_merge_key() self.save(merge_key_generated=True) if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: return q = self.__class__.objects\ .exclude(pk=self.pk)\ .exclude(merge_exclusion=self)\ .exclude(merge_candidate=self)\ .exclude(archived=True) if not self.MERGE_CLEMENCY: q = q.filter(merge_key=self.merge_key) else: subkeys_front = u"-".join( self.merge_key.split('-')[:self.MERGE_CLEMENCY]) subkeys_back = u"-".join( self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) q = q.filter(Q(merge_key__istartswith=subkeys_front) | Q(merge_key__iendswith=subkeys_back)) for item in q.all(): self.merge_candidate.add(item) def save(self, *args, **kwargs): # prevent circular save merge_key_generated = False if 'merge_key_generated' in kwargs: merge_key_generated = kwargs.pop('merge_key_generated') self.generate_merge_key() item = super(Merge, self).save(*args, **kwargs) if not merge_key_generated: self.generate_merge_candidate() return item def archive(self): self.archived = True self.save() for m in self.merge_candidate.all(): m.delete() for m in self.merge_exclusion.all(): m.delete() def merge(self, item): merge_model_objects(self, item) self.generate_merge_candidate() class OrganizationType(GeneralType): class Meta: verbose_name = _(u"Organization type") verbose_name_plural = _(u"Organization types") ordering = ('label',) post_save.connect(post_save_cache, sender=OrganizationType) post_delete.connect(post_save_cache, sender=OrganizationType) organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, unicode) organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, unicode) class Organization(Address, Merge, OwnPerms, ValueGetter): TABLE_COLS = ('name', 'organization_type', 'town') SHOW_URL = 'show-organization' # search parameters EXTRA_REQUEST_KEYS = { 'name': 'name__icontains', 'organization_type': 'organization_type__pk__in', } BASE_SEARCH_VECTORS = ['name', 'town'] # fields name = models.CharField(_(u"Name"), max_length=500) organization_type = models.ForeignKey(OrganizationType, verbose_name=_(u"Type")) history = HistoricalRecords() class Meta: verbose_name = _(u"Organization") verbose_name_plural = _(u"Organizations") permissions = ( ("view_organization", u"Can view all Organizations"), ("view_own_organization", u"Can view own Organization"), ("add_own_organization", u"Can add own Organization"), ("change_own_organization", u"Can change own Organization"), ("delete_own_organization", u"Can delete own Organization"), ) def simple_lbl(self): if self.name: return self.name return u"{} - {}".format(self.organization_type, self.town or "") def __unicode__(self): if self.name: return self.name return u"{} - {} - {}".format(self.organization_type, self.address or "", self.town or "") def generate_merge_key(self): self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY if self.town: self.merge_key += "-" + slugify(self.town or '') if self.address: self.merge_key += "-" + slugify(self.address or '') @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('organization_type', 'name') if getattr(self, attr)] return slugify(u"-".join(values)) class PersonType(GeneralType): # rights = models.ManyToManyField(WizardStep, verbose_name=_(u"Rights")) groups = models.ManyToManyField(Group, verbose_name=_(u"Groups"), blank=True) class Meta: verbose_name = _(u"Person type") verbose_name_plural = _(u"Person types") ordering = ('label',) post_save.connect(post_save_cache, sender=PersonType) post_delete.connect(post_save_cache, sender=PersonType) person_type_pk_lazy = lazy(PersonType.get_or_create_pk, unicode) person_type_pks_lazy = lazy(PersonType.get_or_create_pks, unicode) class TitleType(GeneralType): class Meta: verbose_name = _(u"Title type") verbose_name_plural = _(u"Title types") ordering = ('label',) post_save.connect(post_save_cache, sender=TitleType) post_delete.connect(post_save_cache, sender=TitleType) class Person(Address, Merge, OwnPerms, ValueGetter): _prefix = 'person_' TYPE = ( ('Mr', _(u'Mr')), ('Ms', _(u'Miss')), ('Mr and Miss', _(u'Mr and Mrs')), ('Md', _(u'Mrs')), ('Dr', _(u'Doctor')), ) TABLE_COLS = ('name', 'surname', 'raw_name', 'email', 'person_types_list', 'attached_to__name', 'town') SHOW_URL = 'show-person' MODIFY_URL = 'person_modify' BASE_SEARCH_VECTORS = ['name', 'surname', 'raw_name', 'town', 'attached_to__name', 'email'] # search parameters REVERSED_BOOL_FIELDS = ['ishtaruser__isnull'] EXTRA_REQUEST_KEYS = { 'name': ['name__icontains', 'raw_name__icontains'], 'surname': ['surname__icontains', 'raw_name__icontains'], 'attached_to': 'attached_to__pk', 'attached_to__name': 'attached_to__name', 'person_types': 'person_types__pk__in', 'ishtaruser__isnull': 'ishtaruser__isnull' } COL_LABELS = { 'attached_to__name': _(u"Organization") } # fields old_title = models.CharField(_(u"Title"), max_length=100, choices=TYPE, blank=True, null=True) title = models.ForeignKey(TitleType, verbose_name=_(u"Title"), blank=True, null=True) salutation = models.CharField(_(u"Salutation"), max_length=200, blank=True, null=True) surname = models.CharField(_(u"Surname"), max_length=50, blank=True, null=True) name = models.CharField(_(u"Name"), max_length=200, blank=True, null=True) raw_name = models.CharField(_(u"Raw name"), max_length=300, blank=True, null=True) contact_type = models.CharField(_(u"Contact type"), max_length=300, blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) person_types = models.ManyToManyField(PersonType, verbose_name=_(u"Types")) attached_to = models.ForeignKey( 'Organization', related_name='members', on_delete=models.SET_NULL, verbose_name=_(u"Is attached to"), blank=True, null=True) history = HistoricalRecords() class Meta: verbose_name = _(u"Person") verbose_name_plural = _(u"Persons") permissions = ( ("view_person", u"Can view all Persons"), ("view_own_person", u"Can view own Person"), ("add_own_person", u"Can add own Person"), ("change_own_person", u"Can change own Person"), ("delete_own_person", u"Can delete own Person"), ) @property def full_title(self): return u" ".join( [unicode(getattr(self, attr)) for attr in ['title', 'salutation'] if getattr(self, attr)]) def simple_lbl(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] return u" ".join(values) def __unicode__(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] if self.attached_to: attached_to = unicode(self.attached_to) if values: values.append(u'-') values.append(attached_to) return u" ".join(values) def fancy_str(self): values = [""] values += [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values += [self.raw_name] values += [""] if self.attached_to: attached_to = unicode(self.attached_to) if values: values.append(u'-') values.append(attached_to) return u" ".join(values) def get_values(self, prefix=''): values = super(Person, self).get_values(prefix=prefix) if not self.attached_to: values.update( Person.get_empty_values(prefix=prefix + 'attached_to_')) return values person_types_list_lbl = _(u"Types") @property def person_types_list(self): return u", ".join([unicode(pt) for pt in self.person_types.all()]) def generate_merge_key(self): if self.name and self.name.strip(): self.merge_key = slugify(self.name.strip()) + \ ((u'-' + slugify(self.surname.strip())) if self.surname else u'') elif self.raw_name and self.raw_name.strip(): self.merge_key = slugify(self.raw_name.strip()) elif self.attached_to: self.merge_key = self.attached_to.merge_key else: self.merge_key = self.EMPTY_MERGE_KEY if self.merge_key != self.EMPTY_MERGE_KEY and self.attached_to: self.merge_key += "-" + self.attached_to.merge_key def is_natural(self): return not self.attached_to def has_right(self, right_name, session=None): if '.' in right_name: right_name = right_name.split('.')[-1] res, cache_key = "", "" if session: cache_key = 'session-{}-{}'.format(session.session_key, right_name) res = cache.get(cache_key) if res in (True, False): return res if type(right_name) in (list, tuple): res = bool(self.person_types.filter( txt_idx__in=right_name).count()) or \ bool(self.person_types.filter( groups__permissions__codename__in=right_name).count()) or\ bool(self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=right_name ).count()) or\ bool(self.ishtaruser.user_ptr.user_permissions.filter( codename__in=right_name).count()) # or self.person_types.filter(wizard__url_name__in=right_name).count()) else: res = bool(self.person_types.filter(txt_idx=right_name).count()) or\ bool(self.person_types.filter( groups__permissions__codename=right_name).count()) or \ bool(self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=[right_name] ).count()) or \ bool(self.ishtaruser.user_ptr.user_permissions.filter( codename__in=[right_name]).count()) # or self.person_types.filter(wizard__url_name=right_name).count()) if session: cache.set(cache_key, res, settings.CACHE_TIMEOUT) return res def full_label(self): values = [] if self.title: values = [self.title.label] values += [unicode(getattr(self, attr)) for attr in ('salutation', 'surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] if self.attached_to: values.append(u"- " + unicode(self.attached_to)) return u" ".join(values) @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name', 'attached_to') if getattr(self, attr)] return slugify(u"-".join(values)) def operation_docs_q(self): from archaeological_operations.models import OperationSource return OperationSource.objects.filter( authors__person=self) def contextrecord_docs_q(self): from archaeological_context_records.models import ContextRecordSource return ContextRecordSource.objects.filter( authors__person=self) def find_docs_q(self): from archaeological_finds.models import FindSource return FindSource.objects.filter( authors__person=self) def save(self, *args, **kwargs): super(Person, self).save(*args, **kwargs) raw_name = get_external_id('person_raw_name', self) if raw_name and self.raw_name != raw_name: self.raw_name = raw_name self.save() if hasattr(self, 'responsible_town_planning_service'): for fle in self.responsible_town_planning_service.all(): fle.save() # force update of raw_town_planning_service if hasattr(self, 'general_contractor'): for fle in self.general_contractor.all(): fle.save() # force update of raw_general_contractor @classmethod def get_query_owns(cls, user): return \ Q(operation_scientist_responsability__collaborators__ishtaruser =user.ishtaruser) | \ Q(operation_scientist_responsability__scientist__ishtaruser =user.ishtaruser) | \ Q(operation_collaborator__collaborators__ishtaruser =user.ishtaruser) | \ Q(operation_collaborator__scientist__ishtaruser =user.ishtaruser) class IshtarUser(FullSearch): TABLE_COLS = ('username', 'person__name', 'person__surname', 'person__email', 'person__person_types_list', 'person__attached_to') BASE_SEARCH_VECTORS = [ 'user_ptr__username', 'person__name', 'person__surname', 'person__email', 'person__town', 'person__attached_to__name'] # search parameters EXTRA_REQUEST_KEYS = { 'username': ['username__icontains'], 'name': ['person__name__icontains', 'person__raw_name__icontains'], 'surname': ['person__surname__icontains', 'person__raw_name__icontains'], 'email': ['person__email'], 'attached_to': 'person__attached_to__pk', 'person_types': 'person__person_types__pk__in', 'person__person_types_list': 'person__person_types__name' } # fields user_ptr = models.OneToOneField(User, primary_key=True, related_name='ishtaruser') person = models.OneToOneField(Person, verbose_name=_(u"Person"), related_name='ishtaruser') advanced_shortcut_menu = models.BooleanField( _(u"Advanced shortcut menu"), default=False) class Meta: verbose_name = _(u"Ishtar user") verbose_name_plural = _(u"Ishtar users") def __unicode__(self): return unicode(self.person) @classmethod def set_superuser(cls, user): q = cls.objects.filter(user_ptr=user) if not q.count(): return ishtaruser = q.all()[0] admin, created = PersonType.objects.get_or_create( txt_idx='administrator') person = ishtaruser.person if user.is_superuser: person.person_types.add(admin) elif admin in person.person_types.all(): person.person_types.remove(admin) @classmethod def create_from_user(cls, user): default = user.username surname = user.first_name or default name = user.last_name or default email = user.email person = Person.objects.create(surname=surname, name=name, email=email, history_modifier=user) isht_user = cls.objects.create(user_ptr=user, person=person) return isht_user def has_right(self, right_name, session=None): return self.person.has_right(right_name, session=session) def full_label(self): return self.person.full_label() def has_perm(self, perm, model=None, session=None, obj=None): if not session: return self.user_ptr.has_perm(perm, model) cache_key = 'usersession-{}-{}-{}-{}'.format( session.session_key, perm, model.__name__ if model else 'no', obj.pk if obj else 'no') res = cache.get(cache_key) if res in (True, False): return res res = self.user_ptr.has_perm(perm, model) cache.set(cache_key, res, settings.CACHE_SMALLTIMEOUT) return res class Basket(models.Model): """ Abstract class for a basket Subclass must be defined with an "items" ManyToManyField """ IS_BASKET = True label = models.CharField(_(u"Label"), max_length=1000) comment = models.TextField(_(u"Comment"), blank=True, null=True) user = models.ForeignKey(IshtarUser, blank=True, null=True) available = models.BooleanField(_(u"Available"), default=True) class Meta: abstract = True unique_together = (('label', 'user'),) def __unicode__(self): return self.label @property def cached_label(self): return unicode(self) @classmethod def get_short_menu_class(cls, pk): return 'basket' @property def associated_filename(self): return "{}-{}".format(datetime.date.today().strftime( "%Y-%m-%d"), slugify(self.label)) class AuthorType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) class Meta: verbose_name = _(u"Author type") verbose_name_plural = _(u"Author types") ordering = ['order', 'label'] post_save.connect(post_save_cache, sender=AuthorType) post_delete.connect(post_save_cache, sender=AuthorType) class Author(FullSearch): person = models.ForeignKey(Person, verbose_name=_(u"Person"), related_name='author') author_type = models.ForeignKey(AuthorType, verbose_name=_(u"Author type")) PARENT_SEARCH_VECTORS = ['person'] class Meta: verbose_name = _(u"Author") verbose_name_plural = _(u"Authors") ordering = ('author_type__order', 'person__name') permissions = ( ("view_author", u"Can view all Authors"), ("view_own_author", u"Can view own Author"), ("add_own_author", u"Can add own Author"), ("change_own_author", u"Can change own Author"), ("delete_own_author", u"Can delete own Author"), ) def __unicode__(self): return unicode(self.person) + settings.JOINT + \ unicode(self.author_type) def fancy_str(self): return self.person.fancy_str() + settings.JOINT + \ unicode(self.author_type) def related_sources(self): return list(self.treatmentsource_related.all()) + \ list(self.operationsource_related.all()) + \ list(self.findsource_related.all()) + \ list(self.contextrecordsource_related.all()) class SourceType(GeneralType): class Meta: verbose_name = _(u"Source type") verbose_name_plural = _(u"Source types") ordering = ['label'] post_save.connect(post_save_cache, sender=SourceType) post_delete.connect(post_save_cache, sender=SourceType) class SupportType(GeneralType): class Meta: verbose_name = _(u"Support type") verbose_name_plural = _(u"Support types") post_save.connect(post_save_cache, sender=SupportType) post_delete.connect(post_save_cache, sender=SupportType) class Format(GeneralType): class Meta: verbose_name = _(u"Format type") verbose_name_plural = _(u"Format types") ordering = ['label'] post_save.connect(post_save_cache, sender=Format) post_delete.connect(post_save_cache, sender=Format) class Source(OwnPerms, ImageModel, FullSearch): title = models.CharField(_(u"Title"), max_length=300) external_id = models.TextField(_(u"External ID"), max_length=300, null=True, blank=True) source_type = models.ForeignKey(SourceType, verbose_name=_(u"Type")) support_type = models.ForeignKey(SupportType, verbose_name=_(u"Support"), blank=True, null=True,) format_type = models.ForeignKey(Format, verbose_name=_(u"Format"), blank=True, null=True,) scale = models.CharField(_(u"Scale"), max_length=30, null=True, blank=True) authors = models.ManyToManyField(Author, verbose_name=_(u"Authors"), related_name="%(class)s_related") associated_url = models.URLField( blank=True, null=True, max_length=1000, verbose_name=_(u"Numerical ressource (web address)")) receipt_date = models.DateField(blank=True, null=True, verbose_name=_(u"Receipt date")) creation_date = models.DateField(blank=True, null=True, verbose_name=_(u"Creation date")) receipt_date_in_documentation = models.DateField( blank=True, null=True, verbose_name=_(u"Receipt date in documentation")) item_number = models.IntegerField(_(u"Item number"), default=1) reference = models.CharField(_(u"Ref."), max_length=100, null=True, blank=True) internal_reference = models.CharField( _(u"Internal ref."), max_length=100, null=True, blank=True) description = models.TextField(_(u"Description"), blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) additional_information = models.TextField(_(u"Additional information"), blank=True, null=True) duplicate = models.BooleanField(_(u"Has a duplicate"), default=False) TABLE_COLS = ['title', 'source_type', 'authors', 'associated_url'] COL_LINK = ['associated_url'] BASE_SEARCH_VECTORS = ['title', 'source_type__label', 'external_id', 'reference', 'description', 'comment', 'additional_information'] PARENT_SEARCH_VECTORS = ['authors'] class Meta: abstract = True def __unicode__(self): return self.title def get_associated_operation(self): raise NotImplementedError() def _get_base_image_path(self): base = self.owner._get_base_image_path() return u"{}/sources".format(base) @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('source_type', 'title') if getattr(self, attr)] return slugify(u"-".join(values)) class LicenseType(GeneralType): class Meta: verbose_name = _(u"License type") verbose_name_plural = _(u"License types") ordering = ('label',) class ImageType(GeneralType): class Meta: verbose_name = _(u"Image type") verbose_name_plural = _(u"Image types") ordering = ('label',) class IshtarImage(ImageModel): name = models.CharField(_(u"Name"), max_length=250) description = models.TextField(_(u"Description"), blank=True, null=True) licenses = models.ManyToManyField(LicenseType, verbose_name=_(u"License"), blank=True) authors = models.ManyToManyField(Author, verbose_name=_(u"Authors"), blank=True) authors_raw = models.CharField(verbose_name=_(u"Authors (raw)"), blank=True, null=True, max_length=250) image_type = models.ForeignKey(ImageType, verbose_name=_(u"Type"), blank=True, null=True) creation_date = models.DateField(blank=True, null=True, verbose_name=_(u"Creation date")) reference = models.CharField(_(u"Ref."), max_length=250, null=True, blank=True) internal_reference = models.CharField( _(u"Internal ref."), max_length=250, null=True, blank=True) class Meta: verbose_name = _(u"Image") verbose_name_plural = _(u"Images") ordering = ('name',) if settings.COUNTRY == 'fr': class Arrondissement(models.Model): name = models.CharField(u"Nom", max_length=30) department = models.ForeignKey(Department, verbose_name=u"Département") def __unicode__(self): return settings.JOINT.join((self.name, unicode(self.department))) class Canton(models.Model): name = models.CharField(u"Nom", max_length=30) arrondissement = models.ForeignKey(Arrondissement, verbose_name=u"Arrondissement") def __unicode__(self): return settings.JOINT.join( (self.name, unicode(self.arrondissement))) class TownManager(models.GeoManager): def get_by_natural_key(self, numero_insee, year): return self.get(numero_insee=numero_insee, year=year) class Town(Imported, models.Model): name = models.CharField(_(u"Name"), max_length=100) surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True) center = models.PointField(_(u"Localisation"), srid=settings.SRID, blank=True, null=True) limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True) numero_insee = models.CharField(u"Code commune (numéro INSEE)", max_length=120) departement = models.ForeignKey( Department, verbose_name=_(u"Department"), null=True, blank=True) year = models.IntegerField( _("Year of creation"), null=True, blank=True, help_text=_(u"Filling this field is relevant to distinguish old towns " u"from new towns.")) children = models.ManyToManyField( 'Town', verbose_name=_(u"Town children"), blank=True, related_name='parents') cached_label = models.CharField(_(u"Cached name"), max_length=500, null=True, blank=True, db_index=True) objects = TownManager() class Meta: verbose_name = _(u"Town") verbose_name_plural = _(u"Towns") if settings.COUNTRY == 'fr': ordering = ['numero_insee'] unique_together = (('numero_insee', 'year'),) def natural_key(self): return (self.numero_insee, self.year) def __unicode__(self): if self.cached_label: return self.cached_label self.save() return self.cached_label def generate_geo(self, force=False): force = self.generate_limit(force=force) self.generate_center(force=force) self.generate_area(force=force) def generate_limit(self, force=False): if not force and self.limit: return parents = None if not self.parents.count(): return for parent in self.parents.all(): if not parent.limit: return if not parents: parents = parent.limit else: parents = parents.union(parent.limit) # if union is a simple polygon make it a multi if 'MULTI' not in parents.wkt: parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" if not parents: return self.limit = parents self.save() return True def generate_center(self, force=False): if not force and (self.center or not self.limit): return self.center = self.limit.centroid if not self.center: return False self.save() return True def generate_area(self, force=False): if not force and (self.surface or not self.limit): return self.surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area if not self.surface: return False self.save() return True def update_town_code(self): if not self.numero_insee or not self.children.count() or not self.year: return old_num = self.numero_insee[:] numero = old_num.split('-')[0] self.numero_insee = u"{}-{}".format(numero, self.year) if self.numero_insee != old_num: return True def _generate_cached_label(self): cached_label = self.name if settings.COUNTRY == "fr": cached_label = u"%s - %s" % (self.name, self.numero_insee[:2]) if self.year: cached_label += " ({})".format(self.year) return cached_label def post_save_town(sender, **kwargs): cached_label_changed(sender, **kwargs) town = kwargs['instance'] town.generate_geo() if town.update_town_code(): town.save() post_save.connect(post_save_town, sender=Town) def town_child_changed(sender, **kwargs): town = kwargs['instance'] if town.update_town_code(): town.save() m2m_changed.connect(town_child_changed, sender=Town.children.through) class OperationType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) preventive = models.BooleanField(_(u"Is preventive"), default=True) class Meta: verbose_name = _(u"Operation type") verbose_name_plural = _(u"Operation types") ordering = ['-preventive', 'order', 'label'] @classmethod def get_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None, initial=[]): tuples = [] dct['available'] = True if not instances and empty_first and not default: tuples.append(('', '--')) if default and not instances: try: default = cls.objects.get(txt_idx=default) tuples.append((default.pk, _(unicode(default)))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and not instances: exclude.append(default.txt_idx) if exclude: items = items.exclude(txt_idx__in=exclude) current_preventive, current_lst = None, None item_list = list(items.order_by(*cls._meta.ordering).all()) new_vals = cls._get_initial_types(initial, [i.pk for i in item_list], instance=True) item_list += new_vals for item in item_list: item.rank = 0 if instances: return item_list for item in item_list: if not current_lst or item.preventive != current_preventive: if current_lst: tuples.append(current_lst) current_lst = [_(u"Preventive") if item.preventive else _(u"Research"), []] current_preventive = item.preventive current_lst[1].append((item.pk, _(unicode(item)))) if current_lst: tuples.append(current_lst) return tuples @classmethod def is_preventive(cls, ope_type_id, key=''): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False if not key: return op_type.preventive return key == op_type.txt_idx post_save.connect(post_save_cache, sender=OperationType) post_delete.connect(post_save_cache, sender=OperationType) class SpatialReferenceSystem(GeneralType): order = models.IntegerField(_(u"Order"), default=10) auth_name = models.CharField( _(u"Authority name"), default=u'EPSG', max_length=256) srid = models.IntegerField(_(u"Authority SRID")) class Meta: verbose_name = _(u"Spatial reference system") verbose_name_plural = _(u"Spatial reference systems") ordering = ('label',) post_save.connect(post_save_cache, sender=SpatialReferenceSystem) post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) class AdministrationScript(models.Model): path = models.CharField(_(u"Filename"), max_length=30) name = models.TextField(_(u"Name"), null=True, blank=True) class Meta: verbose_name = _(u"Administration script") verbose_name_plural = _(u"Administration scripts") ordering = ['name'] def __unicode__(self): return unicode(self.name) SCRIPT_STATE = (("S", _(u"Scheduled")), ("P", _(u"In progress")), ("FE", _(u"Finished with errors")), ("F", _(u"Finished")), ) SCRIPT_STATE_DCT = dict(SCRIPT_STATE) class AdministrationTask(models.Model): script = models.ForeignKey(AdministrationScript) state = models.CharField(_(u"State"), max_length=2, choices=SCRIPT_STATE, default='S') creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) result = models.TextField(_(u"Result"), null=True, blank=True) class Meta: verbose_name = _(u"Administration task") verbose_name_plural = _(u"Administration tasks") ordering = ['script'] def __unicode__(self): state = _(u"Unknown") if self.state in SCRIPT_STATE_DCT: state = unicode(SCRIPT_STATE_DCT[self.state]) return u"{} - {} - {}".format(self.script, self.creation_date, state) def execute(self): if self.state != 'S': return self.launch_date = datetime.datetime.now() script_dir = settings.ISHTAR_SCRIPT_DIR if not script_dir: self.result = unicode( _(u"ISHTAR_SCRIPT_DIR is not set in your " u"local_settings. Contact your administrator.")) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return if '..' in script_dir: self.result = unicode( _(u"Your ISHTAR_SCRIPT_DIR is containing " u"dots \"..\". As it can refer to relative " u"paths, it can be a security issue and this is " u"not allowed. Only put a full path.")) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return if not os.path.isdir(script_dir): self.result = unicode( _(u"Your ISHTAR_SCRIPT_DIR: \"{}\" is not a valid directory.") ).format(script_dir) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return script_name = None # only script inside the script directory can be executed for name in os.listdir(script_dir): if name == self.script.path: if isfile(join(script_dir, name)): script_name = join(script_dir, name) break if not script_name: self.result = unicode( _(u"Script \"{}\" is not available in your script directory. " u"Check your configuration.") ).format(self.script.path) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return self.state = 'P' self.save() self.finished_date = datetime.datetime.now() try: session = Popen([script_name], stdout=PIPE, stderr=PIPE) stdout, stderr = session.communicate() except OSError as e: self.state = 'FE' self.result = u"Error executing \"{}\" script: {}".format( self.script.path, e) self.save() return self.finished_date = datetime.datetime.now() if stderr: self.state = 'FE' self.result = u"Error: {}".format(stderr) else: self.state = 'F' self.result = u"{}".format(stdout) self.save()