#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ import copy import datetime import inspect from jinja2 import TemplateSyntaxError import logging import os import re import shutil import tempfile import time from cStringIO import StringIO from subprocess import Popen, PIPE from PIL import Image from django.conf import settings from django.contrib.auth.models import User, Group from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import JSONField from django.contrib.postgres.search import SearchVectorField, SearchVector from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.files.uploadedfile import SimpleUploadedFile from django.core.urlresolvers import reverse, NoReverseMatch from django.core.validators import validate_slug from django.db import connection from django.db.models import Q, Max, Count from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template.defaultfilters import slugify from django.utils.functional import lazy from django.utils.safestring import 
SafeUnicode, mark_safe from django.utils.translation import ugettext_lazy as _, ugettext, \ pgettext_lazy, activate, deactivate from secretary import Renderer as SecretaryRenderer from simple_history.models import HistoricalRecords as BaseHistoricalRecords from unidecode import unidecode from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ ALTERNATE_CONFIGS_CHOICES from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ImporterModel, ImporterType, \ ImporterDefault, ImporterDefaultValues, ImporterColumn, \ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ Import, TargetKeyGroup from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \ get_all_field_names, merge_tsvectors, cached_label_changed, \ generate_relation_graph __all__ = [ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget', 'TargetKey', 'FormaterType', 'Import', 'TargetKeyGroup' ] logger = logging.getLogger(__name__) def post_save_user(sender, **kwargs): user = kwargs['instance'] if kwargs["created"]: try: IshtarUser.create_from_user(user) except DatabaseError: # manage when db is not synced pass IshtarUser.set_superuser(user) post_save.connect(post_save_user, sender=User) class ValueGetter(object): _prefix = "" GET_VALUES_EXTRA = [] COL_LABELS = {} GET_VALUE_EXCLUDE_FIELDS = ['search_vector', 'id'] def get_values(self, prefix=''): if not prefix: prefix = self._prefix values = {} for field_name in get_all_field_names(self): if not hasattr(self, field_name) or \ field_name in self.GET_VALUE_EXCLUDE_FIELDS or \ field_name.endswith('_id'): continue value = getattr(self, field_name) if hasattr(value, 'get_values'): values.update(value.get_values(prefix + field_name + '_')) else: values[prefix + field_name] = value for extra_field in self.GET_VALUES_EXTRA: values[prefix + extra_field] = getattr(self, extra_field) or 
'' for key in values.keys(): val = values[key] if val is None: val = '' else: val = unicode(val) if val.endswith('.None'): val = '' values[key] = val values['KEYS'] = u'\n'.join(values.keys()) value_list = [] for key in values.keys(): if key in ('KEYS', 'VALUES'): continue value_list.append((key, unicode(values[key]))) values['VALUES'] = u'\n'.join( [u"%s: %s" % (k, v) for k, v in sorted(value_list, key=lambda x:x[0])]) for global_var in GlobalVar.objects.all(): values[global_var.slug] = global_var.value or "" return values @classmethod def get_empty_values(cls, prefix=''): if not prefix: prefix = cls._prefix values = {} for field_name in get_all_field_names(cls): values[prefix + field_name] = '' return values class HistoricalRecords(BaseHistoricalRecords): def create_historical_record(self, instance, type): try: history_modifier = getattr(instance, 'history_modifier', None) assert history_modifier except (User.DoesNotExist, AssertionError): # on batch removing of users, user could have disappeared return manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history\ .filter(history_modifier_id=history_modifier.pk)\ .order_by('-history_date', '-history_id') if not q_history.count(): manager.create(history_type=type, history_date=datetime.datetime.now(), **attrs) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = datetime.datetime.now() \ - datetime.timedelta(seconds=5) q = q_history.filter(history_date__isnull=False, history_date__gt=min_history_date)\ .order_by('-history_date', '-history_id') if q.count(): return if 'history_date' not in attrs or not attrs['history_date']: attrs['history_date'] = datetime.datetime.now() # record a new version only if data have been changed for field in instance._meta.fields: if 
getattr(old_instance, field.attname) != attrs[field.attname]: manager.create(history_type=type, **attrs) return def valid_id(cls): # valid ID validator for models def func(value): try: cls.objects.get(pk=value) except ObjectDoesNotExist: raise ValidationError(_(u"Not a valid item.")) return func def valid_ids(cls): def func(value): if "," in value: value = value.split(",") if type(value) not in (list, tuple): value = [value] for v in value: try: cls.objects.get(pk=v) except ObjectDoesNotExist: raise ValidationError( _(u"A selected item is not a valid item.")) return func def is_unique(cls, field): # unique validator for models def func(value): query = {field: value} try: assert cls.objects.filter(**query).count() == 0 except AssertionError: raise ValidationError(_(u"This item already exists.")) return func class OwnPerms(object): """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, user): """ Query object to get own items """ return None # implement for each object def is_own(self, user): """ Check if the current object is owned by the user """ query = self.get_query_owns(user) if not query: return False query &= Q(pk=self.pk) return self.__class__.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ query = cls.get_query_owns(user) if not query: return False return cls.objects.filter(query).count() @classmethod def _return_get_owns(cls, owns, values, get_short_menu_class, label_key='cached_label'): sorted_values = [] if hasattr(cls, 'BASKET_MODEL'): owns_len = len(owns) for idx, item in enumerate(reversed(owns)): if get_short_menu_class: item = item[0] if type(item) == cls.BASKET_MODEL: basket = owns.pop(owns_len - idx - 1) sorted_values.append(basket) sorted_values = list(reversed(sorted_values)) if not values: if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: getattr(x, label_key))) return sorted_values + list( sorted(owns, 
key=lambda x: getattr(x[0], label_key))) if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: x[label_key])) return sorted_values + list( sorted(owns, key=lambda x: x[0][label_key])) @classmethod def get_owns(cls, user, replace_query={}, limit=None, values=None, get_short_menu_class=False): """ Get Own items """ if hasattr(user, 'is_authenticated') and not user.is_authenticated(): returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if isinstance(user, User): try: user = IshtarUser.objects.get(user_ptr=user) except IshtarUser.DoesNotExist: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned items = [] if hasattr(cls, 'BASKET_MODEL'): items = list(cls.BASKET_MODEL.objects.filter(user=user).all()) query = cls.get_query_owns(user) if not query and not replace_query: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if query: q = cls.objects.filter(query) else: # replace_query q = cls.objects.filter(replace_query) if values: q = q.values(*values) if limit: items += list(q.order_by('-pk')[:limit]) else: items += list(q.order_by(*cls._meta.ordering).all()) if get_short_menu_class: if values: if 'id' not in values: raise NotImplementedError( "Call of get_owns with get_short_menu_class option and" " no 'id' in values is not implemented") my_items = [] for i in items: if hasattr(cls, 'BASKET_MODEL') and \ type(i) == cls.BASKET_MODEL: dct = dict([(k, getattr(i, k)) for k in values]) my_items.append( (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) else: my_items.append((i, cls.get_short_menu_class(i['id']))) items = my_items else: items = [(i, cls.get_short_menu_class(i.pk)) for i in items] return items @classmethod def _get_query_owns_dicts(cls, ishtaruser): """ List of query own dict to construct the query. 
Each dict are join with an AND operator, each dict key, values are joined with OR operator """ return [] @classmethod def _construct_query_own(cls, prefix, dct_list): q = None for subquery_dict in dct_list: subquery = None for k in subquery_dict: subsubquery = Q(**{prefix + k: subquery_dict[k]}) if subquery: subquery |= subsubquery else: subquery = subsubquery if not subquery: continue if q: q &= subquery else: q = subquery return q class Cached(object): slug_field = 'txt_idx' @classmethod def refresh_cache(cls): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if not current_keys: return for keys in current_keys: if len(keys) == 2 and keys[0] == '__slug': cls.get_cache(keys[1], force=True) elif keys[0] == '__get_types': default = None empty_first = True exclude = [] if len(keys) >= 2: default = keys.pop() if len(keys) > 1: empty_first = bool(keys.pop()) exclude = keys[1:] cls.get_types( exclude=exclude, empty_first=empty_first, default=default, force=True) elif keys[0] == '__get_help': cls.get_help(force=True) @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) @classmethod def get_cache(cls, slug, force=False): cache_key, value = get_cache(cls, ['__slug', slug]) if not force and value: return value try: k = {cls.slug_field: slug} obj = cls.objects.get(**k) cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj except cls.DoesNotExist: cache.set(cache_key, None, settings.CACHE_TIMEOUT) return None @disable_for_loaddata def post_save_cache(sender, **kwargs): sender.refresh_cache() class SlugModelManager(models.Manager): def get_by_natural_key(self, slug): return self.get(slug=slug) class TypeManager(models.Manager): def get_by_natural_key(self, txt_idx): return self.get(txt_idx=txt_idx) class GeneralType(Cached, 
models.Model): """ Abstract class for "types" """ label = models.TextField(_(u"Label")) txt_idx = models.TextField( _(u"Textual ID"), validators=[validate_slug], unique=True, help_text=_( u"The slug is the standardized version of the name. It contains " u"only lowercase letters, numbers and hyphens. Each slug must " u"be unique.")) comment = models.TextField(_(u"Comment"), blank=True, null=True) available = models.BooleanField(_(u"Available"), default=True) HELP_TEXT = u"" objects = TypeManager() class Meta: abstract = True def __unicode__(self): return self.label def natural_key(self): return (self.txt_idx, ) @property def explicit_label(self): return u"{} ({})".format(self.label, self._meta.verbose_name) @classmethod def create_default_for_test(cls): return [cls.objects.create(label='Test %d' % i) for i in range(5)] @property def short_label(self): return self.label @property def name(self): return self.label @classmethod def get_or_create(cls, slug, label=''): """ Get or create a new item. :param slug: textual id :param label: label for initialization if the item doesn't exist (not mandatory) :return: instancied item of the base class """ item = cls.get_cache(slug) if item: return item item, created = cls.objects.get_or_create( txt_idx=slug, defaults={'label': label}) return item @classmethod def get_or_create_pk(cls, slug): """ Get an id from a slug. Create the associated item if needed. :param slug: textual id :return: id of the item (string) """ return unicode(cls.get_or_create(slug).pk) @classmethod def get_or_create_pks(cls, slugs): """ Get and merge a list of ids from a slug list. Create the associated items if needed. 
:param slugs: textual ids :return: string with ids separated by "_" """ items = [] for slug in slugs: items.append(str(cls.get_or_create(slug).pk)) return u"_".join(items) @classmethod def get_help(cls, dct={}, exclude=[], force=False): keys = ['__get_help'] keys += [u"{}".format(ex) for ex in exclude] keys += [u'{}-{}'.format(unicode(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return mark_safe(value) help_text = cls.HELP_TEXT c_rank = -1 help_items = u"\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, '__iter__'): pk = item[0] item = cls.objects.get(pk=pk) item.rank = c_rank + 1 if hasattr(item, 'parent'): c_item = item parents = [] while c_item.parent: parents.append(c_item.parent.label) c_item = c_item.parent parents.reverse() parents.append(item.label) item.label = u" / ".join(parents) if not item.comment: continue if c_rank > item.rank: help_items += u"\n" elif c_rank < item.rank: help_items += u"
\n" c_rank = item.rank help_items += u"
%s
%s
" % ( item.label, u"
".join(item.comment.split('\n'))) c_rank += 1 if c_rank: help_items += c_rank * u"
" if help_text or help_items != u'\n': help_text = help_text + help_items else: help_text = u"" cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) return mark_safe(help_text) @classmethod def _get_initial_types(cls, initial, type_pks, instance=False): new_vals = [] if not initial: return [] if type(initial) not in (list, tuple): initial = [initial] for value in initial: try: pk = int(value) except (ValueError, TypeError): continue if pk in type_pks: continue try: extra_type = cls.objects.get(pk=pk) if instance: new_vals.append(extra_type) else: new_vals.append((extra_type.pk, unicode(extra_type))) except cls.DoesNotExist: continue return new_vals @classmethod def get_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None, initial=None, force=False): types = [] if not instances and empty_first and not default: types = [('', '--')] types += cls._pre_get_types(dct, instances, exclude, default, force) if not initial: return types new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) types += new_vals return types @classmethod def _pre_get_types(cls, dct={}, instances=False, exclude=[], default=None, force=False): # cache cache_key = None if not instances: keys = ['__get_types'] keys += [u"{}".format(ex) for ex in exclude] + \ [u"{}".format(default)] keys += [u'{}-{}'.format(unicode(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return value base_dct = dct.copy() if hasattr(cls, 'parent'): if not cache_key: return cls._get_parent_types( base_dct, instances, exclude=exclude, default=default) vals = [v for v in cls._get_parent_types( base_dct, instances, exclude=exclude, default=default)] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals if not cache_key: return cls._get_types(base_dct, instances, exclude=exclude, default=default) vals = [v for v in cls._get_types( base_dct, instances, exclude=exclude, default=default)] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) 
return vals @classmethod def _get_types(cls, dct={}, instances=False, exclude=[], default=None): dct['available'] = True if default: try: default = cls.objects.get(txt_idx=default) yield(default.pk, _(unicode(default))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and default != "None": if hasattr(default, 'txt_idx'): exclude.append(default.txt_idx) else: exclude.append(default) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(unicode(item)) if item and unicode(item) else '') PREFIX = "│ " PREFIX_EMPTY = "  " PREFIX_MEDIUM = "├ " PREFIX_LAST = "└ " PREFIX_CODES = [u"\u2502", u"\u251C", u"\u2514"] @classmethod def _get_childs(cls, item, dct, prefix=0, instances=False, exclude=[], is_last=False, last_of=[]): prefix += 1 dct['parent'] = item childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): childs = childs.order_by('order') lst = [] child_lst = childs.all() total = len(child_lst) for idx, child in enumerate(child_lst): mylast_of = last_of[:] if instances: child.rank = prefix lst.append(child) else: p = '' cprefix = prefix while cprefix: cprefix -= 1 if not cprefix: if (idx + 1) == total: p += cls.PREFIX_LAST else: p += cls.PREFIX_MEDIUM elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: p += cls.PREFIX_EMPTY else: p += cls.PREFIX else: p += cls.PREFIX_EMPTY else: p += cls.PREFIX lst.append(( child.pk, SafeUnicode(p + unicode(_(unicode(child)))) )) clast_of = last_of[:] clast_of.append(idx + 1 == total) for sub_child in cls._get_childs( child, dct, prefix, instances, exclude=exclude, is_last=((idx + 1) == total), last_of=clast_of): lst.append(sub_child) return lst @classmethod def _get_parent_types(cls, dct={}, instances=False, exclude=[], default=None): dct['available'] = True dct['parent'] = None items = cls.objects.filter(**dct) if 
exclude: items = items.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): items = items.order_by('order') for item in items.all(): if instances: item.rank = 0 yield item else: yield (item.pk, unicode(item)) for child in cls._get_childs(item, dct, instances, exclude=exclude): yield child def save(self, *args, **kwargs): if not self.id and not self.label: self.label = u" ".join(u" ".join(self.txt_idx.split('-')) .split('_')).title() if not self.txt_idx: self.txt_idx = slugify(self.label)[:100] # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter( object_id=self.pk, key=slugify(old.label), content_type=content_type).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter( object_id=self.pk, key=old.txt_idx, content_type=content_type).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False, importer=None, group=None, user=None): content_type = ContentType.objects.get_for_model(self.__class__) if not importer and not force and ItemKey.objects.filter( key=key, content_type=content_type).count(): return filtr = {'key': key, 'content_type': content_type} if group: filtr['group'] = group elif user: filtr['user'] = user else: filtr['importer'] = importer if force: ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() filtr['object_id'] = self.pk ItemKey.objects.get_or_create(**filtr) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self, importer): keys = [self.txt_idx] content_type = ContentType.objects.get_for_model(self.__class__) base_q = Q(content_type=content_type, object_id=self.pk) subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True) subquery |= Q(user__isnull=True, group__isnull=True, importer=importer) if importer.user: 
subquery |= Q(user=importer.user, group__isnull=True, importer=importer) if importer.associated_group: subquery |= Q(user__isnull=True, group=importer.associated_group, importer=importer) q = ItemKey.objects.filter(base_q & subquery) for ik in q.exclude(key=self.txt_idx).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): # content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() class HierarchicalType(GeneralType): parent = models.ForeignKey('self', blank=True, null=True, verbose_name=_(u"Parent")) class Meta: abstract = True def full_label(self): lbls = [self.label] item = self while item.parent: item = item.parent lbls.append(item.label) return u" > ".join(reversed(lbls)) class ItemKey(models.Model): key = models.TextField(_(u"Key")) content_type = models.ForeignKey(ContentType) object_id = models.PositiveIntegerField() content_object = GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey( Import, null=True, blank=True, help_text=_(u"Specific key to an import")) user = models.ForeignKey('IshtarUser', blank=True, null=True) group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) def __unicode__(self): return self.key def get_image_path(instance, filename): # when using migrations instance is not a real ImageModel instance if not hasattr(instance, '_get_image_path'): return u"upload/{}".format(filename) return instance._get_image_path(filename) class ImageModel(models.Model): image = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255) thumbnail = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255) IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE IMAGE_PREFIX = '' class Meta: abstract = True def _get_image_path(self, filename): return u"{}/{}".format(self._get_base_image_path(), filename) def _get_base_image_path(self): return u"upload" def has_changed(self, field): if 
not self.pk: return True manager = getattr(self.__class__, 'objects') old = getattr(manager.get(pk=self.pk), field) return not getattr(self, field) == old def create_thumb(self, image, size): """Returns the image resized to fit inside a box of the given size""" image.thumbnail(size, Image.ANTIALIAS) temp = StringIO() image.save(temp, 'jpeg') temp.seek(0) return SimpleUploadedFile('temp', temp.read()) def save(self, *args, **kwargs): if 'force_copy' in kwargs: kwargs.pop('force_copy') super(ImageModel, self).save(*args, **kwargs) return # manage images if not self.has_changed('image'): return super(ImageModel, self).save(*args, **kwargs) if not self.image: self.thumbnail = None return super(ImageModel, self).save(*args, **kwargs) # # generate thumbnail # convert to jpg filename = os.path.splitext(os.path.split(self.image.name)[-1])[0] old_path = self.image.path filename = "%s.jpg" % filename try: image = Image.open(self.image.file) # convert to RGB if image.mode not in ('L', 'RGB'): image = image.convert('RGB') # resize if necessary if self.IMAGE_MAX_SIZE: self.image.save(filename, self.create_thumb(image, self.IMAGE_MAX_SIZE), save=False) if old_path != self.image.path: try: os.remove(old_path) except OSError: # already clean pass # save the thumbnail thumb_filename = self._get_thumb_name(filename) self.thumbnail.save( thumb_filename, self.create_thumb(image, self.THUMB_MAX_SIZE), save=False) except IOError: pass return super(ImageModel, self).save(*args, **kwargs) def _get_thumb_name(self, filename): splited = filename.split('.') return u"{}-thumb.{}".format( u".".join(splited[:-1]), splited[-1] ) class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) PRIVATE_FIELDS = ('id', 'history_modifier', 'order') class BulkUpdatedItem(object): @classmethod def bulk_recursion(cls, transaction_id, extra_args): """ Prevent infinite recursion. 
Should not happen but wrong manipulation in the database or messy imports can generate circular relations :param transaction_id: current transaction ID (unix time) - if null a transaction ID is generated :param extra_args: arguments dealing with :return: (transaction ID, is a recursion) """ if not transaction_id: transaction_id = unicode(time.time()) args = ['cached_label_bulk_update', transaction_id] + extra_args key, val = get_cache(cls, args) if val: return transaction_id, True cache.set(key, 1, settings.CACHE_SMALLTIMEOUT) return transaction_id, False class RelationItem(models.Model): """ Items with relation between them """ relation_image = models.FileField( _(u"Generated relation image (SVG)"), null=True, blank=True, upload_to=get_image_path ) class Meta: abstract = True def generate_relation_image(self): generate_relation_graph(self) class JsonDataSection(models.Model): content_type = models.ForeignKey(ContentType) name = models.CharField(_(u"Name"), max_length=200) order = models.IntegerField(_(u"Order"), default=10) class Meta: verbose_name = _(u"Json data - Menu") verbose_name_plural = _(u"Json data - Menus") ordering = ['order', 'name'] def __unicode__(self): return u"{} - {}".format(self.content_type, self.name) JSON_VALUE_TYPES = ( ('T', _(u"Text")), ('LT', _(u"Long text")), ('I', _(u"Integer")), ('B', _(u"Boolean")), ('F', _(u"Float")), ('D', _(u"Date")), ('C', _(u"Choices")), ) class JsonDataField(models.Model): name = models.CharField(_(u"Name"), max_length=200) content_type = models.ForeignKey(ContentType) key = models.CharField( _(u"Key"), max_length=200, help_text=_(u"Value of the key in the JSON schema. For hierarchical " u"key use \"__\" to explain it. 
For instance for the key " u"'my_subkey' with data such as {'my_key': {'my_subkey': " u"'value'}}, its value will be reached with my_key__my_subkey.")) display = models.BooleanField(_(u"Display"), default=True) value_type = models.CharField(_(u"Type"), default="T", max_length=10, choices=JSON_VALUE_TYPES) order = models.IntegerField(_(u"Order"), default=10) search_index = models.BooleanField(_(u"Use in search indexes"), default=False) section = models.ForeignKey(JsonDataSection, blank=True, null=True) custom_forms = models.ManyToManyField( "CustomForm", blank=True, through="CustomFormJsonField") class Meta: verbose_name = _(u"Json data - Field") verbose_name_plural = _(u"Json data - Fields") ordering = ['order', 'name'] def __unicode__(self): return u"{} - {}".format(self.content_type, self.name) def clean(self): if not self.section: return if self.section.content_type != self.content_type: raise ValidationError( _(u"Content types of the field and of the menu do not match")) class JsonData(models.Model): data = JSONField(default={}, db_index=True, blank=True) class Meta: abstract = True def pre_save(self): if not self.data: self.data = {} @property def json_sections(self): sections = [] try: content_type = ContentType.objects.get_for_model(self) except ContentType.DoesNotExists: return sections fields = list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=True ).all()) # no section fields fields += list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=False ).order_by('section__order', 'order').all()) for field in fields: value = None data = self.data.copy() for key in field.key.split('__'): if key in data: value = copy.copy(data[key]) data = data[key] else: value = None break if value is None: continue if type(value) in (list, tuple): value = u" ; ".join([unicode(v) for v in value]) section_name = field.section.name if field.section else None if not sections or section_name != 
sections[-1][0]: # if section name is identical it is the same sections.append((section_name, [])) sections[-1][1].append((field.name, value)) return sections class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s") class Meta: abstract = True class FullSearch(models.Model): search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")) BASE_SEARCH_VECTORS = [] PROPERTY_SEARCH_VECTORS = [] INT_SEARCH_VECTORS = [] M2M_SEARCH_VECTORS = [] PARENT_SEARCH_VECTORS = [] # prevent circular dependency PARENT_ONLY_SEARCH_VECTORS = [] class Meta: abstract = True @classmethod def general_types(cls): for k in get_all_field_names(cls): field = cls._meta.get_field(k) if not hasattr(field, 'rel') or not field.rel: continue rel_model = field.rel.to if issubclass(rel_model, (GeneralType, HierarchicalType)): yield k def update_search_vector(self, save=True, exclude_parent=False): """ Update the search vector :param save: True if you want to save the object immediately :return: True if modified """ if not hasattr(self, 'search_vector'): return if not self.pk: # logger.warning("Cannot update search vector before save or " # "after deletion.") return if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ and not self.INT_SEARCH_VECTORS \ and not self.PROPERTY_SEARCH_VECTORS \ and not self.PARENT_SEARCH_VECTORS: logger.warning("No search_vectors defined for {}".format( self.__class__)) return if getattr(self, '_search_updated', None): return self._search_updated = True old_search = "" if self.search_vector: old_search = self.search_vector[:] search_vectors = [] base_q = self.__class__.objects.filter(pk=self.pk) # many to many have to be queried one by one otherwise only one is fetch for M2M_SEARCH_VECTOR in self.M2M_SEARCH_VECTORS: key = M2M_SEARCH_VECTOR.split('__')[0] rel_key = getattr(self, key) for item in rel_key.values('pk').all(): query_dct = 
{key + "__pk": item['pk']} q = copy.copy(base_q).filter(**query_dct) q = q.annotate( search=SearchVector( M2M_SEARCH_VECTOR, config=settings.ISHTAR_SEARCH_LANGUAGE) ).values('search') search_vectors.append(q.all()[0]['search']) # int/float are not well managed by the SearchVector for INT_SEARCH_VECTOR in self.INT_SEARCH_VECTORS: q = base_q.values(INT_SEARCH_VECTOR) search_vectors.append( "'{}':1".format(q.all()[0][INT_SEARCH_VECTOR])) if not exclude_parent: # copy parent vector fields for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: parent = getattr(self, PARENT_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append(p.search_vector) elif parent: search_vectors.append(parent.search_vector) for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append( p.update_search_vector(save=False, exclude_parent=True) ) elif parent: search_vectors.append( parent.update_search_vector(save=False, exclude_parent=True) ) if self.BASE_SEARCH_VECTORS: # query "simple" fields q = base_q.annotate( search=SearchVector( *self.BASE_SEARCH_VECTORS, config=settings.ISHTAR_SEARCH_LANGUAGE )).values('search') search_vectors.append( unidecode( q.all()[0]['search'].decode('utf-8') ) ) if self.PROPERTY_SEARCH_VECTORS: for attr in self.PROPERTY_SEARCH_VECTORS: data = getattr(self, attr) if callable(data): data = data() if not data: continue data = unicode(data) with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s)", [data]) row = cursor.fetchone() search_vectors.append(row[0]) if hasattr(self, 'data') and self.data: content_type = ContentType.objects.get_for_model(self) for json_field in JsonDataField.objects.filter( content_type=content_type, search_index=True).all(): data = copy.deepcopy(self.data) no_data = False for key in json_field.key.split('__'): if key not in data: no_data = True break data = 
class FixAssociated(object):
    """
    Mixin forcing related items to carry one of a set of expected values.

    ``ASSOCIATED`` maps an attribute name of the current object to a
    dictionary whose keys are ``(sub-attribute name, type)`` tuples and whose
    values are the expected value (or list/tuple of expected values) for
    this sub-attribute on the related item.
    """
    ASSOCIATED = {}

    def fix_associated(self):
        """
        Check every rule of ``ASSOCIATED`` and, when the related item holds
        none of the expected values, assign (or add, for "many" relations)
        the first expected one.

        When the type exposes ``txt_idx`` the expected values are resolved
        through ``type.objects.get(txt_idx=...)``; if one of them does not
        exist yet the whole method stops silently.
        """
        for attr_name, rules in self.ASSOCIATED.items():
            related = getattr(self, attr_name)
            if not related:
                continue
            for (field_name, model), wanted in rules.items():
                if not isinstance(wanted, (list, tuple)):
                    wanted = [wanted]
                if hasattr(model, "txt_idx"):
                    try:
                        wanted = [model.objects.get(txt_idx=txt_idx)
                                  for txt_idx in wanted]
                    except model.DoesNotExist:
                        # type not yet initialized
                        return
                holder = getattr(related, field_name)
                many = hasattr(holder, "all")
                present = holder.all() if many else [holder]
                if any(value in wanted for value in present):
                    continue
                # the first value is used
                fallback = wanted[0]
                if many:
                    getattr(related, field_name).add(fallback)
                else:
                    setattr(related, field_name, fallback)
class BaseHistorizedItem(DocumentItem, FullSearch, Imported, JsonData,
                         FixAssociated):
    """
    Historized item with external ID management.
    All historized items are searchable and have a data json field.
    """
    IS_BASKET = False
    # name of the profile attribute holding the external ID formula
    EXTERNAL_ID_KEY = ''
    # related managers whose items must refresh their external ID on save
    EXTERNAL_ID_DEPENDENCIES = []
    history_modifier = models.ForeignKey(
        User, related_name='+', on_delete=models.SET_NULL,
        verbose_name=_(u"Last editor"), blank=True, null=True)
    history_creator = models.ForeignKey(
        User, related_name='+', on_delete=models.SET_NULL,
        verbose_name=_(u"Creator"), blank=True, null=True)

    class Meta:
        abstract = True

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name declared in the model Meta."""
        return cls._meta.verbose_name

    def update_external_id(self, save=False):
        """
        Regenerate the external ID from the formula named by
        EXTERNAL_ID_KEY.

        Nothing is done when no key is defined, when the external ID has
        been set manually (``auto_external_id`` is false) or when the
        generated value is unchanged.

        :param save: save the item (without history) after the update
        :return: the new external ID, or None when nothing changed
        """
        if not self.EXTERNAL_ID_KEY or (
                self.external_id and not self.auto_external_id):
            return
        external_id = get_external_id(self.EXTERNAL_ID_KEY, self)
        if external_id == self.external_id:
            return
        self.auto_external_id = True
        self.external_id = external_id
        # force label regeneration
        self._cached_label_checked = False
        if save:
            self.skip_history_when_saving = True
            self.save()
        return external_id

    def get_previous(self, step=None, date=None, strict=True):
        """
        Get a "step" previous state of the item

        :param step: index in the history (as returned by
            ``self.history.all()``)
        :param date: history date to look for, used when no step is given
        :param strict: raise HistoryError when a related object referenced
            by the historized state does not exist anymore; otherwise the
            relation is set to None
        :return: the historized item with ``_step``, ``_previous`` and
            ``_next`` set and relations resolved, or None when the history
            is empty or when no state matches the date
        """
        assert step or date
        historized = self.history.all()
        history_len = len(historized)
        if not history_len:
            # no history: there is no previous state to return
            return
        item = None
        if step:
            if history_len <= step:
                # silently return the last step if too far in the history:
                # the step is clamped so that index lookups below stay in
                # range
                step = history_len - 1
            item = historized[step]
        else:
            for step, item in enumerate(historized):
                if item.history_date == date:
                    break
            # ended with no match
            if item.history_date != date:
                return
        item._step = step
        if history_len != (step + 1):
            item._previous = historized[step + 1].history_date
        else:
            item._previous = None
        if step > 0:
            item._next = historized[step - 1].history_date
        else:
            item._next = None
        item.history_date = historized[step].history_date
        model = self.__class__
        for k in get_all_field_names(model):
            field = model._meta.get_field(k)
            if hasattr(field, 'rel') and field.rel:
                if not hasattr(item, k + '_id'):
                    # relation not historized: use the current value
                    setattr(item, k, getattr(self, k))
                    continue
                val = getattr(item, k + '_id')
                if not val:
                    setattr(item, k, None)
                    continue
                try:
                    val = field.rel.to.objects.get(pk=val)
                    setattr(item, k, val)
                except ObjectDoesNotExist:
                    if strict:
                        raise HistoryError(u"The class %s has no pk %d" % (
                            unicode(field.rel.to), val))
                    setattr(item, k, None)
        item.pk = self.pk
        return item

    @property
    def last_edition_date(self):
        """Date of the most recent history entry (None if unavailable)."""
        try:
            return self.history.order_by('-history_date').all()[0].history_date
        except (AttributeError, IndexError):
            return

    @property
    def history_creation_date(self):
        """Date of the oldest history entry (None if unavailable)."""
        try:
            return self.history.order_by('history_date').all()[0].history_date
        except (AttributeError, IndexError):
            return

    def rollback(self, date):
        """
        Rollback to a previous state

        The history entries browsed before reaching the requested date are
        deleted once the rollback is done.

        :param date: history date of the state to restore
        :raises HistoryError: when no history entry matches the date or
            when restoring the values fails
        """
        to_del, new_item = [], None
        for item in self.history.all():
            if item.history_date == date:
                new_item = item
                break
            to_del.append(item)
        if not new_item:
            raise HistoryError(u"The date to rollback to doesn't exist.")
        try:
            for f in self._meta.fields:
                k = f.name
                if k != 'id' and hasattr(self, k):
                    if not hasattr(new_item, k):
                        # relations are historized by their raw id
                        k = k + "_id"
                    setattr(self, k, getattr(new_item, k))
            try:
                self.history_modifier = User.objects.get(
                    pk=new_item.history_modifier_id)
            except User.DoesNotExist:
                # the editor has been deleted since: keep the copied value
                pass
            # force label regeneration
            self._cached_label_checked = False
            self.save()
        except Exception:
            raise HistoryError(u"The rollback has failed.")
        # clean the obsolete history
        for historized_item in to_del:
            historized_item.delete()

    def values(self):
        """Return a dict of every concrete field value except ``id``."""
        values = {}
        for f in self._meta.fields:
            k = f.name
            if k != 'id':
                values[k] = getattr(self, k)
        return values

    def get_show_url(self):
        """
        Return the "show-<lowercased class name>" URL of the item, or None
        when no such URL is defined.
        """
        try:
            return reverse('show-' + self.__class__.__name__.lower(),
                           args=[self.pk, ''])
        except NoReverseMatch:
            return

    @property
    def associated_filename(self):
        """
        Build a file name from the department, the town, the class name,
        the reference, the name and the last edition date of the item.

        An empty string is returned when the item lacks one of the
        required attributes.
        """
        required = ('get_town_label', 'get_department', 'reference',
                    'short_class_name')
        if any(not hasattr(self, attr) for attr in required):
            return ''
        items = [slugify(self.get_department()),
                 slugify(self.get_town_label()).upper(),
                 slugify(self.short_class_name),
                 slugify(self.reference),
                 slugify(self.name or '').replace('-', '_').capitalize()]
        last_edition_date = self.last_edition_date
        if last_edition_date:
            items.append(last_edition_date.strftime('%Y%m%d'))
        else:
            items.append('00000000')
        return u"-".join([unicode(item) for item in items])

    def save(self, *args, **kwargs):
        """
        Save the item, keeping the creator, the external ID, the external
        ID of dependent items and the associated values up to date.

        The ``external_id_updated`` keyword argument is consumed here and
        is not passed to the parent ``save``.

        :return: True
        """
        created = not self.pk
        if not getattr(self, 'skip_history_when_saving', False):
            assert hasattr(self, 'history_modifier')
            if created:
                self.history_creator = self.history_modifier
        # external ID can have related item not available before save
        external_id_updated = kwargs.pop('external_id_updated', False)
        if not created and not external_id_updated:
            self.update_external_id()
        super(BaseHistorizedItem, self).save(*args, **kwargs)
        if created and self.update_external_id():
            # force resave for external ID creation
            self.skip_history_when_saving = True
            self._updated_id = True
            return self.save(external_id_updated=True)
        for dep in self.EXTERNAL_ID_DEPENDENCIES:
            for obj in getattr(self, dep).all():
                obj.update_external_id(save=True)
        self.fix_associated()
        return True
class GeneralRecordRelations(object):
    """
    Mixin for relations between two records (``left_record``,
    ``right_record``) qualified by a ``relation_type``.
    """

    @classmethod
    def general_types(cls):
        """Return the names of the attributes holding a general type."""
        return ['relation_type']

    def save(self, *args, **kwargs):
        """
        Save the relation then make sure the mirrored relation exists.

        The mirrored relation uses the same type when the relation type is
        symmetrical and the inverse relation type otherwise. Nothing more
        is created when no inverse relation type is defined.

        :return: the saved relation (self)
        """
        super(GeneralRecordRelations, self).save(*args, **kwargs)
        # after saving create the symmetric or the inverse relation
        sym_rel_type = self.relation_type
        if not self.relation_type.symmetrical:
            sym_rel_type = self.relation_type.inverse_relation
        if sym_rel_type:
            self.__class__.objects.get_or_create(
                right_record=self.left_record,
                left_record=self.right_record,
                relation_type=sym_rel_type)
        # when no symmetric/inverse type is defined there is nothing to
        # mirror: the saved relation is returned in every case
        return self
def get_external_id(key, item):
    """
    Generate an external ID for an item from a formula of the current
    profile.

    The formula is a format string. Each ``{...}`` placeholder is a chain
    of attributes separated by ``__`` and evaluated on the item (callables
    are called), optionally followed by ``|filter`` names taken from
    FORMULA_FILTERS. A placeholder beginning with ``settings__`` is read
    from the Django settings instead of the item. A ``||filter`` suffix on
    the formula applies the filter to the whole generated value.

    :param key: name of the profile attribute holding the formula
    :param item: object the placeholders are evaluated on
    :return: the generated value or None if the profile has no such
        attribute
    """
    profile = get_current_profile()
    if not hasattr(profile, key):
        return
    formula = getattr(profile, key)
    dct = {}
    for initial_key in PARSE_FORMULA.findall(formula):
        filtered = initial_key.split(u'|')
        fkey = filtered[0]
        # unknown filter names are ignored
        filters = [FORMULA_FILTERS[filtr] for filtr in filtered[1:]
                   if filtr in FORMULA_FILTERS]
        if fkey.startswith('settings__'):
            value = getattr(settings, fkey[len('settings__'):]) or ''
            if filters:
                value = unicode(value)
                for filtr in filters:
                    value = filtr(value)
            # the value must be registered with the full placeholder
            # (filters included): this is the name "format" looks for
            dct[initial_key] = value
            continue
        obj = item
        for k in fkey.split('__'):
            try:
                obj = getattr(obj, k)
            except ObjectDoesNotExist:
                obj = None
            if callable(obj):
                obj = obj()
            if obj is None:
                break
        if obj is None:
            dct[initial_key] = ''
        else:
            dct[initial_key] = unicode(obj)
        for filtr in filters:
            dct[initial_key] = filtr(dct[initial_key])
    values = formula.format(**dct).split('||')
    value = values[0]
    for filtr in values[1:]:
        if filtr not in FORMULA_FILTERS:
            # not a filter: "||" was a part of the value, put it back
            value += u'||' + filtr
            continue
        value = FORMULA_FILTERS[filtr](value)
    return value
class IshtarSiteProfile(models.Model, Cached):
    """
    Site profile: enabled modules, labels, operation prefixes, currency and
    formulas used to generate external IDs and raw names.
    """
    slug_field = 'slug'
    label = models.TextField(_(u"Name"))
    slug = models.SlugField(_(u"Slug"), unique=True)
    active = models.BooleanField(_(u"Current active"), default=False)
    experimental_feature = models.BooleanField(
        _(u"Activate experimental feature"), default=False)
    description = models.TextField(_(u"Description"), null=True, blank=True)
    config = models.CharField(
        _(u"Alternate configuration"), max_length=200,
        choices=ALTERNATE_CONFIGS_CHOICES,
        help_text=_(u"Choose an alternate configuration for label, "
                    u"index management"),
        null=True, blank=True
    )
    files = models.BooleanField(_(u"Files module"), default=False)
    archaeological_site = models.BooleanField(
        _(u"Archaeological site module"), default=False)
    archaeological_site_label = models.CharField(
        _(u"Archaeological site type"), max_length=200,
        choices=SITE_LABELS,
        default='site'
    )
    context_record = models.BooleanField(_(u"Context records module"),
                                         default=False)
    find = models.BooleanField(_(u"Finds module"), default=False,
                               help_text=_(u"Need context records module"))
    find_index = models.CharField(
        _(u"Find index is based on"), default='O', max_length=2,
        choices=FIND_INDEX_SOURCE,
        help_text=_(u"To prevent irrelevant indexes, change this parameter "
                    u"only if there is no find in the database"))
    warehouse = models.BooleanField(
        _(u"Warehouses module"), default=False,
        help_text=_(u"Need finds module"))
    preservation = models.BooleanField(_(u"Preservation module"),
                                       default=False)
    mapping = models.BooleanField(_(u"Mapping module"), default=False)
    underwater = models.BooleanField(_(u"Underwater module"), default=False)
    parcel_mandatory = models.BooleanField(
        _(u"Parcel are mandatory for context records"), default=True)
    homepage = models.TextField(
        _(u"Home page"), null=True, blank=True,
        help_text=_(u"Homepage of Ishtar - if not defined a default homepage "
                    u"will appear. Use the markdown syntax. {random_image} "
                    u"can be used to display a random image."))
    operation_prefix = models.CharField(
        _(u"Main operation code prefix"), default=u'OA', null=True, blank=True,
        max_length=20
    )
    default_operation_prefix = models.CharField(
        _(u"Default operation code prefix"), default=u'OP', null=True,
        blank=True, max_length=20
    )
    operation_region_code = models.CharField(
        _(u"Operation region code"), null=True, blank=True,
        max_length=5
    )
    file_external_id = models.TextField(
        _(u"File external id"),
        default=u"{year}-{numeric_reference}",
        help_text=_(u"Formula to manage file external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    parcel_external_id = models.TextField(
        _(u"Parcel external id"),
        default=u"{associated_file__external_id}{operation__code_patriarche}-"
                u"{town__numero_insee}-{section}{parcel_number}",
        help_text=_(u"Formula to manage parcel external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    context_record_external_id = models.TextField(
        _(u"Context record external id"),
        default=u"{parcel__external_id}-{label}",
        help_text=_(u"Formula to manage context record external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    base_find_external_id = models.TextField(
        _(u"Base find external id"),
        default=u"{context_record__external_id}-{label}",
        help_text=_(u"Formula to manage base find external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    find_external_id = models.TextField(
        _(u"Find external id"),
        default=u"{get_first_base_find__context_record__external_id}-{label}",
        help_text=_(u"Formula to manage find external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    container_external_id = models.TextField(
        _(u"Container external id"),
        default=u"{responsible__external_id}-{index}",
        help_text=_(u"Formula to manage container external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    warehouse_external_id = models.TextField(
        _(u"Warehouse external id"),
        default=u"{name|slug}",
        help_text=_(u"Formula to manage warehouse external ID. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    person_raw_name = models.TextField(
        _(u"Raw name for person"),
        default=u"{name|upper} {surname}",
        help_text=_(u"Formula to manage person raw_name. "
                    u"Change this with care. With incorrect formula, the "
                    u"application might be unusable and import of external "
                    u"data can be destructive."))
    currency = models.CharField(_(u"Currency"), default=u"€",
                                choices=CURRENCY, max_length=5)

    class Meta:
        verbose_name = _(u"Ishtar site profile")
        verbose_name_plural = _(u"Ishtar site profiles")
        ordering = ['label']

    def __unicode__(self):
        return unicode(self.label)

    def has_overload(self, key):
        """
        Tell if the alternate configuration selected for this profile
        provides the attribute "key" (falsy when no known configuration is
        selected).
        """
        return (
            self.config
            and self.config in ALTERNATE_CONFIGS
            and hasattr(ALTERNATE_CONFIGS[self.config], key)
        )

    @classmethod
    def get_current_profile(cls, force=False):
        """
        Return the active profile, from the cache unless "force" is set.

        A default active profile is created when no profile is active. The
        returned profile is stored in the cache.
        """
        cache_key, cached_profile = get_cache(cls, ['is-current-profile'])
        if cached_profile and not force:
            return cached_profile
        active_profiles = cls.objects.filter(active=True)
        if active_profiles.count():
            profile = active_profiles.all()[0]
        else:
            profile = cls.objects.create(
                label="Default profile", slug='default', active=True)
        cache.set(cache_key, profile, settings.CACHE_TIMEOUT)
        return profile

    @classmethod
    def get_default_site_label(cls, key=None):
        """Return the site label of the current profile."""
        return cls.get_current_profile().get_site_label(key)

    def get_site_label(self, key=None):
        """
        Return the label matching the archaeological site type of the
        profile: the generic one (SITE_LABELS) when no key is given, the
        TRANSLATED_SITE_LABELS entry for this key otherwise.
        """
        site_type = self.archaeological_site_label
        if key:
            return unicode(TRANSLATED_SITE_LABELS[site_type][key])
        return unicode(dict(SITE_LABELS)[site_type])

    def save(self, *args, **kwargs):
        """
        Save the profile then enforce consistency: only one active profile,
        at least one active profile and module dependencies (warehouse
        needs find, find needs context record).

        The "raw" keyword argument (consumed here) disables these checks.

        :return: the saved profile
        """
        raw = kwargs.pop('raw', False)
        super(IshtarSiteProfile, self).save(*args, **kwargs)
        if raw:
            return self
        other_actives = self.__class__.objects.filter(
            active=True).exclude(slug=self.slug)
        if self.active and other_actives.count():
            # this profile takes over: disable the other active ones
            for other in other_actives.all():
                other.active = False
                other.save(raw=True)
        needs_resave = False
        if not self.active and not other_actives.count():
            # there must always be an active profile
            self.active = True
            needs_resave = True
        if self.warehouse and not self.find:
            self.find = True
            needs_resave = True
        if self.find and not self.context_record:
            self.context_record = True
            needs_resave = True
        if needs_resave:
            return self.save(raw=True)
        return self
class CustomForm(models.Model):
    """
    Customization of a registered form for all users, given users or given
    user types.
    """
    name = models.CharField(_(u"Name"), max_length=250)
    form = models.CharField(_(u"Form"), max_length=250)
    available = models.BooleanField(_(u"Available"), default=True)
    enabled = models.BooleanField(
        _(u"Enable this form"), default=True,
        help_text=_(u"Disable with caution: disabling a form with mandatory "
                    u"fields may lead to database errors."))
    apply_to_all = models.BooleanField(
        _(u"Apply to all"), default=False,
        help_text=_(u"Apply this form to all users. If set to True, selecting "
                    u"user and user type is useless."))
    users = models.ManyToManyField('IshtarUser', blank=True)
    user_types = models.ManyToManyField('PersonType', blank=True)

    class Meta:
        verbose_name = _(u"Custom form")
        verbose_name_plural = _(u"Custom forms")
        ordering = ['name', 'form']

    def __unicode__(self):
        return u"{} - {}".format(self.name, self.form)

    def users_lbl(self):
        """Return the associated users as a " ; " separated string."""
        return " ; ".join([unicode(user) for user in self.users.all()])
    users_lbl.short_description = _(u"Users")

    def user_types_lbl(self):
        """Return the associated user types as a " ; " separated string."""
        return " ; ".join(
            [unicode(user_type) for user_type in self.user_types.all()])
    user_types_lbl.short_description = _(u"User types")

    @classmethod
    def register(cls):
        """
        Return the (forms by slug, model names by application) registers.

        The registers are kept on the class once built. They are taken from
        the cache when available, otherwise built by inspecting the modules
        listed in ISHTAR_FORMS for custom form classes having a
        ``form_slug``.
        """
        already_built = hasattr(cls, '_register') and \
            hasattr(cls, '_register_fields')
        if already_built:
            return cls._register, cls._register_fields
        forms_key, cached_forms = get_cache(
            cls.__class__, ['dct-forms'], app_label='ishtar_common')
        fields_key, cached_fields = get_cache(
            cls.__class__, ['dct-fields'], app_label='ishtar_common')
        if cached_forms and cached_fields:
            cls._register = cached_forms
            cls._register_fields = cached_fields
            return cls._register, cls._register_fields
        forms_by_slug = cls._register = {}
        models_by_app = cls._register_fields = {}
        # ideally should be improved but only used in admin
        from ishtar_common.admin import ISHTAR_FORMS
        from ishtar_common.forms import CustomForm as CustomFormForm
        for module in ISHTAR_FORMS:
            app_name = module.__package__
            if app_name == "archaeological_files_pdl":
                app_name = "archaeological_files"
            for attr_name in dir(module):
                if 'Form' not in attr_name and 'Select' not in attr_name:
                    # not very clean... but do not treat inappropriate items
                    continue
                candidate = getattr(module, attr_name)
                is_custom_form = inspect.isclass(candidate) \
                    and issubclass(candidate, CustomFormForm) \
                    and getattr(candidate, 'form_slug', None)
                if not is_custom_form:
                    continue
                model_name = candidate.form_slug.split('-')[0].replace(
                    '_', '')
                app_models = models_by_app.setdefault(app_name, [])
                if model_name not in app_models:
                    app_models.append(model_name)
                forms_by_slug[candidate.form_slug] = candidate
        return cls._register, cls._register_fields

    def get_form_class(self):
        """Return the registered form class (None if not registered)."""
        register, register_fields = self.register()
        if self.form in self._register:
            return register[self.form]
        return

    def get_available_json_fields(self):
        """
        Return (pk, "name (value type)") tuples for the JSON data fields
        attached to the models registered for the application of this form.
        """
        register, register_fields = self.register()
        if self.form not in self._register:
            return []
        app_name = register[self.form].__module__.split('.')[0]
        if app_name == "archaeological_files_pdl":
            app_name = "archaeological_files"
        if app_name not in register_fields:
            return []
        available = []
        for model_name in register_fields[app_name]:
            content_type = ContentType.objects.get(app_label=app_name,
                                                   model=model_name)
            json_fields = JsonDataField.objects.filter(
                content_type=content_type).all()
            for json_field in json_fields:
                value_type = dict(JSON_VALUE_TYPES)[json_field.value_type]
                description = u"{} ({})".format(json_field.name, value_type)
                available.append((json_field.pk, description))
        return available
def cached_globalvar_changed(sender, **kwargs):
    """
    Signal receiver: store the value of the saved global variable in the
    cache. Nothing is done when the provided instance is empty.
    """
    instance = kwargs['instance']
    if instance:
        cache_key, _cached = get_cache(GlobalVar, instance.slug)
        cache.set(cache_key, instance.value, settings.CACHE_TIMEOUT)
class Dashboard(object):
    """
    Statistics about a model: last created and modified items, number of
    items by period (year or month) and basic statistical indicators
    (average, variance, standard deviation, median, mode).
    """

    def __init__(self, model, slice='year', date_source=None,
                 show_detail=None, fltr=None):
        """
        :param model: model providing ``get_total_number``, ``get_periods``,
            ``get_by_year`` and ``get_by_month``
        :param slice: 'year' or 'month'
        :param date_source: passed to the model methods when set
        :param show_detail: for the 'month' slice, add a serie for each
            department of settings.ISHTAR_DPTS
        :param fltr: filter passed to the model methods
        """
        if not fltr:
            fltr = {}
        # don't provide date_source if it is not relevant
        self.model = model
        self.total_number = model.get_total_number(fltr)
        self.show_detail = show_detail
        history_model = self.model.history.model
        from archaeological_finds.models import Find
        # last edited - created
        self.recents, self.lasts = [], []
        for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')):
            last_ids = history_model.objects.values('id')\
                .annotate(hd=Max('history_date'))
            last_ids = last_ids.filter(history_type=modif_type)
            if self.model == Find:
                last_ids = last_ids.filter(
                    downstream_treatment_id__isnull=True)
                if modif_type == '+':
                    last_ids = last_ids.filter(
                        upstream_treatment_id__isnull=True)
            last_ids = last_ids.order_by('-hd').distinct().all()[:5]
            for idx in last_ids:
                try:
                    obj = self.model.objects.get(pk=idx['id'])
                except ObjectDoesNotExist:
                    # deleted object are always referenced in history
                    continue
                obj.history_date = idx['hd']
                last_lst.append(obj)
        # years
        base_kwargs = {'fltr': fltr.copy()}
        if date_source:
            base_kwargs['date_source'] = date_source
        periods_kwargs = copy.deepcopy(base_kwargs)
        periods_kwargs['slice'] = slice
        self.periods = model.get_periods(**periods_kwargs)
        self.periods = list(set(self.periods))
        self.periods.sort()
        if not self.total_number or not self.periods:
            return
        kwargs_num = copy.deepcopy(base_kwargs)
        self.serie_labels = [_(u"Total")]
        # numbers
        if slice == 'year':
            self.values = [('year', "", list(reversed(self.periods)))]
            self.numbers = [model.get_by_year(year, **kwargs_num).count()
                            for year in self.periods]
            self.values += [('number', _(u"Number"),
                             list(reversed(self.numbers)))]
        if slice == 'month':
            periods = list(reversed(self.periods))
            # (year, month) tuples are converted to "YYYY-MM-01" strings
            self.periods = ["%d-%s-01" % (p[0], ('0' + str(p[1]))
                            if len(str(p[1])) == 1 else p[1]) for p in periods]
            self.values = [('month', "", self.periods)]
            if show_detail:
                for dpt, lbl in settings.ISHTAR_DPTS:
                    self.serie_labels.append(unicode(dpt))
                    idx = 'number_' + unicode(dpt)
                    kwargs_num['fltr']["towns__numero_insee__startswith"] = \
                        unicode(dpt)
                    numbers = [model.get_by_month(*p.split('-')[:2],
                                                  **kwargs_num).count()
                               for p in self.periods]
                    self.values += [(idx, dpt, list(numbers))]
                # put "Total" at the end
                self.serie_labels.append(self.serie_labels.pop(0))
            kwargs_num = base_kwargs.copy()
            self.numbers = [model.get_by_month(*p.split('-')[:2],
                                               **kwargs_num).count()
                            for p in self.periods]
            self.values += [('number', _(u"Total"), list(self.numbers))]
        # calculate
        self.average = self.get_average()
        self.variance = self.get_variance()
        self.standard_deviation = self.get_standard_deviation()
        self.median = self.get_median()
        self.mode = self.get_mode()
        # by operation
        if not hasattr(model, 'get_by_operation'):
            return
        operations = model.get_operations()
        operation_numbers = [model.get_by_operation(op).count()
                             for op in operations]
        # calculate
        self.operation_average = self.get_average(operation_numbers)
        self.operation_variance = self.get_variance(operation_numbers)
        self.operation_standard_deviation = self.get_standard_deviation(
            operation_numbers)
        self.operation_median = self.get_median(operation_numbers)
        # operation_numbers must still be in the order of "operations"
        # here: get_median does not sort its argument in place
        operation_mode_pk = self.get_mode(dict(zip(operations,
                                                   operation_numbers)))
        if operation_mode_pk:
            from archaeological_operations.models import Operation
            self.operation_mode = unicode(Operation.objects
                                          .get(pk=operation_mode_pk))

    def get_average(self, vals=None):
        """
        Return the arithmetic mean (as a float) of ``vals`` or, when
        ``vals`` is empty, of ``self.numbers``.
        """
        if not vals:
            vals = self.numbers
        # float conversion: "/" is an integer division with Python 2 ints
        return float(sum(vals)) / len(vals)

    def get_variance(self, vals=None):
        """
        Return the population variance of ``vals`` or, when ``vals`` is
        empty, of ``self.numbers``.
        """
        if not vals:
            vals = self.numbers
        avrg = self.get_average(vals)
        return self.get_average([(x - avrg) ** 2 for x in vals])

    def get_standard_deviation(self, vals=None):
        """
        Return the standard deviation (rounded to 3 decimals) of ``vals``
        or, when ``vals`` is empty, of ``self.numbers``.
        """
        if not vals:
            vals = self.numbers
        return round(self.get_variance(vals) ** 0.5, 3)

    def get_median(self, vals=None):
        """
        Return the median of ``vals`` or, when ``vals`` is empty, of
        ``self.numbers``. The given list is left untouched.
        """
        if not vals:
            vals = self.numbers
        # work on a sorted copy: callers rely on the order of their list
        ordered = sorted(vals)
        middle = len(ordered) // 2
        if len(ordered) % 2 == 1:
            return ordered[middle]
        return (ordered[middle - 1] + ordered[middle]) / 2.0

    def get_mode(self, vals=None):
        """
        Return the first key of ``vals`` associated with the highest value.
        When ``vals`` is empty, periods are used as keys and
        ``self.numbers`` as values.
        """
        if not vals:
            vals = dict(zip(self.periods, self.numbers))
        mx = max(vals.values())
        for v in vals:
            if vals[v] == mx:
                return v
def publish(self, c_object):
    """
    Render the template with the values of an object and write the result
    in a new temporary directory.

    The file name is made of the slugified template name, the current date
    and the extension of the template file.

    :param c_object: object providing the values through ``get_values()``
    :return: path of the generated file
    :raises TemplateSyntaxError: when the template cannot be rendered
    """
    tempdir = tempfile.mkdtemp("-ishtardocs")
    output_name = tempdir + os.path.sep + \
        slugify(self.name.replace(' ', '_').lower()) + u'-' + \
        datetime.date.today().strftime('%Y-%m-%d') + \
        u"." + self.template.name.split('.')[-1]
    values = c_object.get_values()
    engine = SecretaryRenderer()
    try:
        result = engine.render(self.template, **values)
    except TemplateSyntaxError as e:
        raise TemplateSyntaxError(e.message, e.lineno)
    # the file must be closed (hence flushed) before its path is returned
    with open(output_name, 'wb') as output:
        output.write(result)
    return output_name
class Department(models.Model):
    """
    Department identified by a unique number and optionally attached to a
    State.
    """
    label = models.CharField(_(u"Label"), max_length=30)
    # unique: used as the natural key (see natural_key below)
    number = models.CharField(_(u"Number"), unique=True, max_length=3)
    # optional link to the parent State
    state = models.ForeignKey('State', verbose_name=_(u"State"), blank=True,
                              null=True)
    # manager providing get_by_natural_key(number)
    objects = NumberManager()

    class Meta:
        verbose_name = _(u"Department")
        verbose_name_plural = _(u"Departments")
        ordering = ['number']

    def __unicode__(self):
        # a department is displayed by its label
        return self.label

    def natural_key(self):
        # one-item tuple matching NumberManager.get_by_natural_key
        return (self.number, )
def address_lbl(self):
    """
    Build a multi-line label with the postal address and the contacts.

    The "alt_" fields are used for the postal part when
    ``alt_address_is_prefered`` is set. Empty parts are skipped and the
    remaining ones are separated by a line break.
    """
    prefix = 'alt_' if self.alt_address_is_prefered else ''
    lines = []
    for field_name in ('address', 'address_complement'):
        value = getattr(self, prefix + field_name)
        if value:
            lines.append(value)
    postal_code = getattr(self, prefix + 'postal_code')
    town = getattr(self, prefix + 'town')
    if postal_code or town:
        # a space is only needed when both parts are present
        separator = " " if postal_code and town else ''
        lines.append(u"{}{}{}".format(
            postal_code or '', separator, town or ''))
    if self.phone:
        lines.append(u"{} {}".format(unicode(_("Tel: ")), self.phone))
    if self.mobile_phone:
        lines.append(
            u"{} {}".format(unicode(_("Mobile: ")), self.mobile_phone))
    if self.email:
        lines.append(u"{} {}".format(unicode(_("Email: ")), self.email))
    return u"\n".join(lines)
def archive(self):
    """
    Flag the item as archived and detach it from the merge process.

    The links to merge candidates and merge exclusions are removed. The
    linked items themselves are kept: calling ``delete()`` on each related
    item would remove the records of the other persons/organizations from
    the database instead of only dropping the relation.
    """
    self.archived = True
    # generate_merge_key and generate_merge_candidate both return early
    # for archived items, so this save does not recreate candidates
    self.save()
    self.merge_candidate.clear()
    self.merge_exclusion.clear()
def simple_lbl(self):
    """
    Short label of the organization: its name when set, otherwise the
    organization type and the town (empty when unknown) joined by " - ".
    """
    if not self.name:
        return u"{} - {}".format(self.organization_type, self.town or "")
    return self.name
@property
def associated_filename(self):
    """
    Slug built from the organization type and the name (unset values are
    skipped), joined by a dash.
    """
    parts = []
    for field_name in ('organization_type', 'name'):
        value = getattr(self, field_name)
        if value:
            parts.append(unicode(value))
    return slugify(u"-".join(parts))
@property
def full_title(self):
    """
    Title followed by the salutation, separated by a space; unset values
    are left out.
    """
    parts = []
    for field_name in ['title', 'salutation']:
        value = getattr(self, field_name)
        if value:
            parts.append(unicode(value))
    return u" ".join(parts)
def simple_lbl(self):
    """
    Short label of the person: surname then name when at least one of
    them is set, otherwise the raw name (empty string if nothing is set).
    """
    parts = []
    for field_name in ('surname', 'name'):
        value = getattr(self, field_name)
        if value:
            parts.append(unicode(value))
    # the raw name is only a fallback
    if not parts and self.raw_name:
        parts = [self.raw_name]
    return u" ".join(parts)
def has_right(self, right_name, session=None, obj=None):
    """
    Check if the person holds a right.

    A right is granted when one of these matches, tested in this order:
    the current profile type (by ``txt_idx``), a permission of a group of
    the current profile type, a permission of a group of the associated
    user, a permission directly set on the associated user.

    :param right_name: a right name or a list/tuple of right names (any
        of them grants the right); for a single name only the part after
        the last "." is used
    :param session: when provided the result is cached for this session
    :param obj: not used by this implementation
    :return: True if the right is granted, False otherwise
    """
    if isinstance(right_name, (list, tuple)):
        names = list(right_name)
    else:
        # strip the "app_label." prefix - only meaningful for a string
        if '.' in right_name:
            right_name = right_name.split('.')[-1]
        names = [right_name]
    cache_key = ""
    if session:
        cache_key = 'session-{}-{}'.format(session.session_key, right_name)
        res = cache.get(cache_key)
        if res in (True, False):
            return res
        # list all cache key in order to clean them on profile change
        cache_key_list = 'sessionlist-{}'.format(session.session_key)
        key_list = cache.get(cache_key_list, [])
        if cache_key not in key_list:
            key_list.append(cache_key)
        # always stored again in order to refresh the timeout
        cache.set(cache_key_list, key_list, settings.CACHE_TIMEOUT)
    # "or" short-circuits: the user account is only accessed when the
    # profiles do not already grant the right
    res = self.profiles.filter(
        current=True, profile_type__txt_idx__in=names).exists() or \
        self.profiles.filter(
            current=True,
            profile_type__groups__permissions__codename__in=names
        ).exists() or \
        self.ishtaruser.user_ptr.groups.filter(
            permissions__codename__in=names).exists() or \
        self.ishtaruser.user_ptr.user_permissions.filter(
            codename__in=names).exists()
    if session:
        cache.set(cache_key, res, settings.CACHE_TIMEOUT)
    return res
def save(self, *args, **kwargs):
    """
    Save the person, then keep the derived data in sync: the raw name
    and the items referencing this person through the
    ``responsible_town_planning_service`` and ``general_contractor``
    relations.
    """
    super(Person, self).save(*args, **kwargs)
    # raw_name is recomputed from the 'person_raw_name' external id
    raw_name = get_external_id('person_raw_name', self)
    if raw_name and self.raw_name != raw_name:
        self.raw_name = raw_name
        # second pass: on re-entry raw_name already matches so the
        # recursion stops here
        self.save()
    # these relations are only tested with hasattr - presumably reverse
    # relations declared in another application (to be confirmed)
    if hasattr(self, 'responsible_town_planning_service'):
        for fle in self.responsible_town_planning_service.all():
            fle.save()  # force update of raw_town_planning_service
    if hasattr(self, 'general_contractor'):
        for fle in self.general_contractor.all():
            fle.save()  # force update of raw_general_contractor
def __unicode__(self):
    """
    Label of the profile: its name (or the profile type when no name is
    set), followed by the comma separated areas between brackets when
    the profile is attached to at least one area.
    """
    label = self.name or unicode(self.profile_type)
    if self.areas.count():
        area_names = [unicode(area) for area in self.areas.all()]
        return u"{} ({})".format(label, u", ".join(area_names))
    return label
def post_save_userprofile(sender, **kwargs):
    """
    Signal handler: refresh the "show field number" value of the user
    attached to the person of the saved profile.

    Does nothing when the signal carries no instance.
    """
    instance = kwargs.get('instance')
    if instance:
        ishtaruser = instance.person.ishtaruser
        ishtaruser.show_field_number(update=True)
def has_perm(self, perm, model=None, session=None, obj=None):
    """
    Check a permission by delegating to the rights of the person.

    :param perm: right name passed to ``Person.has_right``
    :param model: accepted for API compatibility, not used
    :param session: session forwarded to ``Person.has_right``
    :param obj: object forwarded to ``Person.has_right`` (it was
        previously dropped and always replaced by None)
    :return: result of ``Person.has_right``
    """
    return self.person.has_right(perm, session=session, obj=obj)
@property
def associated_filename(self):
    """
    File name for the basket: the current date (YYYY-MM-DD) followed by
    the slugified label.
    """
    today = datetime.date.today().strftime("%Y-%m-%d")
    return "{}-{}".format(today, slugify(self.label))
def related_sources(self):
    """
    Return a list with every source related to this author: treatment
    sources first, then operation, find and context record sources.
    """
    sources = []
    for relation in ('treatmentsource_related', 'operationsource_related',
                     'findsource_related', 'contextrecordsource_related'):
        sources.extend(getattr(self, relation).all())
    return sources
'external_id', 'reference', 'description', 'comment', 'additional_information'] PARENT_SEARCH_VECTORS = ['authors', ] BOOL_FIELDS = ['duplicate'] COL_LABELS = {"authors__cached_label": _(u"Authors")} CACHED_LABELS = ['cache_related_label'] EXTRA_REQUEST_KEYS = { "operations": "operations__pk", "context_records": "context_records__pk", "context_records__operation": "context_records__operation__pk", "finds": "finds__pk", "finds__base_finds__context_record": "finds__base_finds__context_record__pk", "finds__base_finds__context_record__operation": "finds__base_finds__context_record__operation__pk", 'authors__cached_label': 'authors__cached_label', 'authors__person__pk': 'authors__person__pk', } # alternative names of fields for searches ALT_NAMES = { 'authors': ( pgettext_lazy("key for text search", u"author"), 'authors__cached_label__iexact' ), 'title': ( pgettext_lazy("key for text search", u"title"), 'title__iexact' ), 'source_type': ( pgettext_lazy("key for text search", u"type"), 'source_type__label__iexact' ), 'reference': ( pgettext_lazy("key for text search", u"reference"), 'reference__iexact' ), 'internal_reference': ( pgettext_lazy("key for text search", u"internal-reference"), 'internal_reference__iexact' ), 'description': ( pgettext_lazy("key for text search", u"description"), 'description__iexact' ), 'comment': ( pgettext_lazy("key for text search", u"comment"), 'comment__iexact' ), 'additional_information': ( pgettext_lazy("key for text search", u"additional-information"), 'additional_information__iexact' ), 'duplicate': ( pgettext_lazy("key for text search", u"has-duplicate"), 'duplicate' ), } for v in ALT_NAMES.values(): for language_code, language_lbl in settings.LANGUAGES: activate(language_code) EXTRA_REQUEST_KEYS[unicode(v[0])] = v[1] deactivate() title = models.TextField(_(u"Title"), blank=True, default='') associated_file = models.FileField( upload_to=get_image_path, blank=True, null=True, max_length=255) index = 
models.IntegerField(verbose_name=_(u"Index"), blank=True, null=True) external_id = models.TextField(_(u"External ID"), null=True, blank=True) reference = models.TextField(_(u"Ref."), null=True, blank=True) internal_reference = models.TextField(_(u"Internal ref."), null=True, blank=True) source_type = models.ForeignKey(SourceType, verbose_name=_(u"Type"), null=True, blank=True) licenses = models.ManyToManyField(LicenseType, verbose_name=_(u"License"), blank=True) support_type = models.ForeignKey(SupportType, verbose_name=_(u"Support"), blank=True, null=True,) format_type = models.ForeignKey(Format, verbose_name=_(u"Format"), blank=True, null=True,) scale = models.CharField(_(u"Scale"), max_length=30, null=True, blank=True) authors = models.ManyToManyField(Author, verbose_name=_(u"Authors"), related_name="documents") authors_raw = models.CharField(verbose_name=_(u"Authors (raw)"), blank=True, null=True, max_length=250) associated_url = models.URLField( blank=True, null=True, max_length=1000, verbose_name=_(u"Numerical ressource (web address)")) receipt_date = models.DateField(blank=True, null=True, verbose_name=_(u"Receipt date")) creation_date = models.DateField(blank=True, null=True, verbose_name=_(u"Creation date")) receipt_date_in_documentation = models.DateField( blank=True, null=True, verbose_name=_(u"Receipt date in documentation")) item_number = models.IntegerField(_(u"Number of items"), default=1) description = models.TextField(_(u"Description"), blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) additional_information = models.TextField(_(u"Additional information"), blank=True, null=True) duplicate = models.NullBooleanField(_(u"Has a duplicate"), blank=True, null=True) associated_links = models.TextField(_(u"Symbolic links"), blank=True, null=True) cache_related_label = models.TextField( _(u"Related"), blank=True, null=True, db_index=True, help_text=_(u"Cached value - do not edit")) class Meta: verbose_name = 
@property
def has_related(self):
    """
    True when at least one of the relations listed in RELATED_MODELS
    contains an item.
    """
    # generator: the evaluation stops at the first non empty relation
    return any(getattr(self, rel).count() for rel in self.RELATED_MODELS)
for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] yield item._get_base_image_path() def _get_base_image_path(self): for path in self._get_base_image_paths(): return path return u"upload" def _get_available_filename(self, path, test_link=None): """ Get a filename not used If name already used - generate a name with schema: [base]-[current_number + 1].[suffix] :param path: base path :param test_link: test if an existing path match with this link :return: if test_link is not None, (new_path, link_match) otherwise the new_path """ file_split = path.split('.') suffix, base = "", "" if len(file_split) > 1: base = u".".join(file_split[0:-1]) suffix = file_split[-1] else: base = path base_split = base.split('-') current_nb = 0 if len(base_split) > 1: try: current_nb = int(base_split[-1]) base = u"-".join(base_split[0:-1]) + u"-" except ValueError: pass while os.path.exists(path): if test_link and os.path.islink(path) \ and os.readlink(path) == test_link: return path, True current_nb += 1 path = u"{}-{}.{}".format(base, current_nb, suffix) if test_link: return path, False return path def _move_image(self): """ Move to the relevant path and create appropriate symbolic links :return: list of associated links """ reference_path = None initial_path = self.image.path filename = os.path.basename(initial_path) links = [] for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] base_path = item._get_base_image_path() new_path = base_path + u"/" + filename if not reference_path: reference_path = settings.MEDIA_ROOT + new_path # correct path if initial_path == reference_path: continue if not os.path.exists(os.path.dirname(reference_path)): os.makedirs(os.path.dirname(reference_path)) reference_path = self._get_available_filename( reference_path) try: os.rename(initial_path, reference_path) os.rename(self.thumbnail.path, self._get_thumb_name(reference_path)) 
class Town(Imported, models.Model):
    """
    Town identified by its INSEE number and, optionally, a creation year.

    A town can be linked to "children" towns (reverse relation: "parents").
    The geographic fields (limit, center, surface) can be derived from the
    limits of the parents with the ``generate_*`` methods.
    """
    name = models.CharField(_(u"Name"), max_length=100)
    surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True)
    center = models.PointField(_(u"Localisation"), srid=settings.SRID,
                               blank=True, null=True)
    limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True)
    numero_insee = models.CharField(u"Code commune (numéro INSEE)",
                                    max_length=120)
    departement = models.ForeignKey(
        Department, verbose_name=_(u"Department"), null=True, blank=True)
    year = models.IntegerField(
        _("Year of creation"), null=True, blank=True,
        help_text=_(u"Filling this field is relevant to distinguish old towns "
                    u"from new towns."))
    children = models.ManyToManyField(
        'Town', verbose_name=_(u"Town children"), blank=True,
        related_name='parents')
    cached_label = models.CharField(_(u"Cached name"), max_length=500,
                                    null=True, blank=True, db_index=True)
    objects = TownManager()

    class Meta:
        verbose_name = _(u"Town")
        verbose_name_plural = _(u"Towns")
        # NOTE(review): nesting rebuilt from a whitespace-collapsed source -
        # confirm that unique_together belongs inside this condition
        if settings.COUNTRY == 'fr':
            ordering = ['numero_insee']
            unique_together = (('numero_insee', 'year'),)

    def natural_key(self):
        """
        :return: (numero_insee, year) - the pair used by
        TownManager.get_by_natural_key
        """
        return (self.numero_insee, self.year)

    def __unicode__(self):
        """
        Return the cached label. When it is empty the town is saved first
        (the label is expected to be filled by the save handlers) and the
        label is read again.
        """
        if self.cached_label:
            return self.cached_label
        self.save()
        return self.cached_label

    @property
    def label_with_areas(self):
        """
        Town name followed by the full label of each associated area.
        """
        label = [self.name]
        for area in self.areas.all():
            label.append(u" - ")
            label.append(area.full_label)
        return u" ".join(label)

    def generate_geo(self, force=False):
        """
        Generate limit, center and surface.

        Center and surface are regenerated when the caller forces it or
        when the limit has just been rebuilt.

        :param force: regenerate values even if they are already set
        """
        limit_changed = self.generate_limit(force=force)
        # keep the force asked by the caller: generate_limit returns None
        # when nothing has been rebuilt (e.g. a town with no parents)
        force = bool(force or limit_changed)
        self.generate_center(force=force)
        self.generate_area(force=force)

    def generate_limit(self, force=False):
        """
        Build the limit from the union of the limits of the parents.

        Nothing is done if the town has no parents or if one of the parents
        has no limit.

        :param force: rebuild the limit even if one is already set
        :return: True if the limit has been updated (the town is saved),
        None otherwise
        """
        if not force and self.limit:
            return
        parent_towns = list(self.parents.all())
        if not parent_towns:
            return
        merged = None
        for parent in parent_towns:
            if not parent.limit:
                return
            if not merged:
                merged = parent.limit
            else:
                merged = merged.union(parent.limit)
        # if union is a simple polygon make it a multi
        if 'MULTI' not in merged.wkt:
            merged = merged.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")"
        if not merged:
            return
        self.limit = merged
        self.save()
        return True

    def generate_center(self, force=False):
        """
        Set the center to the centroid of the limit.

        :param force: recompute the center even if one is already set
        :return: True if the center has been updated (the town is saved),
        False if no centroid is available, None if nothing has been done
        """
        if not self.limit:
            # no limit: nothing to compute from, even when forced
            return
        if not force and self.center:
            return
        self.center = self.limit.centroid
        if not self.center:
            return False
        self.save()
        return True

    def generate_area(self, force=False):
        """
        Set the surface to the area of the limit transformed to
        settings.SURFACE_SRID.

        :param force: recompute the surface even if one is already set
        :return: True if the surface has been updated (the town is saved),
        False if the computed area is empty, None if nothing has been done
        """
        if not self.limit:
            # no limit: nothing to compute from, even when forced
            return
        if not force and self.surface:
            return
        self.surface = self.limit.transform(settings.SURFACE_SRID,
                                            clone=True).area
        if not self.surface:
            return False
        self.save()
        return True

    def update_town_code(self):
        """
        For a town with a year and children, set the INSEE number to
        "[number before the first dash]-[year]". The town is NOT saved.

        :return: True if numero_insee has been changed, a false value
        otherwise
        """
        if not self.numero_insee or not self.year \
                or not self.children.exists():
            return
        old_num = self.numero_insee
        numero = old_num.split('-')[0]
        self.numero_insee = u"{}-{}".format(numero, self.year)
        return self.numero_insee != old_num

    def _generate_cached_label(self):
        """
        Build the label: the name, then (when settings.COUNTRY is "fr") the
        first characters of the INSEE number - 3 characters for numbers
        starting with 97, 98 or a non digit, 2 otherwise - then the year
        for towns with a year and children.
        """
        cached_label = self.name
        if settings.COUNTRY == "fr" and self.numero_insee:
            dpt_len = 2
            if self.numero_insee.startswith(('97', '98')) or \
                    self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5',
                                                 '6', '7', '8', '9'):
                dpt_len = 3
            cached_label = u"%s - %s" % (self.name,
                                         self.numero_insee[:dpt_len])
        if self.year and self.children.exists():
            cached_label += u" ({})".format(self.year)
        return cached_label
class OperationType(GeneralType):
    """
    Type of operation, flagged as preventive and/or judiciary.
    """
    order = models.IntegerField(_(u"Order"), default=1)
    preventive = models.BooleanField(_(u"Is preventive"), default=True)
    judiciary = models.BooleanField(_(u"Is judiciary"), default=False)

    class Meta:
        verbose_name = _(u"Operation type")
        verbose_name_plural = _(u"Operation types")
        ordering = ['judiciary', '-preventive', 'order', 'label']

    @classmethod
    def get_types(cls, dct=None, instances=False, exclude=None,
                  empty_first=True, default=None, initial=None):
        """
        Get available operation types, grouped for a choice widget.

        :param dct: extra filters for the query ("available" is always
        forced to True); the given dict is not modified
        :param instances: if True return the list of model instances
        instead of choices
        :param exclude: list of txt_idx to exclude; the given list is not
        modified
        :param empty_first: put an empty choice first (only when no
        default is given)
        :param default: txt_idx of the type to put first; ignored if no
        type matches
        :param initial: values passed to _get_initial_types to add
        already selected types
        :return: list of instances if instances is True, otherwise a list
        of choices where types are grouped as [group label, [(pk, label),
        ...]] under "Judiciary", "Preventive" or "Research"
        """
        # work on copies: the arguments belong to the caller
        dct = dict(dct or {})
        exclude = list(exclude or [])
        initial = initial or []
        tuples = []
        dct['available'] = True
        if not instances and empty_first and not default:
            tuples.append(('', '--'))
        default_item = None
        if default and not instances:
            try:
                default_item = cls.objects.get(txt_idx=default)
                tuples.append((default_item.pk, _(unicode(default_item))))
            except cls.DoesNotExist:
                pass
        items = cls.objects.filter(**dct)
        # only an existing default is already listed and must be excluded
        if default_item is not None:
            exclude.append(default_item.txt_idx)
        if exclude:
            items = items.exclude(txt_idx__in=exclude)
        current_preventive, current_judiciary, current_lst = None, None, None
        item_list = list(items.order_by(*cls._meta.ordering).all())
        new_vals = cls._get_initial_types(initial, [i.pk for i in item_list],
                                          instance=True)
        item_list += new_vals
        for item in item_list:
            item.rank = 0
        if instances:
            return item_list
        for item in item_list:
            # open a new group each time the (preventive, judiciary) pair
            # changes
            if not current_lst or item.preventive != current_preventive \
                    or item.judiciary != current_judiciary:
                if current_lst:
                    tuples.append(current_lst)
                if item.judiciary:
                    gp_lbl = _(u"Judiciary")
                elif item.preventive:
                    gp_lbl = _(u"Preventive")
                else:
                    gp_lbl = _(u"Research")
                current_lst = [gp_lbl, []]
                current_preventive = item.preventive
                current_judiciary = item.judiciary
            current_lst[1].append((item.pk, _(unicode(item))))
        if current_lst:
            tuples.append(current_lst)
        return tuples

    @classmethod
    def is_preventive(cls, ope_type_id, key=''):
        """
        :param ope_type_id: primary key of the operation type
        :param key: if set, test this txt_idx instead of the preventive
        flag
        :return: False if the type does not exist; without key the
        preventive flag of the type; with key, True if the txt_idx of the
        type is equal to key
        """
        try:
            op_type = cls.objects.get(pk=ope_type_id)
        except cls.DoesNotExist:
            return False
        if not key:
            return op_type.preventive
        return key == op_type.txt_idx

    @classmethod
    def is_judiciary(cls, ope_type_id):
        """
        :param ope_type_id: primary key of the operation type
        :return: the judiciary flag of the type, False if the type does
        not exist
        """
        try:
            op_type = cls.objects.get(pk=ope_type_id)
        except cls.DoesNotExist:
            return False
        return op_type.judiciary
def _finish_with_error(self, message):
    """
    Record a failed run: store the message as result, set the state to
    "FE" (finished with errors), stamp the finished date and save.
    """
    self.result = message
    self.state = 'FE'
    self.finished_date = datetime.datetime.now()
    self.save()

def execute(self):
    """
    Run the script associated to this task.

    Only a scheduled task (state "S") is executed. The script must be a
    regular file directly inside settings.ISHTAR_SCRIPT_DIR. The state is
    set to "P" while the script runs, then to "F" on success or to "FE"
    when the configuration is wrong, the script cannot be launched, it
    writes to stderr or it exits with a non-zero status. The output of
    the script is stored in ``result``.
    """
    if self.state != 'S':
        return
    self.launch_date = datetime.datetime.now()

    script_dir = settings.ISHTAR_SCRIPT_DIR
    if not script_dir:
        self._finish_with_error(unicode(
            _(u"ISHTAR_SCRIPT_DIR is not set in your "
              u"local_settings. Contact your administrator.")))
        return
    if '..' in script_dir:
        self._finish_with_error(unicode(
            _(u"Your ISHTAR_SCRIPT_DIR is containing "
              u"dots \"..\". As it can refer to relative "
              u"paths, it can be a security issue and this is "
              u"not allowed. Only put a full path.")))
        return
    if not os.path.isdir(script_dir):
        self._finish_with_error(unicode(
            _(u"Your ISHTAR_SCRIPT_DIR: \"{}\" is not a valid directory.")
        ).format(script_dir))
        return

    script_name = None
    # only script inside the script directory can be executed: the name
    # must match an entry of the directory, so it cannot contain a path
    if self.script.path in os.listdir(script_dir):
        candidate = os.path.join(script_dir, self.script.path)
        if os.path.isfile(candidate):
            script_name = candidate
    if not script_name:
        self._finish_with_error(unicode(
            _(u"Script \"{}\" is not available in your script directory. "
              u"Check your configuration.")
        ).format(self.script.path))
        return

    self.state = 'P'
    self.save()

    try:
        session = Popen([script_name], stdout=PIPE, stderr=PIPE)
        stdout, stderr = session.communicate()
    except OSError as e:
        self._finish_with_error(
            u"Error executing \"{}\" script: {}".format(
                self.script.path, e))
        return

    # "replace": an output which is not valid UTF-8 must not raise and
    # leave the task stuck in the "in progress" state
    stdout = stdout.decode('utf-8', 'replace')
    stderr = stderr.decode('utf-8', 'replace')
    self.finished_date = datetime.datetime.now()
    if stderr or session.returncode:
        # a non-zero exit status is a failure even with an empty stderr
        self.state = 'FE'
        self.result = u"Error: {}".format(stderr or stdout)
    else:
        self.state = 'F'
        self.result = u"{}".format(stdout)
    self.save()