#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ import copy from collections import OrderedDict import datetime import inspect from importlib import import_module from jinja2 import TemplateSyntaxError, UndefinedError import json import logging import os import pyqrcode import re import shutil import string import tempfile import time from io import BytesIO from subprocess import Popen, PIPE from PIL import Image from ooopy.OOoPy import OOoPy from ooopy.Transformer import Transformer as OOTransformer import ooopy.Transforms as OOTransforms import uuid import zipfile from django import forms from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import JSONField from django.contrib.postgres.search import SearchVectorField, SearchVector from django.contrib.postgres.indexes import GinIndex from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError, \ MultipleObjectsReturned from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files import File from django.core.serializers import serialize from django.core.urlresolvers import reverse, NoReverseMatch from django.core.validators import validate_slug from django.db import connection from django.db.models import Q, Max, Count, F from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template.defaultfilters import slugify from django.utils.functional import lazy from django.utils.safestring import SafeText, mark_safe from django.utils.translation import activate, deactivate from ishtar_common.utils import ugettext_lazy as _, ugettext, \ pgettext_lazy from ishtar_common.utils_secretary import IshtarSecretaryRenderer from simple_history.models import HistoricalRecords as BaseHistoricalRecords from simple_history.signals import post_create_historical_record, \ pre_create_historical_record from unidecode import unidecode from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ ALTERNATE_CONFIGS_CHOICES from ishtar_common.data_importer import pre_importer_action from ishtar_common.model_managers import SlugModelManager, ExternalIdManager, \ TypeManager, UUIDModelManager from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ImporterModel, ImporterType, \ ImporterDefault, ImporterDefaultValues, ImporterColumn, \ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ Import, TargetKeyGroup, ValueFormater from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \ get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \ generate_relation_graph, max_size_help, task __all__ = [ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', 'ImporterColumn', 'ImporterDuplicateField', 'Regexp', 'ImportTarget', 'TargetKey', 'FormaterType', 'Import', 'TargetKeyGroup', 'ValueFormater', 'Organization', 'Person', 'valid_id', 'Town', 'SpatialReferenceSystem', 'OrganizationType', 'Document', 'GeneralType', 'get_external_id', 'LightHistorizedItem', 'OwnPerms', 'Address', 'post_save_cache', 'DashboardFormItem', 'ShortMenuItem', 'document_attached_changed', 'SearchAltName', 'DynamicRequest', 'GeoItem', 'QRCodeItem', 'SearchVectorConfig', 'DocumentItem' ] logger = logging.getLogger(__name__) def post_save_user(sender, **kwargs): user = kwargs['instance'] if kwargs["created"]: try: IshtarUser.create_from_user(user) except DatabaseError: # manage when db is not synced pass IshtarUser.set_superuser(user) post_save.connect(post_save_user, sender=User) class ValueGetter(object): _prefix = "" COL_LABELS = {} GET_VALUES_EXTRA = [] GET_VALUES_EXCLUDE_FIELDS = [ 'search_vector', 'id', 'multi_polygon', 'point_2d', 'point', 'history_m2m'] GET_VALUES_ = [ 'preservation_to_considers', 'alterations', 'alteration_causes'] GET_VALUES_EXTRA_TYPES = [ 'preservation_to_considers', 'alterations', 'alteration_causes'] def _get_values_documents(self, prefix="", filtr=None): values = {} if not hasattr(self, 'documents'): return values if not filtr or prefix + "documents" in filtr: values[prefix + "documents"] = [ doc.get_values(no_values=True) for doc in self.documents.all() ] if filtr and prefix + "main_image" not in filtr: return values if hasattr(self, "main_image") and self.main_image and hasattr( self.main_image, "get_values"): values[prefix + "main_image"] = self.main_image.get_values( no_values=True) return values def _get_values_update_sub_filter(self, filtr, prefix): if not filtr: return new_filtr = [] for k in filtr: if k.startswith(prefix): new_filtr.append(k[len(prefix):]) return new_filtr def get_values(self, prefix='', no_values=False, filtr=None, **kwargs): if not prefix: prefix = self._prefix values = {} if hasattr(self, "qrcode") and (not filtr or 'qrcode_path' in filtr): values['qrcode_path'] = self.qrcode_path for field_name in get_all_field_names(self): try: value = getattr(self, field_name) except (AttributeError, MultipleObjectsReturned): continue if field_name in self.GET_VALUES_EXCLUDE_FIELDS: continue if filtr and prefix + field_name not in filtr: if hasattr(value, 'get_values'): new_prefix = prefix + field_name + '_' new_filtr = self._get_values_update_sub_filter(filtr, new_prefix) values.update( value.get_values(new_prefix, filtr=new_filtr, **kwargs)) continue if hasattr(self, "get_values_for_" + field_name): values[prefix + field_name] = getattr( self, "get_values_for_" + field_name)() else: if hasattr(value, 'get_values'): new_prefix = prefix + field_name + '_' new_filtr = self._get_values_update_sub_filter(filtr, new_prefix) values.update( value.get_values(new_prefix, filtr=new_filtr, **kwargs)) else: values[prefix + field_name] = value values.update(self._get_values_documents(prefix=prefix, filtr=filtr)) for extra_field in self.GET_VALUES_EXTRA: values[prefix + extra_field] = getattr(self, extra_field) or '' for key in values.keys(): val = values[key] if val is None: val = '' elif (key in self.GET_VALUES_EXTRA_TYPES or "type" in key) and ( val.__class__.__name__.split('.')[0] == 'ManyRelatedManager'): val = u" ; ".join([str(v) for v in val.all()]) elif isinstance(val, (tuple, list, dict)): pass else: val = str(val) if val.endswith('.None'): val = '' values[key] = val if (prefix and prefix != self._prefix) or no_values: # do not provide KEYS and VALUES for sub-items return values value_list = [] for key in values.keys(): if key in ('KEYS', 'VALUES'): continue value_list.append((key, str(values[key]))) for global_var in GlobalVar.objects.all(): values[global_var.slug] = global_var.value or "" values['VALUES'] = json.dumps( values, indent=4, sort_keys=True, skipkeys=True, ensure_ascii=False, separators=("", " : "), ).replace(" " * 4, "\t") return values @classmethod def get_empty_values(cls, prefix=''): if not prefix: prefix = cls._prefix values = {} for field_name in get_all_field_names(cls): values[prefix + field_name] = '' return values class HistoryModel(models.Model): class Meta: abstract = True def m2m_listing(self, key, create=False): if not self.history_m2m or key not in self.history_m2m: return models = self.__class__.__module__ if not models.endswith('.models'): models += ".models" models = import_module(models) model = getattr( models, self.__class__.__name__[len('Historical'):]) related_model = getattr(model, key).rel.model return related_model.history_decompress(self.history_m2m[key], create=create) class HistoricalRecords(BaseHistoricalRecords): def _save_historic(self, manager, instance, history_date, history_type, history_user, history_change_reason, using, attrs): history_instance = manager.model( history_date=history_date, history_type=history_type, history_user=history_user, history_change_reason=history_change_reason, **attrs ) pre_create_historical_record.send( sender=manager.model, instance=instance, history_date=history_date, history_user=history_user, history_change_reason=history_change_reason, history_instance=history_instance, using=using, ) history_instance.save(using=using) post_create_historical_record.send( sender=manager.model, instance=instance, history_instance=history_instance, history_date=history_date, history_user=history_user, history_change_reason=history_change_reason, using=using, ) def create_historical_record(self, instance, history_type, using=None): try: history_modifier = getattr(instance, 'history_modifier', None) assert history_modifier except (User.DoesNotExist, AssertionError): # on batch removing of users, user could have disappeared return history_date = getattr(instance, "_history_date", datetime.datetime.now()) history_change_reason = getattr(instance, "changeReason", None) force = getattr(instance, "_force_history", False) manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history \ .filter(history_modifier_id=history_modifier.pk) \ .order_by('-history_date', '-history_id') # instance.skip_history_when_saving = True if not q_history.count(): if force: delattr(instance, '_force_history') self._save_historic( manager, instance, history_date, history_type, history_modifier, history_change_reason, using, attrs) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = datetime.datetime.now() \ - datetime.timedelta(seconds=5) q = q_history.filter(history_date__isnull=False, history_date__gt=min_history_date) \ .order_by('-history_date', '-history_id') if not force and q.count(): return if force: delattr(instance, '_force_history') # record a new version only if data have been changed for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: self._save_historic(manager, instance, history_date, history_type, history_modifier, history_change_reason, using, attrs) return def valid_id(cls): # valid ID validator for models def func(value): try: cls.objects.get(pk=value) except ObjectDoesNotExist: raise ValidationError(_("Not a valid item.")) return func def valid_ids(cls): def func(value): if "," in value: value = value.split(",") if type(value) not in (list, tuple): value = [value] for v in value: try: cls.objects.get(pk=v) except ObjectDoesNotExist: raise ValidationError( _("A selected item is not a valid item.")) return func def is_unique(cls, field): # unique validator for models def func(value): query = {field: value} try: assert cls.objects.filter(**query).count() == 0 except AssertionError: raise ValidationError(_("This item already exists.")) return func class OwnPerms(object): """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, ishtaruser): """ Query object to get own items """ return None # implement for each object def can_view(self, request): if hasattr(self, "LONG_SLUG"): perm = "view_" + self.LONG_SLUG else: perm = "view_" + self.SLUG return self.can_do(request, perm) def can_do(self, request, action_name): """ Check permission availability for the current object. :param request: request object :param action_name: action name eg: "change_find" - "own" variation is checked :return: boolean """ if not getattr(request.user, 'ishtaruser', None): return False splited = action_name.split('_') action_own_name = splited[0] + '_own_' + '_'.join(splited[1:]) user = request.user if action_own_name == "view_own_findbasket": action_own_name = "view_own_find" return user.ishtaruser.has_right(action_name, request.session) or \ (user.ishtaruser.has_right(action_own_name, request.session) and self.is_own(user.ishtaruser)) def is_own(self, user, alt_query_own=None): """ Check if the current object is owned by the user """ if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, 'ishtaruser'): ishtaruser = user.ishtaruser else: return False if not alt_query_own: query = self.get_query_owns(ishtaruser) else: query = getattr(self, alt_query_own)(ishtaruser) if not query: return False query &= Q(pk=self.pk) return self.__class__.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, 'ishtaruser'): ishtaruser = user.ishtaruser else: return False query = cls.get_query_owns(ishtaruser) if not query: return False return cls.objects.filter(query).count() @classmethod def _return_get_owns(cls, owns, values, get_short_menu_class, label_key='cached_label'): if not owns: return [] sorted_values = [] if hasattr(cls, 'BASKET_MODEL'): owns_len = len(owns) for idx, item in enumerate(reversed(owns)): if get_short_menu_class: item = item[0] if type(item) == cls.BASKET_MODEL: basket = owns.pop(owns_len - idx - 1) sorted_values.append(basket) sorted_values = list(reversed(sorted_values)) if not values: if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: getattr(x, label_key) or "")) return sorted_values + list( sorted(owns, key=lambda x: getattr(x[0], label_key) or "")) if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: x[label_key] or "")) return sorted_values + list( sorted(owns, key=lambda x: x[0][label_key] or "")) @classmethod def get_owns(cls, user, replace_query=None, limit=None, values=None, get_short_menu_class=False, menu_filtr=None): """ Get Own items """ if not replace_query: replace_query = {} if hasattr(user, 'is_authenticated') and not user.is_authenticated(): returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if isinstance(user, User): try: ishtaruser = IshtarUser.objects.get(user_ptr=user) except IshtarUser.DoesNotExist: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned elif isinstance(user, IshtarUser): ishtaruser = user else: if values: return [] return cls.objects.filter(pk__isnull=True) items = [] if hasattr(cls, 'BASKET_MODEL'): items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) query = cls.get_query_owns(ishtaruser) if not query and not replace_query: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if query: q = cls.objects.filter(query) else: # replace_query q = cls.objects.filter(replace_query) if values: q = q.values(*values) if limit: items += list(q.order_by('-pk')[:limit]) else: items += list(q.order_by(*cls._meta.ordering).all()) if get_short_menu_class: if values: if 'id' not in values: raise NotImplementedError( "Call of get_owns with get_short_menu_class option and" " no 'id' in values is not implemented") my_items = [] for i in items: if hasattr(cls, 'BASKET_MODEL') and \ type(i) == cls.BASKET_MODEL: dct = dict([(k, getattr(i, k)) for k in values]) my_items.append( (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) else: my_items.append((i, cls.get_short_menu_class(i['id']))) items = my_items else: items = [(i, cls.get_short_menu_class(i.pk)) for i in items] return items @classmethod def _get_query_owns_dicts(cls, ishtaruser): """ List of query own dict to construct the query. Each dict are join with an AND operator, each dict key, values are joined with OR operator """ return [] @classmethod def _construct_query_own(cls, prefix, dct_list): q = None for subquery_dict in dct_list: subquery = None for k in subquery_dict: subsubquery = Q(**{prefix + k: subquery_dict[k]}) if subquery: subquery |= subsubquery else: subquery = subsubquery if not subquery: continue if q: q &= subquery else: q = subquery return q class CachedGen(object): @classmethod def refresh_cache(cls): raise NotImplementedError() @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) class Cached(CachedGen): slug_field = 'txt_idx' @classmethod def refresh_cache(cls): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if not current_keys: return for keys in current_keys: if len(keys) == 2 and keys[0] == '__slug': cls.get_cache(keys[1], force=True) elif keys[0] == '__get_types': default = None empty_first = True exclude = [] if len(keys) >= 2: default = keys.pop() if len(keys) > 1: empty_first = bool(keys.pop()) exclude = keys[1:] cls.get_types( exclude=exclude, empty_first=empty_first, default=default, force=True) elif keys[0] == '__get_help': cls.get_help(force=True) @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) @classmethod def get_cache(cls, slug, force=False): cache_key, value = get_cache(cls, ['__slug', slug]) if not force and value: return value try: k = {cls.slug_field: slug} obj = cls.objects.get(**k) cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj except cls.DoesNotExist: cache.set(cache_key, None, settings.CACHE_TIMEOUT) return None @disable_for_loaddata def post_save_cache(sender, **kwargs): sender.refresh_cache() class GeneralType(Cached, models.Model): """ Abstract class for "types" """ label = models.TextField(_("Label")) txt_idx = models.TextField( _("Textual ID"), validators=[validate_slug], unique=True, help_text=_( "The slug is the standardized version of the name. It contains " "only lowercase letters, numbers and hyphens. Each slug must " "be unique.")) comment = models.TextField(_("Comment"), blank=True, null=True) available = models.BooleanField(_("Available"), default=True) HELP_TEXT = "" objects = TypeManager() class Meta: abstract = True def __str__(self): return self.label def natural_key(self): return (self.txt_idx,) def history_compress(self): return self.txt_idx @classmethod def history_decompress(cls, value, create=False): if not value: return [] res = [] for txt_idx in value: try: res.append(cls.objects.get(txt_idx=txt_idx)) except cls.DoesNotExist: continue return res @property def explicit_label(self): return "{} ({})".format(self.label, self._meta.verbose_name) @classmethod def create_default_for_test(cls): return [cls.objects.create(label='Test %d' % i) for i in range(5)] @property def short_label(self): return self.label @property def name(self): return self.label @classmethod def get_or_create(cls, slug, label=''): """ Get or create a new item. :param slug: textual id :param label: label for initialization if the item doesn't exist (not mandatory) :return: instancied item of the base class """ item = cls.get_cache(slug) if item: return item item, created = cls.objects.get_or_create( txt_idx=slug, defaults={'label': label}) return item @classmethod def get_or_create_pk(cls, slug): """ Get an id from a slug. Create the associated item if needed. :param slug: textual id :return: id of the item (string) """ return str(cls.get_or_create(slug).pk) @classmethod def get_or_create_pks(cls, slugs): """ Get and merge a list of ids from a slug list. Create the associated items if needed. :param slugs: textual ids :return: string with ids separated by "_" """ items = [] for slug in slugs: items.append(str(cls.get_or_create(slug).pk)) return "_".join(items) @classmethod def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): if not dct: dct = {} if not exclude: exclude = [] keys = ['__get_help'] keys += ["{}".format(ex) for ex in exclude] keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return mark_safe(value) help_text = cls.HELP_TEXT c_rank = -1 help_items = "\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, '__iter__'): pk = item[0] item = cls.objects.get(pk=pk) item.rank = c_rank + 1 if hasattr(item, 'parent'): c_item = item parents = [] while c_item.parent: parents.append(c_item.parent.label) c_item = c_item.parent parents.reverse() parents.append(item.label) item.label = " / ".join(parents) if not item.comment: continue if c_rank > item.rank: help_items += "\n" elif c_rank < item.rank: help_items += "
\n" c_rank = item.rank help_items += "
%s
%s
" % ( item.label, "
".join(item.comment.split('\n'))) c_rank += 1 if c_rank: help_items += c_rank * "
" if help_text or help_items != u'\n': help_text = help_text + help_items else: help_text = "" cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) return mark_safe(help_text) @classmethod def _get_initial_types(cls, initial, type_pks, instance=False): new_vals = [] if not initial: return [] if type(initial) not in (list, tuple): initial = [initial] for value in initial: try: pk = int(value) except (ValueError, TypeError): continue if pk in type_pks: continue try: extra_type = cls.objects.get(pk=pk) if instance: new_vals.append(extra_type) else: new_vals.append((extra_type.pk, str(extra_type))) except cls.DoesNotExist: continue return new_vals @classmethod def get_types(cls, dct=None, instances=False, exclude=None, empty_first=True, default=None, initial=None, force=False, full_hierarchy=False): if not dct: dct = {} if not exclude: exclude = [] types = [] if not instances and empty_first and not default: types = [('', '--')] types += cls._pre_get_types(dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy) if not initial: return types new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) types += new_vals return types @classmethod def _pre_get_types(cls, dct=None, instances=False, exclude=None, default=None, force=False, get_full_hierarchy=False): if not dct: dct = {} if not exclude: exclude = [] # cache cache_key = None if not instances: keys = ['__get_types'] keys += ["{}".format(ex) for ex in exclude] + \ ["{}".format(default)] keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return value base_dct = dct.copy() if hasattr(cls, 'parent'): if not cache_key: return cls._get_parent_types( base_dct, instances, exclude=exclude, default=default, get_full_hierarchy=get_full_hierarchy) vals = [v for v in cls._get_parent_types( base_dct, instances, exclude=exclude, default=default, get_full_hierarchy=get_full_hierarchy)] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals if not cache_key: return cls._get_types(base_dct, instances, exclude=exclude, default=default) vals = [ v for v in cls._get_types(base_dct, instances, exclude=exclude, default=default) ] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals @classmethod def _get_types(cls, dct=None, instances=False, exclude=None, default=None): if not dct: dct = {} if not exclude: exclude = [] dct['available'] = True if default: try: default = cls.objects.get(txt_idx=default) yield (default.pk, _(str(default))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and default != "None": if hasattr(default, 'txt_idx'): exclude.append(default.txt_idx) else: exclude.append(default) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(str(item)) if item and str(item) else '') @classmethod def _get_childs_list(cls, dct=None, exclude=None, instances=False): if not dct: dct = {} if not exclude: exclude = [] if 'parent' in dct: dct.pop('parent') childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): childs = childs.order_by('order') res = {} if instances: for item in childs.all(): parent_id = item.parent_id or 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append(item) else: for item in childs.values("id", "parent_id", "label").all(): parent_id = item["parent_id"] or 0 if item["id"] == item["parent_id"]: parent_id = 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append((item["id"], item["label"])) return res PREFIX = "│ " PREFIX_EMPTY = "  " PREFIX_MEDIUM = "├ " PREFIX_LAST = "└ " PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] @classmethod def _get_childs(cls, item, child_list, prefix=0, instances=False, is_last=False, last_of=None, get_full_hierarchy=False): if not last_of: last_of = [] prefix += 1 current_child_lst = [] if item in child_list: current_child_lst = child_list[item] lst = [] total = len(current_child_lst) full_hierarchy_initial = get_full_hierarchy for idx, child in enumerate(current_child_lst): mylast_of = last_of[:] p = '' if instances: child.rank = prefix lst.append(child) else: if full_hierarchy_initial: if isinstance(full_hierarchy_initial, str): p = full_hierarchy_initial + " > " else: p = "" else: cprefix = prefix while cprefix: cprefix -= 1 if not cprefix: if (idx + 1) == total: p += cls.PREFIX_LAST else: p += cls.PREFIX_MEDIUM elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: p += cls.PREFIX_EMPTY else: p += cls.PREFIX else: p += cls.PREFIX_EMPTY else: p += cls.PREFIX lst.append(( child[0], SafeText(p + str(_(child[1]))) )) clast_of = last_of[:] clast_of.append(idx + 1 == total) if instances: child_id = child.id else: child_id = child[0] if get_full_hierarchy: if p: if not p.endswith(" > "): p += " > " get_full_hierarchy = p + child[1] else: get_full_hierarchy = child[1] for sub_child in cls._get_childs( child_id, child_list, prefix, instances, is_last=((idx + 1) == total), last_of=clast_of, get_full_hierarchy=get_full_hierarchy): lst.append(sub_child) return lst @classmethod def _get_parent_types(cls, dct=None, instances=False, exclude=None, default=None, get_full_hierarchy=False): if not dct: dct = {} if not exclude: exclude = [] dct['available'] = True child_list = cls._get_childs_list(dct, exclude, instances) if 0 in child_list: for item in child_list[0]: if instances: item.rank = 0 item_id = item.pk yield item else: item_id = item[0] yield item if get_full_hierarchy: get_full_hierarchy = item[1] for child in cls._get_childs( item_id, child_list, instances=instances, get_full_hierarchy=get_full_hierarchy): yield child def save(self, *args, **kwargs): if not self.id and not self.label: self.label = " ".join(" ".join(self.txt_idx.split('-')) .split('_')).title() if not self.txt_idx: self.txt_idx = slugify(self.label)[:100] # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter( object_id=self.pk, key=slugify(old.label), content_type=content_type).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter( object_id=self.pk, key=old.txt_idx, content_type=content_type).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False, importer=None, group=None, user=None): content_type = ContentType.objects.get_for_model(self.__class__) if not importer and not force and ItemKey.objects.filter( key=key, content_type=content_type).count(): return filtr = {'key': key, 'content_type': content_type} if group: filtr['group'] = group elif user: filtr['user'] = user else: filtr['importer'] = importer if force: ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() filtr['object_id'] = self.pk ItemKey.objects.get_or_create(**filtr) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self, importer): keys = [self.txt_idx] content_type = ContentType.objects.get_for_model(self.__class__) base_q = Q(content_type=content_type, object_id=self.pk) subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True) subquery |= Q(user__isnull=True, group__isnull=True, importer=importer) if importer.user: subquery |= Q(user=importer.user, group__isnull=True, importer=importer) if importer.associated_group: subquery |= Q(user__isnull=True, group=importer.associated_group, importer=importer) q = ItemKey.objects.filter(base_q & subquery) for ik in q.exclude(key=self.txt_idx).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): # content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() def get_general_type_label(model, slug): obj = model.get_cache(slug) if not obj: return "" return str(obj) class HierarchicalType(GeneralType): parent = models.ForeignKey('self', blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Parent")) class Meta: abstract = True def full_label(self): lbls = [self.label] item = self while item.parent: item = item.parent lbls.append(item.label) return " > ".join(reversed(lbls)) class TinyUrl(models.Model): CHAR_MAP = string.ascii_letters + string.digits CHAR_MAP_LEN = len(CHAR_MAP) link = models.URLField() @classmethod def index_to_char(cls, seq): return "".join([cls.CHAR_MAP[x] for x in seq]) def get_short_id(self): c_id = self.id digits = [] while c_id > 0: digits.append(c_id % self.CHAR_MAP_LEN) c_id //= self.CHAR_MAP_LEN digits.reverse() return self.index_to_char(digits) @classmethod def decode_id(cls, value): i = 0 for c in value: i = i * cls.CHAR_MAP_LEN + cls.CHAR_MAP.index(c) return i class ItemKey(models.Model): key = models.TextField(_("Key")) content_type = models.ForeignKey(ContentType) object_id = models.PositiveIntegerField() content_object = GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey( Import, null=True, blank=True, help_text=_("Specific key to an import")) user = models.ForeignKey('IshtarUser', blank=True, null=True) group = models.ForeignKey(TargetKeyGroup, blank=True, null=True) def __str__(self): return self.key def get_image_path(instance, filename): # when using migrations instance is not a real ImageModel instance if not hasattr(instance, '_get_image_path'): return "upload/{}".format(filename) return instance._get_image_path(filename) class ImageContainerModel(object): def _get_image_path(self, filename): return "{}/{}".format(self._get_base_image_path(), filename) def _get_base_image_path(self): return "upload" class ImageModel(models.Model, ImageContainerModel): image = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) thumbnail = models.ImageField( upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE IMAGE_PREFIX = '' class Meta: abstract = True def has_changed(self, field): if not self.pk: return True manager = getattr(self.__class__, 'objects') old = getattr(manager.get(pk=self.pk), field) return not getattr(self, field) == old def create_thumb(self, image, size): """Returns the image resized to fit inside a box of the given size""" image.thumbnail(size, Image.ANTIALIAS) temp = BytesIO() image.save(temp, 'jpeg') temp.seek(0) return SimpleUploadedFile('temp', temp.read()) def save(self, *args, **kwargs): if 'force_copy' in kwargs: kwargs.pop('force_copy') super(ImageModel, self).save(*args, **kwargs) return # manage images if not self.has_changed('image'): return super(ImageModel, self).save(*args, **kwargs) if not self.image: self.thumbnail = None return super(ImageModel, self).save(*args, **kwargs) # # generate thumbnail # convert to jpg filename = os.path.splitext(os.path.split(self.image.name)[-1])[0] old_path = self.image.path filename = "%s.jpg" % filename image = None try: image = Image.open(self.image.file) # convert to RGB if image.mode not in ('L', 'RGB'): image = image.convert('RGB') # resize if necessary if self.IMAGE_MAX_SIZE: self.image.save(filename, self.create_thumb(image, self.IMAGE_MAX_SIZE), save=False) if old_path != self.image.path: try: os.remove(old_path) except OSError: # already clean pass # save the thumbnail thumb_filename = self._get_thumb_name(filename) self.thumbnail.save( thumb_filename, self.create_thumb(image, self.THUMB_MAX_SIZE), save=False) except IOError: pass finally: if image: image.close() return super(ImageModel, self).save(*args, **kwargs) def _get_thumb_name(self, filename): splited = filename.split('.') return "{}-thumb.{}".format( ".".join(splited[:-1]), splited[-1] ) class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid') class BulkUpdatedItem(object): @classmethod def bulk_recursion(cls, transaction_id, extra_args): """ Prevent infinite recursion. Should not happen but wrong manipulation in the database or messy imports can generate circular relations :param transaction_id: current transaction ID (unix time) - if null a transaction ID is generated :param extra_args: arguments dealing with :return: (transaction ID, is a recursion) """ if not transaction_id: transaction_id = str(time.time()) args = ['cached_label_bulk_update', transaction_id] + extra_args key, val = get_cache(cls, args) if val: return transaction_id, True cache.set(key, 1, settings.CACHE_SMALLTIMEOUT) return transaction_id, False class RelationItem(models.Model): """ Items with relation between them """ MAIN_UP_MODEL_QUERY = "" relation_image = models.FileField( _("Generated relation image (SVG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_bitmap_image = models.FileField( _("Generated relation image (PNG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_dot = models.FileField( _("Generated relation image (DOT)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_image_above = models.FileField( _("Generated above relation image (SVG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_dot_above = models.FileField( _("Generated above relation image (DOT)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_bitmap_image_above = models.FileField( _("Generated above relation image (PNG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_image_below = models.FileField( _("Generated below relation image (SVG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_dot_below = models.FileField( _("Generated below relation image (DOT)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) relation_bitmap_image_below = models.FileField( _("Generated below relation image (PNG)"), null=True, blank=True, upload_to=get_image_path, help_text=max_size_help() ) class Meta: abstract = True def generate_relation_image( self, highlight_current=True, render_above=True, render_below=True, full=False): generate_relation_graph(self, highlight_current=highlight_current, render_above=render_above, render_below=render_below, full=full) class JsonDataSectionManager(models.Manager): def get_by_natural_key(self, name, app_label, model): return self.get(name=name, content_type__app_label=app_label, content_type__model=model) class JsonDataSection(models.Model): content_type = models.ForeignKey(ContentType) name = models.CharField(_("Name"), max_length=200) order = models.IntegerField(_("Order"), default=10) objects = JsonDataSectionManager() class Meta: verbose_name = _("Json data - Menu") verbose_name_plural = _("Json data - Menus") ordering = ['order', 'name'] unique_together = ("name", "content_type") def natural_key(self): return (self.name, self.content_type.app_label, self.content_type.model) def __str__(self): return "{} - {}".format(self.content_type, self.name) JSON_VALUE_TYPES = ( ('T', _("Text")), ('LT', _("Long text")), ('I', _("Integer")), ('B', _("Boolean")), ('F', _("Float")), ('D', _("Date")), ('C', _("Choices")), ) class JsonDataFieldManager(models.Manager): def get_by_natural_key(self, key, app_label, model): return self.get(key=key, content_type__app_label=app_label, content_type__model=model) class JsonDataField(models.Model): name = models.CharField(_("Name"), max_length=200) content_type = models.ForeignKey(ContentType) key = models.CharField( _("Key"), max_length=200, help_text=_("Value of the key in the JSON schema. For hierarchical " "key use \"__\" to explain it. For instance for the key " "'my_subkey' with data such as {'my_key': {'my_subkey': " "'value'}}, its value will be reached with my_key__my_subkey.")) display = models.BooleanField(_("Display"), default=True) value_type = models.CharField(_("Type"), default="T", max_length=10, choices=JSON_VALUE_TYPES) order = models.IntegerField(_("Order"), default=10) search_index = models.BooleanField(_("Use in search indexes"), default=False) section = models.ForeignKey(JsonDataSection, blank=True, null=True, on_delete=models.SET_NULL) custom_forms = models.ManyToManyField( "CustomForm", blank=True, through="CustomFormJsonField") objects = JsonDataFieldManager() class Meta: verbose_name = _("Json data - Field") verbose_name_plural = _("Json data - Fields") ordering = ['order', 'name'] unique_together = ("content_type", "key") def natural_key(self): return (self.key, self.content_type.app_label, self.content_type.model) def __str__(self): return "{} - {}".format(self.content_type, self.name) def clean(self): if not self.section: return if self.section.content_type != self.content_type: raise ValidationError( _("Content types of the field and of the menu do not match")) class JsonData(models.Model, CachedGen): data = JSONField(default={}, blank=True) class Meta: abstract = True def pre_save(self): if not self.data: self.data = {} @property def json_sections(self): sections = [] try: content_type = ContentType.objects.get_for_model(self) except ContentType.DoesNotExists: return sections fields = list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=True ).all()) # no section fields fields += list(JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=False ).order_by('section__order', 'order').all()) for field in fields: value = None data = self.data.copy() for key in field.key.split('__'): if key in data: value = copy.copy(data[key]) data = data[key] else: value = None break if value is None: continue if type(value) in (list, tuple): value = " ; ".join([str(v) for v in value]) section_name = field.section.name if field.section else None if not sections or section_name != sections[-1][0]: # if section name is identical it is the same sections.append((section_name, [])) sections[-1][1].append((field.name, value)) return sections @classmethod def refresh_cache(cls): __, refreshed = get_cache(cls, ['cache_refreshed']) if refreshed and time.time() - refreshed < 1: return cache_ckey, current_keys = get_cache(cls, ['_current_keys']) if not current_keys: return for keys in current_keys: if keys[0] == '__get_dynamic_choices': cls._get_dynamic_choices(keys[1], force=True) @classmethod def _get_dynamic_choices(cls, key, force=False): """ Get choice from existing values :param key: data key :param force: if set to True do not use cache :return: tuple of choices (id, value) """ cache_key, value = get_cache(cls, ['__get_dynamic_choices', key]) if not force and value: return value choices = set() splitted_key = key[len('data__'):].split('__') q = cls.objects.filter( data__has_key=key[len('data__'):]).values_list('data', flat=True) for value in q.all(): for k in splitted_key: value = value[k] choices.add(value) choices = [('', '')] + [(v, v) for v in sorted(list(choices))] cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) return choices class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s") class Meta: abstract = True class SearchAltName(object): def __init__(self, search_key, search_query, extra_query=None, distinct_query=False): self.search_key = search_key self.search_query = search_query self.extra_query = extra_query or {} self.distinct_query = distinct_query class DynamicRequest(object): def __init__(self, label, app_name, model_name, form_key, search_key, type_query, search_query): self.label = label self.form_key = form_key self.search_key = search_key self.app_name = app_name self.model_name = model_name self.type_query = type_query self.search_query = search_query def get_all_types(self): model = apps.get_app_config(self.app_name).get_model(self.model_name) return model.objects.filter(available=True) def get_form_fields(self): fields = {} for item in self.get_all_types().all(): fields[self.form_key + "-" + item.txt_idx] = forms.CharField( label=str(self.label) + " " + str(item), required=False ) return fields def get_extra_query(self, slug): return { self.type_query: slug } def get_alt_names(self): alt_names = {} for item in self.get_all_types().all(): alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( self.search_key + "-" + item.txt_idx, self.search_query, self.get_extra_query(item.txt_idx), distinct_query=True ) return alt_names class SearchVectorConfig(object): def __init__(self, key, language=None, func=None): self.key = key if language: self.language = language if language == "local": self.language = settings.ISHTAR_SEARCH_LANGUAGE else: self.language = "simple" self.func = func def format(self, value): if value == 'None': value = '' if not self.func: return [value] return self.func(value) class FullSearch(models.Model): search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")) EXTRA_REQUEST_KEYS = {} DYNAMIC_REQUESTS = {} ALT_NAMES = {} BASE_SEARCH_VECTORS = [] PROPERTY_SEARCH_VECTORS = [] INT_SEARCH_VECTORS = [] M2M_SEARCH_VECTORS = [] PARENT_SEARCH_VECTORS = [] # prevent circular dependency PARENT_ONLY_SEARCH_VECTORS = [] class Meta: abstract = True @classmethod def general_types(cls): for k in get_all_field_names(cls): field = cls._meta.get_field(k) if not hasattr(field, 'rel') or not field.rel: continue rel_model = field.rel.to if issubclass(rel_model, (GeneralType, HierarchicalType)): yield k @classmethod def get_alt_names(cls): alt_names = cls.ALT_NAMES.copy() for dr_k in cls.DYNAMIC_REQUESTS: alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) return alt_names @classmethod def get_query_parameters(cls): query_parameters = {} for v in cls.get_alt_names().values(): for language_code, language_lbl in settings.LANGUAGES: activate(language_code) query_parameters[str(v.search_key)] = v deactivate() return query_parameters def _update_search_field(self, search_vector_conf, search_vectors, data): for value in search_vector_conf.format(data): with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s, %s)", [ search_vector_conf.language, value]) row = cursor.fetchone() search_vectors.append(row[0]) def _update_search_number_field(self, search_vectors, val): search_vectors.append("'{}':1".format(val)) def update_search_vector(self, save=True, exclude_parent=False): """ Update the search vector :param save: True if you want to save the object immediately :return: True if modified """ if not hasattr(self, 'search_vector'): return if not self.pk: # logger.warning("Cannot update search vector before save or " # "after deletion.") return if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ and not self.INT_SEARCH_VECTORS \ and not self.PROPERTY_SEARCH_VECTORS \ and not self.PARENT_SEARCH_VECTORS: logger.warning("No search_vectors defined for {}".format( self.__class__)) return if getattr(self, '_search_updated', None): return self._search_updated = True old_search = "" if self.search_vector: old_search = self.search_vector[:] search_vectors = [] base_q = self.__class__.objects.filter(pk=self.pk) # many to many have to be queried one by one otherwise only one is fetch for m2m_search_vector in self.M2M_SEARCH_VECTORS: key = m2m_search_vector.key.split('__')[0] rel_key = getattr(self, key) for item in rel_key.values('pk').all(): query_dct = {key + "__pk": item['pk']} q = copy.copy(base_q).filter(**query_dct) q = q.annotate( search=SearchVector( m2m_search_vector.key, config=m2m_search_vector.language) ).values('search') search_vectors.append(q.all()[0]['search']) # int/float are not well managed by the SearchVector for int_search_vector in self.INT_SEARCH_VECTORS: q = base_q.values(int_search_vector.key) for val in int_search_vector.format( q.all()[0][int_search_vector.key]): self._update_search_number_field(search_vectors, val) if not exclude_parent: # copy parent vector fields for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: parent = getattr(self, PARENT_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append(p.search_vector) elif parent: search_vectors.append(parent.search_vector) for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) if hasattr(parent, 'all'): # m2m for p in parent.all(): search_vectors.append( p.update_search_vector(save=False, exclude_parent=True) ) elif parent: search_vectors.append( parent.update_search_vector(save=False, exclude_parent=True) ) if self.BASE_SEARCH_VECTORS: # query "simple" fields q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) res = q.all()[0] for base_search_vector in self.BASE_SEARCH_VECTORS: data = res[base_search_vector.key] data = unidecode(str(data)) self._update_search_field(base_search_vector, search_vectors, data) if self.PROPERTY_SEARCH_VECTORS: for property_search_vector in self.PROPERTY_SEARCH_VECTORS: data = getattr(self, property_search_vector.key) if callable(data): data = data() if not data: continue data = str(data) self._update_search_field(property_search_vector, search_vectors, data) if hasattr(self, 'data') and self.data: content_type = ContentType.objects.get_for_model(self) for json_field in JsonDataField.objects.filter( content_type=content_type, search_index=True).all(): data = copy.deepcopy(self.data) no_data = False for key in json_field.key.split('__'): if key not in data: no_data = True break data = data[key] if no_data or not data: continue if json_field.value_type == 'B': if data is True: data = json_field.name else: continue elif json_field.value_type in ('I', 'F'): self._update_search_number_field(search_vectors, data) continue elif json_field.value_type == 'D': # only index year self._update_search_number_field(search_vectors, data.year) continue for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s, %s)", [lang, data]) row = cursor.fetchone() search_vectors.append(row[0]) new_search_vector = merge_tsvectors(search_vectors) changed = old_search != new_search_vector self.search_vector = new_search_vector if save and changed: self.__class__.objects.filter(pk=self.pk).update( search_vector=new_search_vector) elif not save: return new_search_vector return changed class FixAssociated(object): ASSOCIATED = {} def fix_associated(self): for key in self.ASSOCIATED: item = getattr(self, key) if not item: continue dct = self.ASSOCIATED[key] for dct_key in dct: subkey, ctype = dct_key expected_values = dct[dct_key] if not isinstance(expected_values, (list, tuple)): expected_values = [expected_values] if hasattr(ctype, "txt_idx"): try: expected_values = [ctype.objects.get(txt_idx=v) for v in expected_values] except ctype.DoesNotExist: # type not yet initialized return current_vals = getattr(item, subkey) is_many = False if hasattr(current_vals, "all"): is_many = True current_vals = current_vals.all() else: current_vals = [current_vals] is_ok = False for current_val in current_vals: if current_val in expected_values: is_ok = True break if is_ok: continue # the first value is used new_value = expected_values[0] if is_many: getattr(item, subkey).add(new_value) else: setattr(item, subkey, new_value) class QRCodeItem(models.Model, ImageContainerModel): HAS_QR_CODE = True qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255) class Meta: abstract = True @property def qrcode_path(self): if not self.qrcode: self.generate_qrcode() if not self.qrcode: # error on qrcode generation return "" return self.qrcode.path def generate_qrcode(self, request=None, secure=True, tmpdir=None): url = self.get_absolute_url() site = Site.objects.get_current() if request: scheme = request.scheme else: if secure: scheme = "https" else: scheme = "http" url = scheme + "://" + site.domain + url tiny_url = TinyUrl() tiny_url.link = url tiny_url.save() short_url = scheme + "://" + site.domain + reverse( 'tiny-redirect', args=[tiny_url.get_short_id()]) qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) tmpdir_created = False if not tmpdir: tmpdir = tempfile.mkdtemp("-qrcode") tmpdir_created = True filename = tmpdir + os.sep + 'qrcode.png' qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) with open(filename, 'rb') as qrfile: self.qrcode.save("qrcode.png", File(qrfile)) self.skip_history_when_saving = True self._no_move = True self.save() if tmpdir_created: shutil.rmtree(tmpdir) class DocumentItem(object): ALT_NAMES = { 'documents__image__isnull': SearchAltName( pgettext_lazy("key for text search", "has-image"), 'documents__image__isnull'), 'documents__associated_url__isnull': SearchAltName( pgettext_lazy("key for text search", "has-url"), 'documents__associated_url__isnull'), 'documents__associated_file__isnull': SearchAltName( pgettext_lazy("key for text search", "has-attached-file"), 'documents__associated_file__isnull'), } def public_representation(self): images = [] if getattr(self, "main_image", None): images.append(self.main_image.public_representation()) images += [ image.public_representation() for image in self.images_without_main_image.all() ] return {"images": images} @property def images(self): if not hasattr(self, 'documents'): return Document.objects.none() return self.documents.filter( image__isnull=False).exclude(image="").order_by("pk") @property def images_without_main_image(self): if not hasattr(self, 'main_image') or not hasattr(self, 'documents'): return self.images if not self.main_image: return self.documents.filter( image__isnull=False).exclude( image="").order_by("pk") return self.documents.filter( image__isnull=False).exclude( image="").exclude(pk=self.main_image.pk).order_by("pk") def get_extra_actions(self, request): """ For sheet template: return "Add document / image" action """ # url, base_text, icon, extra_text, extra css class, is a quick action try: actions = super(DocumentItem, self).get_extra_actions(request) except AttributeError: actions = [] if not hasattr(self, 'SLUG'): return actions can_add_doc = self.can_do(request, 'add_document') if can_add_doc and ( not hasattr(self, "is_locked") or not self.is_locked(request.user)): actions += [ ( reverse("create-document") + "?{}={}".format( self.SLUG, self.pk), _("Add document/image"), "fa fa-plus", _("doc./image"), "", False ) ] return actions class SpatialReferenceSystem(GeneralType): order = models.IntegerField(_("Order"), default=10) auth_name = models.CharField( _("Authority name"), default=u'EPSG', max_length=256) srid = models.IntegerField(_("Authority SRID")) class Meta: verbose_name = _("Spatial reference system") verbose_name_plural = _("Spatial reference systems") ordering = ('label',) post_save.connect(post_save_cache, sender=SpatialReferenceSystem) post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) class GeoItem(models.Model): GEO_SOURCE = ( ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon")) ) # gis x = models.FloatField(_(u'X'), blank=True, null=True) y = models.FloatField(_(u'Y'), blank=True, null=True) z = models.FloatField(_(u'Z'), blank=True, null=True) estimated_error_x = models.FloatField(_(u'Estimated error for X'), blank=True, null=True) estimated_error_y = models.FloatField(_(u'Estimated error for Y'), blank=True, null=True) estimated_error_z = models.FloatField(_(u'Estimated error for Z'), blank=True, null=True) spatial_reference_system = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), blank=True, null=True) point = models.PointField(_("Point"), blank=True, null=True, dim=3) point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) point_source = models.CharField( _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True) point_source_item = models.CharField( _("Point source item"), max_length=100, blank=True, null=True) multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True) multi_polygon_source = models.CharField( _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True) multi_polygon_source_item = models.CharField( _("Multi polygon source item"), max_length=100, blank=True, null=True) GEO_LABEL = "" class Meta: abstract = True def get_town_centroid(self): raise NotImplementedError def get_town_polygons(self): raise NotImplementedError @property def display_coordinates(self): if not self.point_2d: return "" profile = get_current_profile() if not profile.display_srs or not profile.display_srs.srid: return self.x, self.y point = self.point_2d.transform(profile.display_srs.srid, clone=True) return round(point.x, 5), round(point.y, 5) @property def display_spatial_reference_system(self): profile = get_current_profile() if not profile.display_srs or not profile.display_srs.srid: return self.spatial_reference_system return profile.display_srs def get_precise_points(self): if self.point_source == 'P' and self.point_2d: return self.point_2d, self.point, self.point_source_item def get_precise_polygons(self): if self.multi_polygon_source == 'P' and self.multi_polygon: return self.multi_polygon, self.multi_polygon_source_item def most_precise_geo(self): if self.point_source == 'M': return 'multi_polygon' current_source = str(self.__class__._meta.verbose_name) if self.multi_polygon_source_item == current_source \ and (self.multi_polygon_source == "P" or self.point_source_item != current_source): return 'multi_polygon' if self.point_source_item == current_source\ and self.point_source == 'P': return 'point' if self.multi_polygon_source == 'P': return 'multi_polygon' if self.point_source == 'P': return 'point' if self.multi_polygon: return 'multi_polygon' if self.point_2d: return 'point' def geo_point_source(self): if not self.point_source: return "" src = "{} - {}".format( dict(self.GEO_SOURCE)[self.point_source], self.point_source_item ) return src def geo_polygon_source(self): if not self.multi_polygon_source: return "" src = "{} - {}".format( dict(self.GEO_SOURCE)[self.multi_polygon_source], self.multi_polygon_source_item ) return src def _geojson_serialize(self, geom_attr): if not hasattr(self, geom_attr): return "" cached_label_key = 'cached_label' if self.GEO_LABEL: cached_label_key = self.GEO_LABEL if getattr(self, "CACHED_LABELS", None): cached_label_key = self.CACHED_LABELS[-1] geojson = serialize( 'geojson', self.__class__.objects.filter(pk=self.pk), geometry_field=geom_attr, fields=(cached_label_key,)) geojson_dct = json.loads(geojson) profile = get_current_profile() precision = profile.point_precision features = geojson_dct.pop('features') for idx in range(len(features)): feature = features[idx] lbl = feature['properties'].pop(cached_label_key) feature['properties']['name'] = lbl feature['properties']['id'] = self.pk if precision is not None: geom_type = feature["geometry"].get("type", None) if geom_type == "Point": feature["geometry"]["coordinates"] = [ round(coord, precision) for coord in feature["geometry"]["coordinates"] ] geojson_dct['features'] = features geojson_dct['link_template'] = simple_link_to_window(self).replace( '999999', '' ) geojson = json.dumps(geojson_dct) return geojson @property def point_2d_geojson(self): return self._geojson_serialize('point_2d') @property def multi_polygon_geojson(self): return self._geojson_serialize('multi_polygon') class TemplateItem: @classmethod def _label_templates_q(cls): model_name = "{}.{}".format( cls.__module__, cls.__name__) q = Q(associated_model__klass=model_name, for_labels=True, available=True) alt_model_name = model_name.replace( "models_finds", "models").replace( "models_treatments", "models") if alt_model_name != model_name: q |= Q(associated_model__klass=model_name, for_labels=True, available=True) return DocumentTemplate.objects.filter(q) @classmethod def has_label_templates(cls): return cls._label_templates_q().count() @classmethod def label_templates(cls): return cls._label_templates_q() def get_extra_templates(self, request): cls = self.__class__ templates = [] name = str(cls.__name__) module = str(cls.__module__) if "archaeological_finds" in module: if "models_finds" in name or "models_treatments" in name: names = [ name, name.replace("models_finds", "models" ).replace("models_treatments", "models") ] else: names = [name, name.replace("models", "models_finds"), name.replace("models", "models_treatments")] else: names = [name] model_names = [ "{}.{}".format(module, name) for name in names ] q = DocumentTemplate.objects.filter( associated_model__klass__in=model_names, for_labels=False, available=True) for template in q.all(): urlname = "generate-document" templates.append( (template.name, reverse( urlname, args=[template.slug, self.pk])) ) return templates class StatisticItem: STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: # OrderedDict([('year', _("Year")), # ("operation_type__label", _("Operation type"))]) STATISTIC_SUM_VARIABLE = OrderedDict( (("pk", (_("Number"), 1)),) ) # example: "Price", "Volume" - the number is a multiplier class CascasdeUpdate: DOWN_MODEL_UPDATE = [] def cascade_update(self): for down_model in self.DOWN_MODEL_UPDATE: if not settings.USE_BACKGROUND_TASK: rel = getattr(self, down_model) if hasattr(rel.model, "need_update"): rel.update(need_update=True) continue for item in getattr(self, down_model).all(): cached_label_changed(item.__class__, instance=item) if hasattr(item, "point_2d"): post_save_geo(item.__class__, instance=item) def duplicate_item(item, user=None, data=None): model = item.__class__ new = model.objects.get(pk=item.pk) for field in model._meta.fields: # pk is in PRIVATE_FIELDS so: new.pk = None and a new # item will be created on save if field.name == "uuid": new.uuid = uuid.uuid4() elif field.name in PRIVATE_FIELDS: setattr(new, field.name, None) if user: new.history_user = user if data: for k in data: setattr(new, k, data[k]) new.save() # m2m fields m2m = [field.name for field in model._meta.many_to_many if field.name not in PRIVATE_FIELDS] for field in m2m: for val in getattr(item, field).all(): if val not in getattr(new, field).all(): getattr(new, field).add(val) return new class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported, JsonData, FixAssociated, CascasdeUpdate): """ Historized item with external ID management. All historized items are searchable and have a data json field. Historized items can be "locked" for edition. """ IS_BASKET = False SHOW_URL = None EXTERNAL_ID_KEY = '' EXTERNAL_ID_DEPENDENCIES = [] HISTORICAL_M2M = [] history_modifier = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_("Last editor"), blank=True, null=True) history_creator = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_("Creator"), blank=True, null=True) last_modified = models.DateTimeField(auto_now=True) history_m2m = JSONField(default={}, blank=True) need_update = models.BooleanField( verbose_name=_("Need update"), default=False) locked = models.BooleanField( verbose_name=_("Item locked for edition"), default=False) lock_user = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_("Locked by"), blank=True, null=True) ALT_NAMES = { 'history_creator': SearchAltName( pgettext_lazy("key for text search", u"created-by"), 'history_creator__ishtaruser__person__cached_label__iexact' ), 'history_modifier': SearchAltName( pgettext_lazy("key for text search", u"modified-by"), 'history_modifier__ishtaruser__person__cached_label__iexact' ), 'modified_before': SearchAltName( pgettext_lazy("key for text search", "modified-before"), 'last_modified__lte' ), 'modified_after': SearchAltName( pgettext_lazy("key for text search", "modified-after"), 'last_modified__gte' ), } class Meta: abstract = True @classmethod def get_verbose_name(cls): return cls._meta.verbose_name def is_locked(self, user=None): if not user: return self.locked return self.locked and (not self.lock_user or self.lock_user != user) def merge(self, item, keep_old=False): merge_model_objects(self, item, keep_old=keep_old) def public_representation(self): return {} def duplicate(self, user=None, data=None): return duplicate_item(self, user, data) def update_external_id(self, save=False): if not self.EXTERNAL_ID_KEY or ( self.external_id and not getattr(self, 'auto_external_id', False)): return external_id = get_external_id(self.EXTERNAL_ID_KEY, self) if external_id == self.external_id: return self.auto_external_id = True self.external_id = external_id self._cached_label_checked = False if save: self.skip_history_when_saving = True self.save() return external_id def get_last_history_date(self): q = self.history.values("history_date").order_by('-history_date') if not q.count(): return return q.all()[0]['history_date'] def get_previous(self, step=None, date=None, strict=False): """ Get a "step" previous state of the item """ assert step or date historized = self.history.all() item = None if step: if len(historized) <= step: # silently return the last step if too far in the history item = historized[len(historized) - 1] else: item = historized[step] else: for step, item in enumerate(historized): if item.history_date == date: break # ended with no match if item.history_date != date: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in get_all_field_names(model): field = model._meta.get_field(k) if hasattr(field, 'rel') and field.rel: if not hasattr(item, k + '_id'): setattr(item, k, getattr(self, k)) continue val = getattr(item, k + '_id') if not val: setattr(item, k, None) continue try: val = field.rel.to.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError("The class %s has no pk %d" % ( str(field.rel.to), val)) setattr(item, k, None) item.pk = self.pk return item @property def last_edition_date(self): try: return self.history.order_by('-history_date').all()[0].history_date except (AttributeError, IndexError): return @property def history_creation_date(self): try: return self.history.order_by('history_date').all()[0].history_date except (AttributeError, IndexError): return def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): if item.history_date == date: new_item = item break to_del.append(item) if not new_item: raise HistoryError("The date to rollback to doesn't exist.") try: field_keys = [f.name for f in self._meta.fields] for k in field_keys: if k != 'id' and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) try: self.history_modifier = User.objects.get( pk=new_item.history_modifier_id) except User.ObjectDoesNotExist: pass self.save() saved_m2m = new_item.history_m2m.copy() for hist_key in self.HISTORICAL_M2M: # after each association m2m is rewrite - force the original # to be reset new_item.history_m2m = saved_m2m values = new_item.m2m_listing(hist_key, create=True) or [] hist_field = getattr(self, hist_key) hist_field.clear() for val in values: hist_field.add(val) # force label regeneration self._cached_label_checked = False self.save() except ObjectDoesNotExist: raise HistoryError("The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def m2m_listing(self, key): return getattr(self, key).all() def values(self): values = {} for f in self._meta.fields: k = f.name if k != 'id': values[k] = getattr(self, k) return values def get_absolute_url(self): try: return reverse('display-item', args=[self.SLUG, self.pk]) except NoReverseMatch: return def get_show_url(self): show_url = self.SHOW_URL if not show_url: show_url = 'show-' + self.__class__.__name__.lower() try: return reverse(show_url, args=[self.pk, '']) except NoReverseMatch: return @property def associated_filename(self): if [True for attr in ('get_town_label', 'get_department', 'reference', 'short_class_name') if not hasattr(self, attr)]: return '' items = [slugify(self.get_department()), slugify(self.get_town_label()).upper(), slugify(self.short_class_name), slugify(self.reference), slugify(self.name or '').replace('-', '_').capitalize()] last_edition_date = self.last_edition_date if last_edition_date: items.append(last_edition_date.strftime('%Y%m%d')) else: items.append('00000000') return "-".join([str(item) for item in items]) def save(self, *args, **kwargs): created = not self.pk if not getattr(self, 'skip_history_when_saving', False): assert hasattr(self, 'history_modifier') if created: self.history_creator = self.history_modifier # external ID can have related item not available before save external_id_updated = kwargs.pop('external_id_updated') \ if 'external_id_updated' in kwargs else False if not created and not external_id_updated: self.update_external_id() super(BaseHistorizedItem, self).save(*args, **kwargs) if created and self.update_external_id(): # force resave for external ID creation self.skip_history_when_saving = True self._updated_id = True return self.save(external_id_updated=True) for dep in self.EXTERNAL_ID_DEPENDENCIES: for obj in getattr(self, dep).all(): obj.update_external_id(save=True) self.fix_associated() return True LOGICAL_TYPES = ( ('above', _("Above")), ('below', _("Below")), ('equal', _("Equal")) ) class GeneralRelationType(GeneralType): order = models.IntegerField(_("Order"), default=1) symmetrical = models.BooleanField(_("Symmetrical")) tiny_label = models.CharField(_("Tiny label"), max_length=50, blank=True, null=True) inverse_relation = models.ForeignKey( 'self', verbose_name=_("Inverse relation"), blank=True, null=True) logical_relation = models.CharField( verbose_name=_("Logical relation"), max_length=10, choices=LOGICAL_TYPES, blank=True, null=True) class Meta: abstract = True def clean(self): # cannot have symmetrical and an inverse_relation if self.symmetrical and self.inverse_relation: raise ValidationError( _("Cannot have symmetrical and an inverse_relation")) def get_tiny_label(self): return self.tiny_label or self.label or "" def save(self, *args, **kwargs): obj = super(GeneralRelationType, self).save(*args, **kwargs) # after saving check that the inverse_relation of the inverse_relation # point to the saved object if self.inverse_relation \ and (not self.inverse_relation.inverse_relation or self.inverse_relation.inverse_relation != self): self.inverse_relation.inverse_relation = self self.inverse_relation.symmetrical = False self.inverse_relation.save() return obj class GeneralRecordRelations(object): @classmethod def general_types(cls): return ['relation_type'] def save(self, *args, **kwargs): super(GeneralRecordRelations, self).save(*args, **kwargs) # after saving create the symetric or the inverse relation sym_rel_type = self.relation_type if not self.relation_type.symmetrical: sym_rel_type = self.relation_type.inverse_relation # no symetric/inverse is defined if not sym_rel_type: return dct = {'right_record': self.left_record, 'left_record': self.right_record, 'relation_type': sym_rel_type} self.__class__.objects.get_or_create(**dct) return self def post_delete_record_relation(sender, instance, **kwargs): # delete symmetrical or inverse relation sym_rel_type = instance.relation_type if not instance.relation_type.symmetrical: sym_rel_type = instance.relation_type.inverse_relation # no symetric/inverse is defined if not sym_rel_type: return dct = {'right_record_id': instance.left_record_id, 'left_record_id': instance.right_record_id, 'relation_type': sym_rel_type} q = instance.__class__.objects.filter(**dct) if q.count(): q.delete() class SearchQuery(models.Model): label = models.TextField(_("Label"), blank=True) query = models.TextField(_("Query"), blank=True) content_type = models.ForeignKey(ContentType, verbose_name=_("Content type")) profile = models.ForeignKey("UserProfile", verbose_name=_("Profile")) is_alert = models.BooleanField(_("Is an alert"), default=False) class Meta: verbose_name = _("Search query") verbose_name_plural = _("Search queries") ordering = ['label'] def __str__(self): return str(self.label) class ShortMenuItem(object): """ Item available in the short menu """ UP_MODEL_QUERY = {} @classmethod def get_short_menu_class(cls, pk): return '' @property def short_class_name(self): return "" class QuickAction(object): """ Quick action available from tables """ def __init__(self, url, icon_class='', text='', target=None, rights=None, module=None): self.url = url self.icon_class = icon_class self.text = text self.rights = rights self.target = target self.module = module assert self.target in ('one', 'many', None) def is_available(self, user, session=None, obj=None): if self.module and not getattr(get_current_profile(), self.module): return False if not self.rights: # no restriction return True if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser: return False user = user.ishtaruser for right in self.rights: if user.has_perm(right, session=session, obj=obj): return True return False @property def rendered_icon(self): if not self.icon_class: return "" return "".format(self.icon_class) @property def base_url(self): if self.target is None: url = reverse(self.url) else: # put arbitrary pk for the target url = reverse(self.url, args=[0]) url = url[:-2] # all quick action url have to finish with the # pk of the selected item and a "/" return url class MainItem(ShortMenuItem): """ Item with quick actions available from tables Extra actions are available from sheets """ QUICK_ACTIONS = [] @classmethod def get_quick_actions(cls, user, session=None, obj=None): """ Get a list of (url, title, icon, target) actions for an user """ qas = [] for action in cls.QUICK_ACTIONS: if not action.is_available(user, session=session, obj=obj): continue qas.append([action.base_url, mark_safe(action.text), mark_safe(action.rendered_icon), action.target or ""]) return qas @classmethod def get_quick_action_by_url(cls, url): for action in cls.QUICK_ACTIONS: if action.url == url: return action def regenerate_external_id(self): if not hasattr(self, "external_id"): return self.skip_history_when_saving = True self._no_move = True if hasattr(self, "auto_external_id"): self.external_id = None self.save() def get_extra_actions(self, request): if not hasattr(self, 'SLUG'): return [] actions = [] if request.user.is_superuser and hasattr(self, "auto_external_id"): actions += [ ( reverse("regenerate-external-id") + "?{}={}".format( self.SLUG, self.pk), _("Regenerate ID"), "fa fa-key", _("regen."), "", True ) ] return actions class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=datetime.datetime.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return self PARSE_FORMULA = re.compile("{([^}]*)}") def _deduplicate(value): new_values = [] for v in value.split(u'-'): if v not in new_values: new_values.append(v) return u'-'.join(new_values) FORMULA_FILTERS = { 'upper': lambda x: x.upper(), 'lower': lambda x: x.lower(), 'capitalize': lambda x: x.capitalize(), 'slug': lambda x: slugify(x), 'deduplicate': _deduplicate } def get_external_id(key, item): profile = get_current_profile() if not hasattr(profile, key): return formula = getattr(profile, key) dct = {} for fkey in PARSE_FORMULA.findall(formula): filtered = fkey.split('|') initial_key = fkey[:] fkey = filtered[0] filters = [] for filtr in filtered[1:]: if filtr in FORMULA_FILTERS: filters.append(FORMULA_FILTERS[filtr]) if fkey.startswith('settings__'): dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' continue obj = item for k in fkey.split('__'): try: obj = getattr(obj, k) except ObjectDoesNotExist: obj = None if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager if not obj.count(): break obj = obj.all()[0] elif callable(obj): obj = obj() if obj is None: break if obj is None: dct[initial_key] = '' else: dct[initial_key] = str(obj) for filtr in filters: dct[initial_key] = filtr(dct[initial_key]) values = formula.format(**dct).split('||') value = values[0] for filtr in values[1:]: if filtr not in FORMULA_FILTERS: value += '||' + filtr continue value = FORMULA_FILTERS[filtr](value) return value CURRENCY = (("€", _("Euro")), ("$", _("US dollar"))) FIND_INDEX_SOURCE = (("O", _("Operations")), ("CR", _("Context records"))) SITE_LABELS = [('site', _("Site")), ('entity', _("Archaeological entity"))] TRANSLATED_SITE_LABELS = { 'site': { 'search': _("Site search"), 'new': _("New site"), 'modification': _("Site modification"), 'deletion': _("Site deletion"), "attached-to-operation": _("Site (attached to the operation)"), "name-attached-to-operation": _("Site name (attached to the operation)"), "attached-to-cr": _("Site (attached to the context record)"), "name-attached-to-cr": _("Site name (attached to the context record)"), }, 'entity': { 'search': _("Archaeological entity search"), 'new': _("New archaeological entity"), 'modification': _("Archaeological entity modification"), 'deletion': _("Archaeological entity deletion"), "attached-to-operation": _("Archaeological entity (attached to the " "operation)"), "name-attached-to-operation": _("Archaeological entity name (attached " "to the operation)"), "attached-to-cr": _("Archaeological entity (attached to the context " "record)"), "name-attached-to-cr": _("Archaeological entity name (attached to the context record)"), }, } ACCOUNT_NAMING_STYLE = ( ('NF', _("name.firstname")), ('FN', _("firstname.name")), ) class IshtarSiteProfile(models.Model, Cached): slug_field = 'slug' label = models.TextField(_("Name")) slug = models.SlugField(_("Slug"), unique=True) active = models.BooleanField(_("Current active"), default=False) experimental_feature = models.BooleanField( _("Activate experimental feature"), default=False) description = models.TextField(_("Description"), null=True, blank=True) warning_name = models.TextField(_("Warning name"), blank=True) warning_message = models.TextField(_("Warning message"), blank=True) config = models.CharField( _("Alternate configuration"), max_length=200, choices=ALTERNATE_CONFIGS_CHOICES, help_text=_("Choose an alternate configuration for label, " "index management"), null=True, blank=True ) files = models.BooleanField(_("Files module"), default=False) archaeological_site = models.BooleanField( _("Archaeological site module"), default=False) archaeological_site_label = models.CharField( _("Archaeological site type"), max_length=200, choices=SITE_LABELS, default='site' ) context_record = models.BooleanField(_("Context records module"), default=False) find = models.BooleanField(_("Finds module"), default=False, help_text=_("Need context records module")) find_index = models.CharField( _("Find index is based on"), default='O', max_length=2, choices=FIND_INDEX_SOURCE, help_text=_("To prevent irrelevant indexes, change this parameter " "only if there is no find in the database")) warehouse = models.BooleanField( _("Warehouses module"), default=False, help_text=_("Need finds module")) preservation = models.BooleanField(_("Preservation module"), default=False) mapping = models.BooleanField(_("Mapping module"), default=False) point_precision = models.IntegerField( _("Point precision (search and sheets)"), null=True, blank=True, help_text=_( "Number of digit to round from the decimal point for coordinates " "in WGS84 (latitude, longitude). Empty value means no round." ) ) locate_warehouses = models.BooleanField( _("Locate warehouse and containers"), default=False, help_text=_( "Mapping module must be activated. With many containers and " "background task not activated, activating this option may " "consume many resources.") ) use_town_for_geo = models.BooleanField( _("Use town to locate when coordinates are missing"), default=True) relation_graph = models.BooleanField(_("Generate relation graph"), default=False) underwater = models.BooleanField(_("Underwater module"), default=False) parcel_mandatory = models.BooleanField( _("Parcel are mandatory for context records"), default=True) homepage = models.TextField( _("Home page"), null=True, blank=True, help_text=_("Homepage of Ishtar - if not defined a default homepage " "will appear. Use the markdown syntax. {random_image} " "can be used to display a random image.")) operation_prefix = models.CharField( _("Main operation code prefix"), default=u'OA', null=True, blank=True, max_length=20 ) default_operation_prefix = models.CharField( _("Default operation code prefix"), default=u'OP', null=True, blank=True, max_length=20 ) operation_region_code = models.CharField( _("Operation region code"), null=True, blank=True, max_length=5 ) file_external_id = models.TextField( _("File external id"), default="{year}-{numeric_reference}", help_text=_("Formula to manage file external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) parcel_external_id = models.TextField( _("Parcel external id"), default="{associated_file__external_id}{operation__code_patriarche}-" "{town__numero_insee}-{section}{parcel_number}", help_text=_("Formula to manage parcel external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) context_record_external_id = models.TextField( _("Context record external id"), default="{parcel__external_id}-{label}", help_text=_("Formula to manage context record external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) base_find_external_id = models.TextField( _("Base find external id"), default="{context_record__external_id}-{label}", help_text=_("Formula to manage base find external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) find_external_id = models.TextField( _("Find external id"), default="{get_first_base_find__context_record__external_id}-{label}", help_text=_("Formula to manage find external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) container_external_id = models.TextField( _("Container external id"), default="{responsible__external_id}-{index}", help_text=_("Formula to manage container external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) warehouse_external_id = models.TextField( _("Warehouse external id"), default="{name|slug}", help_text=_("Formula to manage warehouse external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) document_external_id = models.TextField( _("Document external id"), default="{index}", help_text=_("Formula to manage document external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) person_raw_name = models.TextField( _("Raw name for person"), default="{name|upper} {surname}", help_text=_("Formula to manage person raw_name. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive.")) find_use_index = models.BooleanField(_("Use auto index for finds"), default=True) currency = models.CharField(_("Currency"), default="€", choices=CURRENCY, max_length=5) account_naming_style = models.CharField( _("Naming style for accounts"), max_length=2, default="NF", choices=ACCOUNT_NAMING_STYLE ) default_center = models.PointField( _("Maps - default center"), default='SRID=4326;POINT(2.4397 46.5528)') default_zoom = models.IntegerField( _("Maps - default zoom"), default=6) display_srs = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System for display"), blank=True, null=True, help_text=_("Spatial Reference System used for display when no SRS is " "defined") ) objects = SlugModelManager() class Meta: verbose_name = _("Ishtar site profile") verbose_name_plural = _("Ishtar site profiles") ordering = ['label'] def __str__(self): return str(self.label) def natural_key(self): return (self.slug,) def has_overload(self, key): return self.config and self.config in ALTERNATE_CONFIGS and \ hasattr(ALTERNATE_CONFIGS[self.config], key) @classmethod def get_current_profile(cls, force=False): cache_key, value = get_cache(cls, ['is-current-profile']) if value and not force: return value q = cls.objects.filter(active=True) if not q.count(): obj = cls.objects.create( label="Default profile", slug='default', active=True) else: obj = q.all()[0] cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj @classmethod def get_default_site_label(cls, key=None): return cls.get_current_profile().get_site_label(key) def get_site_label(self, key=None): if not key: return str(dict(SITE_LABELS)[self.archaeological_site_label]) return str( TRANSLATED_SITE_LABELS[self.archaeological_site_label][key] ) def save(self, *args, **kwargs): raw = False if 'raw' in kwargs: raw = kwargs.pop('raw') super(IshtarSiteProfile, self).save(*args, **kwargs) obj = self if raw: return obj q = self.__class__.objects.filter(active=True).exclude(slug=self.slug) if obj.active and q.count(): for profile in q.all(): profile.active = False profile.save(raw=True) changed = False if not obj.active and not q.count(): obj.active = True changed = True if obj.warehouse and not obj.find: obj.find = True changed = True if obj.find and not obj.context_record: obj.context_record = True changed = True if changed: obj = obj.save(raw=True) return obj def get_current_profile(force=False): return IshtarSiteProfile.get_current_profile(force=force) def _profile_mapping(): return get_current_profile().mapping profile_mapping = lazy(_profile_mapping) def cached_site_changed(sender, **kwargs): get_current_profile(force=True) from ishtar_common.menus import Menu MAIN_MENU = Menu(None) MAIN_MENU.init() MAIN_MENU.reinit_menu_for_all_user() post_save.connect(cached_site_changed, sender=IshtarSiteProfile) post_delete.connect(cached_site_changed, sender=IshtarSiteProfile) class CustomFormManager(models.Manager): def get_by_natural_key(self, name, form): return self.get(name=name, form=form) class CustomForm(models.Model): name = models.CharField(_("Name"), max_length=250) form = models.CharField(_("Form"), max_length=250) available = models.BooleanField(_("Available"), default=True) enabled = models.BooleanField( _("Enable this form"), default=True, help_text=_("Disable with caution: disabling a form with mandatory " "fields may lead to database errors.")) apply_to_all = models.BooleanField( _("Apply to all"), default=False, help_text=_("Apply this form to all users. If set to True, selecting " "user and user type is useless.")) users = models.ManyToManyField('IshtarUser', blank=True) user_types = models.ManyToManyField('PersonType', blank=True) objects = CustomFormManager() SERIALIZATION_EXCLUDE = ("users", ) class Meta: verbose_name = _("Custom form") verbose_name_plural = _("Custom forms") ordering = ['name', 'form'] unique_together = (('name', 'form'),) def natural_key(self): return (self.name, self.form) def __str__(self): return "{} - {}".format(self.name, self.form) def users_lbl(self): users = [str(user) for user in self.users.all()] return " ; ".join(users) users_lbl.short_description = _("Users") def user_types_lbl(self): user_types = [str(u) for u in self.user_types.all()] return " ; ".join(user_types) user_types_lbl.short_description = _("User types") @classmethod def register(cls): if hasattr(cls, '_register') and hasattr(cls, '_register_fields'): return cls._register, cls._register_fields cache_key, value = get_cache(cls.__class__, ['dct-forms'], app_label='ishtar_common') cache_key_fields, value_fields = get_cache( cls.__class__, ['dct-fields'], app_label='ishtar_common') if value and value_fields: cls._register = value cls._register_fields = value_fields return cls._register, cls._register_fields cls._register, cls._register_fields = {}, {} # ideally should be improved but only used in admin from ishtar_common.admin import ISHTAR_FORMS from ishtar_common.forms import CustomForm as CustomFormForm for app_form in ISHTAR_FORMS: app_name = app_form.__package__ if app_name == "archaeological_files_pdl": app_name = "archaeological_files" for form in dir(app_form): if 'Form' not in form and 'Select' not in form: # not very clean... but do not treat inappropriate items continue form = getattr(app_form, form) if not inspect.isclass(form) \ or not issubclass(form, CustomFormForm) \ or not getattr(form, 'form_slug', None): continue model_name = form.form_slug.split('-')[0].replace('_', '') if app_name not in cls._register_fields: cls._register_fields[app_name] = [] if model_name not in cls._register_fields[app_name]: cls._register_fields[app_name].append(model_name) cls._register[form.form_slug] = form return cls._register, cls._register_fields def get_form_class(self): register, register_fields = self.register() if self.form not in self._register: return return register[self.form] def get_available_json_fields(self): register, register_fields = self.register() if self.form not in self._register: return [] current_form = register[self.form] app_name = current_form.__module__.split('.')[0] if app_name == "archaeological_files_pdl": app_name = "archaeological_files" if app_name not in register_fields: return [] res = [] for model_name in register_fields[app_name]: q = ContentType.objects.filter(app_label=app_name, model=model_name) if not q.count(): continue ct = q.all()[0] for json_field in JsonDataField.objects.filter( content_type=ct).all(): res.append((json_field.pk, "{} ({})".format( json_field.name, dict(JSON_VALUE_TYPES)[json_field.value_type]))) return res class ExcludedFieldManager(models.Manager): def get_by_natural_key(self, custom_form_name, custom_form_form, field): return self.get(custom_form__name=custom_form_name, custom_form__form=custom_form_form, field=field) class ExcludedField(models.Model): custom_form = models.ForeignKey(CustomForm, related_name='excluded_fields') field = models.CharField(_("Field"), max_length=250) objects = ExcludedFieldManager() class Meta: verbose_name = _("Excluded field") verbose_name_plural = _("Excluded fields") unique_together = ("custom_form", "field") def natural_key(self): return (self.custom_form.name , self.custom_form.form, self.field) class CustomFormJsonFieldManager(models.Manager): def get_by_natural_key(self, custom_form_name, custom_form_form, json_field_key, json_field_app_label, json_field_model): return self.get( custom_form__name=custom_form_name, custom_form__form=custom_form_form, json_field__key=json_field_key, json_field__content_type__app_label=json_field_app_label, json_field__content_type__model=json_field_model ) class CustomFormJsonField(models.Model): custom_form = models.ForeignKey(CustomForm, related_name='json_fields') json_field = models.ForeignKey(JsonDataField, related_name='custom_form_details') label = models.CharField(_("Label"), max_length=200, blank=True, default='') order = models.IntegerField(verbose_name=_("Order"), default=1) help_text = models.TextField(_("Help"), blank=True, null=True) objects = CustomFormJsonFieldManager() class Meta: verbose_name = _("Custom form - Json data field") verbose_name_plural = _("Custom form - Json data fields") unique_together = ("custom_form", "json_field") def natural_key(self): return ( self.custom_form.name, self.custom_form.form, self.json_field.key, self.json_field.content_type.app_label, self.json_field.content_type.model ) class GlobalVar(models.Model, Cached): slug = models.SlugField(_("Variable name"), unique=True) description = models.TextField(_("Description of the variable"), null=True, blank=True) value = models.TextField(_("Value"), null=True, blank=True) objects = SlugModelManager() class Meta: verbose_name = _("Global variable") verbose_name_plural = _("Global variables") ordering = ['slug'] def natural_key(self): return (self.slug,) def __str__(self): return str(self.slug) def cached_globalvar_changed(sender, **kwargs): if not kwargs['instance']: return var = kwargs['instance'] cache_key, value = get_cache(GlobalVar, var.slug) cache.set(cache_key, var.value, settings.CACHE_TIMEOUT) post_save.connect(cached_globalvar_changed, sender=GlobalVar) class UserDashboard: def __init__(self): types = IshtarUser.objects.values('person__person_types', 'person__person_types__label') self.types = types.annotate(number=Count('pk')) \ .order_by('person__person_types') class StatsCache(models.Model): model = models.CharField(_("Model name"), max_length=200) model_pk = models.IntegerField(_("Associated primary key")) values = JSONField(default={}, blank=True) updated = models.DateTimeField(default=datetime.datetime.now) update_requested = models.DateTimeField(blank=True, null=True) class Meta: verbose_name = _("Cache for stats") verbose_name_plural = _("Caches for stats") def update_stats(statscache, item, funcname): if not settings.USE_BACKGROUND_TASK: current_values = statscache.values if not current_values: current_values = {} value = getattr(item, funcname)() current_values[funcname] = value statscache.values = current_values statscache.updated = datetime.datetime.now() statscache.save() return current_values now = datetime.datetime.now() app_name = item._meta.app_label model_name = item._meta.model_name statscache.update_requested = now.isoformat() statscache.save() _update_stats.delay(app_name, model_name, item.pk, funcname) return statscache.values def __get_stats_cache_values(model_name, model_pk): q = StatsCache.objects.filter( model=model_name, model_pk=model_pk ) nb = q.count() if nb >= 1: sc = q.all()[0] for extra in q.order_by("-id").all()[1:]: extra.delete() else: sc = StatsCache.objects.create( model=model_name, model_pk=model_pk ) values = sc.values if not values: values = {} return sc, values @task() def _update_stats(app, model, model_pk, funcname): model_name = app + "." + model model = apps.get_model(app, model) try: item = model.objects.get(pk=model_pk) except model.DoesNotExist: return value = getattr(item, funcname)() sc, current_values = __get_stats_cache_values(model_name, model_pk) current_values[funcname] = value sc.values = current_values sc.update_requested = None sc.updated = datetime.datetime.now() sc.save() class DashboardFormItem(object): """ Provide methods to manage statistics """ def _get_or_set_stats(self, funcname, update, timeout=settings.CACHE_TIMEOUT, expected_type=None): model_name = self._meta.app_label + "." + self._meta.model_name sc, __ = StatsCache.objects.get_or_create( model=model_name, model_pk=self.pk ) now = datetime.datetime.now() values = {} from_cache = False if not update and sc.values and funcname in sc.values and ( sc.updated + datetime.timedelta(seconds=timeout)) > now: values = sc.values from_cache = True if funcname not in values: values = update_stats(sc, self, funcname) if funcname in values: values = values[funcname] else: values = 0 if expected_type is not None and not isinstance(values, expected_type): if from_cache: return self._get_or_set_stats(funcname, True, expected_type=expected_type) else: return expected_type() return values @classmethod def get_periods(cls, slice='month', fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) if slice == 'year': return [res[date_var].year for res in list(q.values(date_var) .annotate( Count("id")).order_by())] elif slice == 'month': return [(res[date_var].year, res[date_var].month) for res in list(q.values(date_var) .annotate(Count("id")).order_by())] return [] @classmethod def get_by_year(cls, year, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) return q.filter( **{date_var + '__year': year}).order_by('pk').distinct('pk') @classmethod def get_by_month(cls, year, month, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var + '__isnull': False}) if fltr: q = q.filter(**fltr) q = q.filter( **{date_var + '__year': year, date_var + '__month': month}) return q.order_by('pk').distinct('pk') @classmethod def get_total_number(cls, fltr=None): q = cls.objects if fltr: q = q.filter(**fltr) return q.order_by('pk').distinct('pk').count() class Dashboard(object): def __init__(self, model, slice='year', date_source=None, show_detail=None, fltr=None): if not fltr: fltr = {} # don't provide date_source if it is not relevant self.model = model self.total_number = model.get_total_number(fltr) self.show_detail = show_detail history_model = self.model.history.model # last edited - created self.recents, self.lasts = [], [] for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')): last_ids = history_model.objects.values('id') \ .annotate(hd=Max('history_date')) last_ids = last_ids.filter(history_type=modif_type) from archaeological_finds.models import Find if self.model == Find: last_ids = last_ids.filter( downstream_treatment_id__isnull=True) if modif_type == '+': last_ids = last_ids.filter( upstream_treatment_id__isnull=True) last_ids = last_ids.order_by('-hd').distinct().all()[:5] for idx in last_ids: try: obj = self.model.objects.get(pk=idx['id']) except: # deleted object are always referenced in history continue obj.history_date = idx['hd'] last_lst.append(obj) # years base_kwargs = {'fltr': fltr.copy()} if date_source: base_kwargs['date_source'] = date_source periods_kwargs = copy.deepcopy(base_kwargs) periods_kwargs['slice'] = slice self.periods = model.get_periods(**periods_kwargs) self.periods = list(set(self.periods)) self.periods.sort() if not self.total_number or not self.periods: return kwargs_num = copy.deepcopy(base_kwargs) self.serie_labels = [_("Total")] # numbers if slice == 'year': self.values = [('year', "", list(reversed(self.periods)))] self.numbers = [model.get_by_year(year, **kwargs_num).count() for year in self.periods] self.values += [('number', _("Number"), list(reversed(self.numbers)))] if slice == 'month': periods = list(reversed(self.periods)) self.periods = ["%d-%s-01" % (p[0], ('0' + str(p[1])) if len(str(p[1])) == 1 else p[1]) for p in periods] self.values = [('month', "", self.periods)] if show_detail: for dpt, lbl in settings.ISHTAR_DPTS: self.serie_labels.append(str(dpt)) idx = 'number_' + str(dpt) kwargs_num['fltr']["towns__numero_insee__startswith"] = \ str(dpt) numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [(idx, dpt, list(numbers))] # put "Total" at the end self.serie_labels.append(self.serie_labels.pop(0)) kwargs_num = base_kwargs.copy() self.numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [('number', _("Total"), list(self.numbers))] # calculate self.average = self.get_average() self.variance = self.get_variance() self.standard_deviation = self.get_standard_deviation() self.median = self.get_median() self.mode = self.get_mode() # by operation if not hasattr(model, 'get_by_operation'): return operations = model.get_operations() operation_numbers = [model.get_by_operation(op).count() for op in operations] # calculate self.operation_average = self.get_average(operation_numbers) self.operation_variance = self.get_variance(operation_numbers) self.operation_standard_deviation = self.get_standard_deviation( operation_numbers) self.operation_median = self.get_median(operation_numbers) operation_mode_pk = self.get_mode(dict(zip(operations, operation_numbers))) if operation_mode_pk: from archaeological_operations.models import Operation self.operation_mode = str( Operation.objects.get(pk=operation_mode_pk)) def get_average(self, vals=[]): if not vals: vals = self.numbers[:] return sum(vals) / len(vals) def get_variance(self, vals=[]): if not vals: vals = self.numbers[:] avrg = self.get_average(vals) return self.get_average([(x - avrg) ** 2 for x in vals]) def get_standard_deviation(self, vals=None): if not vals: vals = self.numbers[:] return round(self.get_variance(vals) ** 0.5, 3) def get_median(self, vals=None): if not vals: vals = self.numbers[:] len_vals = len(vals) vals.sort() if (len_vals % 2) == 1: return vals[int(len_vals / 2)] else: return (vals[int(len_vals / 2) - 1] + vals[int(len_vals / 2)]) / 2.0 def get_mode(self, vals=None): if not vals: vals = dict(zip(self.periods, self.numbers[:])) mx = max(vals.values()) for v in vals: if vals[v] == mx: return v class DocumentTemplate(models.Model): name = models.CharField(_("Name"), max_length=100) slug = models.SlugField(_("Slug"), max_length=100, unique=True) associated_model = models.ForeignKey(ImporterModel) template = models.FileField( _("Template"), upload_to="templates/%Y/", help_text=max_size_help()) available = models.BooleanField(_("Available"), default=True) for_labels = models.BooleanField(_("Used for labels"), default=False) label_per_page = models.IntegerField( _("Number of label per page"), blank=True, null=True, help_text=_("Only relevant for label template") ) objects = SlugModelManager() SERIALIZATION_FILES = ("template", ) class Meta: verbose_name = _("Document template") verbose_name_plural = _("Document templates") ordering = ['associated_model', 'name'] def __str__(self): return self.name def natural_key(self): return (self.slug,) def clean(self): if self.for_labels and not self.label_per_page: raise ValidationError(_("For label template, you must provide " "number of label per page.")) def save(self, *args, **kwargs): if not self.slug: self.slug = create_slug(DocumentTemplate, self.name) return super(DocumentTemplate, self).save(*args, **kwargs) @classmethod def get_tuples(cls, dct=None, empty_first=True): if not dct: dct = {} dct['available'] = True if empty_first: yield ('', '----------') items = cls.objects.filter(**dct) for item in items.distinct().order_by(*cls._meta.ordering).all(): yield (item.pk, _(str(item))) def get_baselink_for_labels(self): return reverse('generate-labels', args=[self.slug]) def publish(self, c_object): tempdir = tempfile.mkdtemp("-ishtardocs") output_name = tempdir + os.path.sep + \ slugify(self.name.replace(' ', '_').lower()) + u'-' + \ datetime.date.today().strftime('%Y-%m-%d') + \ "." + self.template.name.split('.')[-1] values = c_object.get_values() engine = IshtarSecretaryRenderer() try: result = engine.render(self.template, **values) except TemplateSyntaxError as e: raise TemplateSyntaxError(str(e), e.lineno) except UndefinedError as e: raise TemplateSyntaxError(str(e), 0) except Exception as e: raise TemplateSyntaxError(str(e), 0) with open(output_name, 'wb') as output: output.write(result) return output_name def get_filter(self, template, regexp_list=None): if not regexp_list: return None z = zipfile.ZipFile(template) content = z.open('content.xml') full_content = content.read().decode("utf-8") filtr = [] for regexp in regexp_list: iter = re.finditer( regexp, full_content) for s in iter: key = s.groups()[0] if key not in filtr: filtr.append(key) return filtr def publish_labels(self, objects): if not objects: return tempdir = tempfile.mkdtemp("-ishtarlabels") main_output_name = tempdir + os.path.sep + \ slugify(self.name.replace(' ', '_').lower()) + u'-' + \ datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S') suffix = "." + self.template.name.split('.')[-1] len_objects = len(objects) names = [] filtr = self.get_filter( self.template, # e.g.: {{items.4.key}} [r'{{ *items\.\d\.([A-Za-z0-9_.]*)(\|[A-Za-z0-9_.-]*)* *}}'] ) for idx in range(int(len(objects) / self.label_per_page) + 1): if idx * self.label_per_page >= len_objects: break values = {"items": []} for subidx in range(self.label_per_page): c_idx = idx * self.label_per_page + subidx if c_idx >= len_objects: break obj = objects[c_idx] values["items"].append(obj.get_values(filtr=filtr)) engine = IshtarSecretaryRenderer() try: result = engine.render(self.template, **values) except TemplateSyntaxError as e: raise TemplateSyntaxError(str(e), e.lineno) output_name = main_output_name + "-" + str(idx) + suffix names.append(output_name) with open(output_name, 'wb') as output: output.write(result) output_name = main_output_name + suffix o = OOoPy(infile=names[0], outfile=output_name) if len(names) > 1: t = OOTransformer( o.mimetype, OOTransforms.get_meta(o.mimetype), OOTransforms.Concatenate(*(names[1:])), OOTransforms.renumber_all(o.mimetype), OOTransforms.set_meta(o.mimetype), OOTransforms.Fix_OOo_Tag(), OOTransforms.Manifest_Append() ) t.transform (o) o.close() return output_name class NumberManager(models.Manager): def get_by_natural_key(self, number): return self.get(number=number) class State(models.Model): label = models.CharField(_("Label"), max_length=30) number = models.CharField(_("Number"), unique=True, max_length=3) objects = NumberManager() class Meta: verbose_name = _("State") ordering = ['number'] def __str__(self): return self.label def natural_key(self): return (self.number,) class Department(models.Model): label = models.CharField(_("Label"), max_length=30) number = models.CharField(_("Number"), unique=True, max_length=3) state = models.ForeignKey( 'State', verbose_name=_("State"), blank=True, null=True, on_delete=models.SET_NULL, ) objects = NumberManager() class Meta: verbose_name = _("Department") verbose_name_plural = _("Departments") ordering = ['number'] def __str__(self): return self.label def natural_key(self): return (self.number,) def history_compress(self): return self.number @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append(cls.objects.get(number=value)) except cls.DoesNotExist: continue return res class Arrondissement(models.Model): name = models.CharField("Nom", max_length=30) department = models.ForeignKey(Department, verbose_name="Département") def __str__(self): return settings.JOINT.join((self.name, str(self.department))) class Canton(models.Model): name = models.CharField("Nom", max_length=30) arrondissement = models.ForeignKey(Arrondissement, verbose_name="Arrondissement") def __str__(self): return settings.JOINT.join( (self.name, str(self.arrondissement))) class TownManager(models.GeoManager): def get_by_natural_key(self, numero_insee, year): return self.get(numero_insee=numero_insee, year=year) class Town(Imported, models.Model): name = models.CharField(_("Name"), max_length=100) surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) center = models.PointField(_("Localisation"), srid=settings.SRID, blank=True, null=True) limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120) departement = models.ForeignKey( Department, verbose_name=_("Department"), on_delete=models.SET_NULL, null=True, blank=True) year = models.IntegerField( _("Year of creation"), null=True, blank=True, help_text=_("Filling this field is relevant to distinguish old towns " "from new towns.")) children = models.ManyToManyField( 'Town', verbose_name=_("Town children"), blank=True, related_name='parents') cached_label = models.CharField(_("Cached name"), max_length=500, null=True, blank=True, db_index=True) objects = TownManager() class Meta: verbose_name = _("Town") verbose_name_plural = _("Towns") if settings.COUNTRY == 'fr': ordering = ['numero_insee'] unique_together = (('numero_insee', 'year'),) def natural_key(self): return (self.numero_insee, self.year) def history_compress(self): values = {'numero_insee': self.numero_insee, 'year': self.year or ""} return values def get_values(self, prefix='', no_values=False, filtr=None, **kwargs): values = {} if not filtr or prefix in filtr or "label" in filtr: if prefix: values[prefix] = str(self) else: values['label'] = str(self) if not filtr or prefix + "name" in filtr: values[prefix + "name"] = self.name if not filtr or prefix + "numero_insee" in filtr: values[prefix + "numero_insee"] = self.numero_insee return values @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append( cls.objects.get(numero_insee=value['numero_insee'], year=value['year'] or None)) except cls.DoesNotExist: continue return res def __str__(self): return self.cached_label or "" @property def label_with_areas(self): label = [self.name] if self.numero_insee: label.append("({})".format(self.numero_insee)) for area in self.areas.all(): label.append(" - ") label.append(area.full_label) return " ".join(label) def generate_geo(self, force=False): force = self.generate_limit(force=force) self.generate_center(force=force) self.generate_area(force=force) def generate_limit(self, force=False): if not force and self.limit: return parents = None if not self.parents.count(): return for parent in self.parents.all(): if not parent.limit: return if not parents: parents = parent.limit else: parents = parents.union(parent.limit) # if union is a simple polygon make it a multi if 'MULTI' not in parents.wkt: parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" if not parents: return self.limit = parents self.save() return True def generate_center(self, force=False): if not force and (self.center or not self.limit): return self.center = self.limit.centroid if not self.center: return False self.save() return True def generate_area(self, force=False): if not force and (self.surface or not self.limit): return surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area if surface > 214748364 or not surface: return False self.surface = surface self.save() return True def update_town_code(self): if not self.numero_insee or not self.children.count() or not self.year: return old_num = self.numero_insee[:] numero = old_num.split('-')[0] self.numero_insee = "{}-{}".format(numero, self.year) if self.numero_insee != old_num: return True def _generate_cached_label(self): cached_label = self.name if settings.COUNTRY == "fr" and self.numero_insee: dpt_len = 2 if self.numero_insee.startswith('97') or \ self.numero_insee.startswith('98') or \ self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'): dpt_len = 3 cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) if self.year and self.children.count(): cached_label += " ({})".format(self.year) return cached_label def post_save_town(sender, **kwargs): cached_label_changed(sender, **kwargs) town = kwargs['instance'] town.generate_geo() if town.update_town_code(): town.save() post_save.connect(post_save_town, sender=Town) def town_child_changed(sender, **kwargs): town = kwargs['instance'] if town.update_town_code(): town.save() m2m_changed.connect(town_child_changed, sender=Town.children.through) class Area(HierarchicalType): towns = models.ManyToManyField(Town, verbose_name=_("Towns"), blank=True, related_name='areas') reference = models.CharField(_("Reference"), max_length=200, blank=True, null=True) parent = models.ForeignKey( 'self', blank=True, null=True, verbose_name=_("Parent"), help_text=_("Only four level of parent are managed."), related_name='children', on_delete=models.SET_NULL ) class Meta: verbose_name = _("Area") verbose_name_plural = _("Areas") ordering = ('label',) def __str__(self): if not self.reference: return self.label return "{} ({})".format(self.label, self.reference) @property def full_label(self): label = [str(self)] if self.parent and self.parent.pk != self.pk: label.append(self.parent.full_label) return " / ".join(label) class Address(BaseHistorizedItem): FIELDS = ( "address", "address_complement", "postal_code", "town", "precise_town", "country", "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", "alt_country", "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", "raw_phone", "mobile_phone", "email", "alt_address_is_prefered" ) address = models.TextField(_("Address"), null=True, blank=True) address_complement = models.TextField(_("Address complement"), null=True, blank=True) postal_code = models.CharField(_("Postal code"), max_length=10, null=True, blank=True) town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True) precise_town = models.ForeignKey( Town, verbose_name=_("Town (precise)"), null=True, blank=True) country = models.CharField(_("Country"), max_length=30, null=True, blank=True) alt_address = models.TextField(_("Other address: address"), null=True, blank=True) alt_address_complement = models.TextField( _("Other address: address complement"), null=True, blank=True) alt_postal_code = models.CharField(_("Other address: postal code"), max_length=10, null=True, blank=True) alt_town = models.CharField(_("Other address: town"), max_length=70, null=True, blank=True) alt_country = models.CharField(_("Other address: country"), max_length=30, null=True, blank=True) phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True) phone_desc = models.CharField(_("Phone description"), max_length=300, null=True, blank=True) phone2 = models.CharField(_("Phone description 2"), max_length=18, null=True, blank=True) phone_desc2 = models.CharField(_("Phone description 2"), max_length=300, null=True, blank=True) phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, blank=True) phone_desc3 = models.CharField(_("Phone description 3"), max_length=300, null=True, blank=True) raw_phone = models.TextField(_("Raw phone"), blank=True, null=True) mobile_phone = models.CharField(_("Mobile phone"), max_length=18, null=True, blank=True) email = models.EmailField( _("Email"), max_length=300, blank=True, null=True) alt_address_is_prefered = models.BooleanField( _("Alternative address is prefered"), default=False) history = HistoricalRecords() SUB_ADDRESSES = [] class Meta: abstract = True def get_short_html_items(self): items = [] if self.address: items.append( """{}""".format(self.address)) if self.address_complement: items.append( """{}""".format( self.address_complement)) if self.postal_code: items.append( """{}""".format( self.postal_code)) if self.precise_town: items.append( """{}""".format( self.precise_town.name)) elif self.town: items.append( """{}""".format( self.town)) if self.country: items.append( """{}""".format( self.country)) return items def get_short_html_detail(self): html = """
""" items = self.get_short_html_items() if not items: items = [ "{}".format( _("No associated address") ) ] html += "".join(items) html += """
""" return html def get_town_centroid(self): if self.precise_town: return self.precise_town.center, self._meta.verbose_name for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if sub_item and sub_item.precise_town: return sub_item.precise_town.center, sub_item._meta.verbose_name def get_town_polygons(self): if self.precise_town: return self.precise_town.limit, self._meta.verbose_name for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if sub_item and sub_item.precise_town: return sub_item.precise_town.limit, sub_item._meta.verbose_name def get_attribute(self, attr): if self.town or self.precise_town: return getattr(self, attr) for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if not sub_item: continue if sub_item.town or sub_item.precise_town: return getattr(sub_item, attr) return getattr(self, attr) def get_address(self): return self.get_attribute("address") def get_address_complement(self): return self.get_attribute("address_complement") def get_postal_code(self): return self.get_attribute("postal_code") def get_town(self): return self.get_attribute("town") def get_precise_town(self): return self.get_attribute("precise_town") def get_country(self): return self.get_attribute("country") def simple_lbl(self): return str(self) def full_address(self): lbl = self.simple_lbl() if lbl: lbl += "\n" lbl += self.address_lbl() return lbl def address_lbl(self): lbl = u'' prefix = '' if self.alt_address_is_prefered: prefix = 'alt_' if getattr(self, prefix + 'address'): lbl += getattr(self, prefix + 'address') if getattr(self, prefix + 'address_complement'): if lbl: lbl += "\n" lbl += getattr(self, prefix + 'address_complement') postal_code = getattr(self, prefix + 'postal_code') town = getattr(self, prefix + 'town') if postal_code or town: if lbl: lbl += "\n" lbl += "{}{}{}".format( postal_code or '', " " if postal_code and town else '', town or '') if self.phone: if lbl: lbl += "\n" lbl += "{} {}".format(str(_("Tel: ")), self.phone) if self.mobile_phone: if lbl: lbl += "\n" lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone) if self.email: if lbl: lbl += "\n" lbl += "{} {}".format(str(_("Email: ")), self.email) return lbl class Merge(models.Model): merge_key = models.TextField(_("Merge key"), blank=True, null=True) merge_candidate = models.ManyToManyField("self", blank=True) merge_exclusion = models.ManyToManyField("self", blank=True) archived = models.NullBooleanField(default=False, blank=True, null=True) # 1 for one word similarity, 2 for two word similarity, etc. MERGE_CLEMENCY = None EMPTY_MERGE_KEY = '--' class Meta: abstract = True def generate_merge_key(self): if self.archived: return self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY self.merge_key = self.merge_key def generate_merge_candidate(self): if self.archived: return if not self.merge_key: self.generate_merge_key() self.save(merge_key_generated=True) if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: return q = self.__class__.objects \ .exclude(pk=self.pk) \ .exclude(merge_exclusion=self) \ .exclude(merge_candidate=self) \ .exclude(archived=True) if not self.MERGE_CLEMENCY: q = q.filter(merge_key=self.merge_key) else: subkeys_front = "-".join( self.merge_key.split('-')[:self.MERGE_CLEMENCY]) subkeys_back = "-".join( self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) q = q.filter(Q(merge_key__istartswith=subkeys_front) | Q(merge_key__iendswith=subkeys_back)) for item in q.all(): self.merge_candidate.add(item) def save(self, *args, **kwargs): # prevent circular save merge_key_generated = False if 'merge_key_generated' in kwargs: merge_key_generated = kwargs.pop('merge_key_generated') self.generate_merge_key() item = super(Merge, self).save(*args, **kwargs) if not merge_key_generated: self.generate_merge_candidate() return item def archive(self): self.archived = True self.save() for m in self.merge_candidate.all(): m.delete() for m in self.merge_exclusion.all(): m.delete() def merge(self, item): merge_model_objects(self, item) self.generate_merge_candidate() class OrganizationType(GeneralType): class Meta: verbose_name = _("Organization type") verbose_name_plural = _("Organization types") ordering = ('label',) def get_orga_planning_service_label(): if apps.ready: lbl = get_general_type_label(OrganizationType, "planning_service") if lbl: return lbl return _("Error: planning_service type is missing") def get_orga_general_contractor_label(): if apps.ready: lbl = get_general_type_label(OrganizationType, "general_contractor") if lbl: return lbl return _("Error: general_contractor type is missing") post_save.connect(post_save_cache, sender=OrganizationType) post_delete.connect(post_save_cache, sender=OrganizationType) organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, str) organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, str) class Organization(Address, Merge, OwnPerms, ValueGetter, MainItem): TABLE_COLS = ('name', 'organization_type', 'town') SLUG = "organization" SHOW_URL = 'show-organization' DELETE_URL = 'delete-organization' # search parameters EXTRA_REQUEST_KEYS = {} BASE_SEARCH_VECTORS = [SearchVectorConfig('name'), SearchVectorConfig('town')] # alternative names of fields for searches ALT_NAMES = { 'name': SearchAltName( pgettext_lazy("key for text search", "name"), 'name__iexact' ), 'organization_type': SearchAltName( pgettext_lazy("key for text search", "type"), 'organization_type__label__iexact' ), } QA_EDIT = QuickAction( url="organization-qa-bulk-update", icon_class="fa fa-pencil", text=_(u"Bulk update"), target="many", rights=['change_organization']) QUICK_ACTIONS = [ QA_EDIT ] objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) name = models.CharField(_("Name"), max_length=500) organization_type = models.ForeignKey(OrganizationType, verbose_name=_("Type")) url = models.URLField(verbose_name=_("Web address"), blank=True, null=True) cached_label = models.TextField(_("Cached name"), null=True, blank=True, db_index=True) history = HistoricalRecords() DOWN_MODEL_UPDATE = ['members'] class Meta: verbose_name = _("Organization") verbose_name_plural = _("Organizations") permissions = ( ("view_organization", "Can view all Organizations"), ("view_own_organization", "Can view own Organization"), ("add_own_organization", "Can add own Organization"), ("change_own_organization", "Can change own Organization"), ("delete_own_organization", "Can delete own Organization"), ) indexes = [ GinIndex(fields=['data']), ] def simple_lbl(self): if self.name: return self.name return "{} - {}".format(self.organization_type, self.town or "") def natural_key(self): return (self.uuid,) def __str__(self): return self.cached_label or "" def _generate_cached_label(self): if self.name: return self.name attrs = ["organization_type", "address", "town"] items = [str(getattr(self, attr)) for attr in attrs if getattr(self, attr)] if not items: items = [str(_("unknown organization"))] return " - ".join(items) def generate_merge_key(self): self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY if self.town: self.merge_key += "-" + slugify(self.town or '') if self.address: self.merge_key += "-" + slugify(self.address or '') @property def associated_filename(self): values = [str(getattr(self, attr)) for attr in ('organization_type', 'name') if getattr(self, attr)] return slugify("-".join(values)) post_save.connect(cached_label_changed, sender=Organization) class PersonType(GeneralType): class Meta: verbose_name = _("Person type") verbose_name_plural = _("Person types") ordering = ('label',) post_save.connect(post_save_cache, sender=PersonType) post_delete.connect(post_save_cache, sender=PersonType) person_type_pk_lazy = lazy(PersonType.get_or_create_pk, str) person_type_pks_lazy = lazy(PersonType.get_or_create_pks, str) def get_sra_agent_label(): if apps.ready: lbl = get_general_type_label(PersonType, "sra_agent") if lbl: return lbl return _("Error: sra_agent type is missing") def get_general_contractor_label(): if apps.ready: lbl = get_general_type_label(PersonType, "general_contractor") if lbl: return lbl return _("Error: general_contractor type is missing") def get_responsible_planning_service_label(): if apps.ready: lbl = get_general_type_label(PersonType, "responsible_planning_service") if lbl: return lbl return _("Error: responsible_planning_service type is missing") class TitleType(GeneralType): class Meta: verbose_name = _("Title type") verbose_name_plural = _("Title types") ordering = ('label',) post_save.connect(post_save_cache, sender=TitleType) post_delete.connect(post_save_cache, sender=TitleType) class Person(Address, Merge, OwnPerms, ValueGetter, MainItem): SLUG = "person" _prefix = 'person_' TYPE = ( ('Mr', _(u'Mr')), ('Ms', _(u'Miss')), ('Mr and Miss', _(u'Mr and Mrs')), ('Md', _(u'Mrs')), ('Dr', _(u'Doctor')), ) TABLE_COLS = ('name', 'surname', 'raw_name', 'email', 'person_types_list', 'attached_to', 'town') TABLE_COLS_ACCOUNT = ('name', 'surname', 'raw_name', 'email', 'profiles_list', 'attached_to', 'town') SHOW_URL = 'show-person' MODIFY_URL = 'person_modify' DELETE_URL = 'person_delete' BASE_SEARCH_VECTORS = [ SearchVectorConfig('name'), SearchVectorConfig('surname'), SearchVectorConfig('raw_name'), SearchVectorConfig('town'), SearchVectorConfig('attached_to__name'), SearchVectorConfig('email')] # search parameters REVERSED_BOOL_FIELDS = ['ishtaruser__isnull'] EXTRA_REQUEST_KEYS = { 'ishtaruser__isnull': 'ishtaruser__isnull', 'attached_to': 'attached_to', } COL_LABELS = { 'attached_to': _("Organization") } # alternative names of fields for searches ALT_NAMES = { 'name': SearchAltName( pgettext_lazy("key for text search", "name"), 'name__iexact' ), 'surname': SearchAltName( pgettext_lazy("key for text search", "surname"), 'surname__iexact' ), 'email': SearchAltName( pgettext_lazy("key for text search", "email"), 'email__iexact' ), 'person_types': SearchAltName( pgettext_lazy("key for text search", "type"), 'person_types__label__iexact' ), 'attached_to': SearchAltName( pgettext_lazy("key for text search", "organization"), 'attached_to__cached_label__iexact' ), 'ishtaruser__isnull': SearchAltName( pgettext_lazy("key for text search", "has-account"), 'ishtaruser__isnull' ), } QA_EDIT = QuickAction( url="person-qa-bulk-update", icon_class="fa fa-pencil", text=_(u"Bulk update"), target="many", rights=['change_person']) QUICK_ACTIONS = [ QA_EDIT ] objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) old_title = models.CharField(_("Title"), max_length=100, choices=TYPE, blank=True, null=True) title = models.ForeignKey(TitleType, verbose_name=_("Title"), on_delete=models.SET_NULL, blank=True, null=True) salutation = models.CharField(_("Salutation"), max_length=200, blank=True, null=True) surname = models.CharField(_("Surname"), max_length=50, blank=True, null=True) name = models.CharField(_("Name"), max_length=200, blank=True, null=True) raw_name = models.CharField(_("Raw name"), max_length=300, blank=True, null=True) contact_type = models.CharField(_("Contact type"), max_length=300, blank=True, null=True) comment = models.TextField(_("Comment"), blank=True, null=True) person_types = models.ManyToManyField(PersonType, verbose_name=_("Types")) attached_to = models.ForeignKey( 'Organization', related_name='members', on_delete=models.SET_NULL, verbose_name=_("Is attached to"), blank=True, null=True) cached_label = models.TextField(_("Cached name"), null=True, blank=True, db_index=True) history = HistoricalRecords() DOWN_MODEL_UPDATE = ["author"] class Meta: verbose_name = _("Person") verbose_name_plural = _("Persons") indexes = [ GinIndex(fields=['data']), ] permissions = ( ("view_person", "Can view all Persons"), ("view_own_person", "Can view own Person"), ("add_own_person", "Can add own Person"), ("change_own_person", "Can change own Person"), ("delete_own_person", "Can delete own Person"), ) def natural_key(self): return (self.uuid,) @property def full_title(self): return " ".join( [str(getattr(self, attr)) for attr in ['title', 'salutation'] if getattr(self, attr)]) @property def current_profile(self): q = self.profiles.filter(current=True) if q.count(): return q.all()[0] q = self.profiles nb = q.count() if not nb: return # arbitrary set the first one as the current profile = q.all()[0] profile.current = True profile.save() return profile def get_short_html_items(self): items = super(Person, self).get_short_html_items() if items or not self.attached_to: return items orga_address = self.attached_to.get_short_html_items() if not orga_address: return [] items.append( """{}""".format( self.attached_to.name)) items += orga_address return items def simple_lbl(self): values = [str(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] return " ".join(values) def __str__(self): return self.cached_label or "" def _generate_cached_label(self): lbl = get_external_id('person_raw_name', self) if not lbl: return "-" if self.attached_to: attached_to = str(self.attached_to) lbl += " ({})".format(attached_to) return lbl def fancy_str(self): values = [""] values += [str(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values += [self.raw_name] values += [""] if self.attached_to: attached_to = str(self.attached_to) if values: values.append(u'-') values.append(attached_to) return " ".join(values) def get_values(self, prefix='', no_values=False, filtr=None, **kwargs): values = super(Person, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs) if not self.attached_to: values.update( Person.get_empty_values(prefix=prefix + 'attached_to_')) return values person_types_list_lbl = _("Types") @property def person_types_list(self): return ", ".join([str(pt) for pt in self.person_types.all()]) profiles_list_lbl = _("Profiles") @property def profiles_list(self): return ", ".join([str(p) for p in self.profiles.all()]) def generate_merge_key(self): if self.name and self.name.strip(): self.merge_key = slugify(self.name.strip()) + \ ((u'-' + slugify(self.surname.strip())) if self.surname else u'') elif self.raw_name and self.raw_name.strip(): self.merge_key = slugify(self.raw_name.strip()) elif self.attached_to: self.merge_key = self.attached_to.merge_key else: self.merge_key = self.EMPTY_MERGE_KEY if self.merge_key != self.EMPTY_MERGE_KEY and self.attached_to: self.merge_key += "-" + self.attached_to.merge_key def is_natural(self): return not self.attached_to def has_right(self, right_name, session=None, obj=None): if '.' in right_name: right_name = right_name.split('.')[-1] res, cache_key = "", "" if session: cache_key = 'session-{}-{}'.format(session.session_key, right_name) res = cache.get(cache_key) if res in (True, False): return res # list all cache key in order to clean them on profile change cache_key_list = 'sessionlist-{}'.format(session.session_key) key_list = cache.get(cache_key_list, []) key_list.append(cache_key) cache.set(cache_key_list, key_list, settings.CACHE_TIMEOUT) if type(right_name) in (list, tuple): res = bool(self.profiles.filter( current=True, profile_type__txt_idx__in=right_name).count()) or \ bool(self.profiles.filter( current=True, profile_type__groups__permissions__codename__in=right_name ).count()) or \ bool(self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=right_name ).count()) or \ bool(self.ishtaruser.user_ptr.user_permissions.filter( codename__in=right_name).count()) else: res = bool( self.profiles.filter( current=True, profile_type__txt_idx=right_name).count()) or \ bool(self.profiles.filter( current=True, profile_type__groups__permissions__codename=right_name ).count()) or \ bool(self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=[right_name] ).count()) or \ bool(self.ishtaruser.user_ptr.user_permissions.filter( codename__in=[right_name]).count()) if session: cache.set(cache_key, res, settings.CACHE_TIMEOUT) return res def full_label(self): values = [] if self.title: values = [self.title.label] values += [str(getattr(self, attr)) for attr in ('salutation', 'surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] if self.attached_to: values.append("- " + str(self.attached_to)) return " ".join(values) @property def associated_filename(self): values = [str(getattr(self, attr)) for attr in ('surname', 'name', 'attached_to') if getattr(self, attr)] return slugify("-".join(values)) def docs_q(self): return Document.objects.filter(authors__person=self) def operation_docs_q(self): return Document.objects.filter( authors__person=self, operations__pk__isnull=False) def contextrecord_docs_q(self): return Document.objects.filter( authors__person=self, context_records__pk__isnull=False) def find_docs_q(self): return Document.objects.filter( authors__person=self, finds__pk__isnull=False) def save(self, *args, **kwargs): super(Person, self).save(*args, **kwargs) raw_name = get_external_id('person_raw_name', self) if raw_name and self.raw_name != raw_name: self.raw_name = raw_name self.save() if hasattr(self, 'responsible_town_planning_service'): for fle in self.responsible_town_planning_service.all(): fle.save() # force update of raw_town_planning_service if hasattr(self, 'general_contractor'): for fle in self.general_contractor.all(): fle.save() # force update of raw_general_contractor @classmethod def get_query_owns(cls, ishtaruser): return \ Q(operation_scientist_responsability__collaborators__ishtaruser =ishtaruser) | \ Q(operation_scientist_responsability__scientist__ishtaruser =ishtaruser) | \ Q(operation_collaborator__collaborators__ishtaruser =ishtaruser) | \ Q(operation_collaborator__scientist__ishtaruser=ishtaruser) post_save.connect(cached_label_changed, sender=Person) class ProfileType(GeneralType): groups = models.ManyToManyField(Group, verbose_name=_("Groups"), blank=True) class Meta: verbose_name = _("Profile type") verbose_name_plural = _("Profile types") ordering = ('label',) post_save.connect(post_save_cache, sender=ProfileType) post_delete.connect(post_save_cache, sender=ProfileType) class ProfileTypeSummary(ProfileType): class Meta: proxy = True verbose_name = _("Profile type summary") verbose_name_plural = _("Profile types summary") class UserProfile(models.Model): name = models.CharField(_("Name"), blank=True, default="", max_length=100) profile_type = models.ForeignKey( ProfileType, verbose_name=_("Profile type")) areas = models.ManyToManyField("Area", verbose_name=_("Areas"), blank=True, related_name='profiles') current = models.BooleanField(_("Current profile"), default=False) show_field_number = models.BooleanField( _("Show field number"), default=False) auto_pin = models.BooleanField(_("Automatically pin"), default=False) display_pin_menu = models.BooleanField(_("Display pin menu"), default=False) person = models.ForeignKey( Person, verbose_name=_("Person"), related_name='profiles') class Meta: verbose_name = _("User profile") verbose_name_plural = _("User profiles") unique_together = (('name', 'profile_type', 'person'),) def __str__(self): lbl = self.name or str(self.profile_type) if not self.areas.count(): return lbl return "{} ({})".format(lbl, ", ".join( [str(area) for area in self.areas.all()])) @property def query_towns(self): return Town.objects.filter( Q(areas__profiles=self) | Q(areas__parent__profiles=self) | Q(areas__parent__parent__profiles=self) | Q(areas__parent__parent__parent__profiles=self) | Q(areas__parent__parent__parent__parent__profiles=self) ) @property def area_labels(self): return ", ".join([str(area) for area in self.areas.all()]) def duplicate(self, **kwargs): areas = [area for area in self.areas.all()] new_item = self new_item.pk = None name = self.name for key in kwargs: if key == 'name': name = kwargs[key] setattr(new_item, key, kwargs[key]) while UserProfile.objects.filter( name=name, profile_type=self.profile_type, person=self.person).count(): name += str(_(" - duplicate")) new_item.name = name new_item.save() for area in areas: new_item.areas.add(area) return new_item def save(self, force_insert=False, force_update=False, using=None, update_fields=None): super(UserProfile, self).save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields) # only one current profile per user if not self.current: return q = UserProfile.objects.filter( person=self.person, current=True).exclude(pk=self.pk) if not q.count(): return for p in q.all(): p.current = False p.save() def post_save_userprofile(sender, **kwargs): if not kwargs.get('instance'): return instance = kwargs.get('instance') try: instance.person.ishtaruser.show_field_number(update=True) except IshtarUser.DoesNotExist: return post_save.connect(post_save_userprofile, sender=UserProfile) class IshtarUser(FullSearch): SLUG = "ishtaruser" TABLE_COLS = ('username', 'person__name', 'person__surname', 'person__email', 'person__person_types_list', 'person__attached_to__name') BASE_SEARCH_VECTORS = [ SearchVectorConfig('user_ptr__username'), SearchVectorConfig('person__name'), SearchVectorConfig('person__surname'), SearchVectorConfig('person__email'), SearchVectorConfig('person__town'), SearchVectorConfig('person__attached_to__name')] CACHED_LABELS = [] # needed to force search vector update # search parameters EXTRA_REQUEST_KEYS = { 'person__person_types_list': 'person__person_types__label' } COL_LABELS = { 'person__attached_to__name': _("Organization"), 'username': _("Username") } # alternative names of fields for searches ALT_NAMES = { 'username': SearchAltName( pgettext_lazy("key for text search", "username"), 'user_ptr__username__iexact' ), 'name': SearchAltName( pgettext_lazy("key for text search", "name"), 'person__name__iexact' ), 'surname': SearchAltName( pgettext_lazy("key for text search", "surname"), 'person__surname__iexact' ), 'email': SearchAltName( pgettext_lazy("key for text search", "email"), 'person__email__iexact' ), 'person_types': SearchAltName( pgettext_lazy("key for text search", "type"), 'person__person_types__label__iexact' ), 'attached_to': SearchAltName( pgettext_lazy("key for text search", "organization"), 'person__attached_to__cached_label__iexact' ), } # fields user_ptr = models.OneToOneField(User, primary_key=True, related_name='ishtaruser') person = models.OneToOneField(Person, verbose_name=_("Person"), related_name='ishtaruser') advanced_shortcut_menu = models.BooleanField( _("Advanced shortcut menu"), default=False) class Meta: verbose_name = _("Ishtar user") verbose_name_plural = _("Ishtar users") def __str__(self): return str(self.person) def show_field_number(self, update=False): cache_key, value = get_cache(self.__class__, ['show_field_number']) if not update and value is not None: return value value = False if self.current_profile: value = self.current_profile.show_field_number cache.set(cache_key, value, settings.CACHE_TIMEOUT) return value @property def current_profile_name(self): q = UserProfile.objects.filter(current=True, person__ishtaruser=self) if q.count(): vals = q.values('profile_type__label', 'name').all()[0] return vals['name'] or vals['profile_type__label'] profile = self.person.current_profile if not profile: return "" return str(profile) @property def current_profile(self): return self.person.current_profile @classmethod def set_superuser(cls, user): q = cls.objects.filter(user_ptr=user) if not q.count(): return ishtaruser = q.all()[0] person = ishtaruser.person admin, created = ProfileType.objects.get_or_create( txt_idx='administrator') if user.is_superuser: if UserProfile.objects.filter( profile_type=admin, person=person).count(): return UserProfile.objects.get_or_create( profile_type=admin, person=person, defaults={'current': True}) @classmethod def create_from_user(cls, user): default = user.username surname = user.first_name or default name = user.last_name or default email = user.email person = Person.objects.create(surname=surname, name=name, email=email, history_modifier=user) isht_user = cls.objects.create(user_ptr=user, person=person) return isht_user def has_right(self, right_name, session=None): return self.person.has_right(right_name, session=session) def has_perm(self, perm, model=None, session=None, obj=None): return self.person.has_right(perm, session=session, obj=None) def full_label(self): return self.person.full_label() post_save.connect(cached_label_changed, sender=IshtarUser) class Basket(FullSearch, OwnPerms, ValueGetter, TemplateItem): """ Abstract class for a basket Subclass must be defined with an "items" ManyToManyField """ IS_BASKET = True uuid = models.UUIDField(default=uuid.uuid4) label = models.CharField(_("Label"), max_length=1000) comment = models.TextField(_("Comment"), blank=True, null=True) slug = models.SlugField(_("Slug"), blank=True, null=True) public = models.BooleanField(_("Public"), default=False) user = models.ForeignKey( IshtarUser, blank=True, null=True, related_name='%(class)ss', on_delete=models.SET_NULL, verbose_name=_("Owner")) available = models.BooleanField(_("Available"), default=True) shared_with = models.ManyToManyField( IshtarUser, verbose_name=_("Shared (read) with"), blank=True, related_name='shared_%(class)ss' ) shared_write_with = models.ManyToManyField( IshtarUser, verbose_name=_("Shared (read/edit) with"), blank=True, related_name='shared_write_%(class)ss' ) objects = UUIDModelManager() TABLE_COLS = ['label', 'user'] BASE_SEARCH_VECTORS = [ SearchVectorConfig('label'), SearchVectorConfig('comment', 'local'), ] PARENT_SEARCH_VECTORS = ['user'] # M2M_SEARCH_VECTORS = [SearchVectorConfig('items')] CACHED_LABELS = [] # needed to force search vector update class Meta: abstract = True ordering = ('label', ) unique_together = (('label', 'user'),) def __str__(self): return self.label def natural_key(self): return (self.uuid, ) @classmethod def BASE_REQUEST(cls, request): if not request.user or not getattr(request.user, 'ishtaruser', None): return Q(pk=None) ishtaruser = request.user.ishtaruser return Q(user=ishtaruser) | Q(shared_with=ishtaruser) | Q( shared_write_with=ishtaruser) @property def cached_label(self): return str(self) @property def full_label(self): return "{} - {} ({})".format(self.label, self.user, self.items.count()) @classmethod def get_short_menu_class(cls, pk): return 'basket' @property def associated_filename(self): return "{}-{}".format(datetime.date.today().strftime( "%Y-%m-%d"), slugify(self.label)) @classmethod def get_query_owns(cls, ishtaruser): return Q(user=ishtaruser) | Q(shared_with=ishtaruser) | Q( shared_write_with=ishtaruser) @classmethod def get_write_query_owns(cls, ishtaruser): return Q(user=ishtaruser) | Q( shared_write_with=ishtaruser) def get_values(self, prefix='', no_values=False, filtr=None, **kwargs): values = super(Basket, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs) if not filtr or "items" in filtr: values["items"] = [item.get_values() for item in self.items.all()] return values def duplicate(self, label=None, ishtaruser=None): """ Duplicate the basket. Items in basket are copied but not shared users :param label: if provided use the name :param ishtaruser: if provided an alternate user is used :return: the new basket """ items = [] through = self.items.through basket_pk = "{}_id".format(self.SLUG) item_pk = "{}_id".format(self.items.model.SLUG) q = through.objects.filter(**{basket_pk: self.pk}) for r in q.values("pk", item_pk).order_by("pk").all(): items.append(r[item_pk]) new_item = self new_item.pk = None if ishtaruser: new_item.user = ishtaruser if not label: label = new_item.label while self.__class__.objects.filter( label=label, user=new_item.user).count(): label += str(_(" - duplicate")) new_item.label = label new_item.save() for item in items: through.objects.create(**{basket_pk: new_item.pk, item_pk: item}) return new_item class AuthorType(GeneralType): order = models.IntegerField(_("Order"), default=1) class Meta: verbose_name = _("Author type") verbose_name_plural = _("Author types") ordering = ['order', 'label'] post_save.connect(post_save_cache, sender=AuthorType) post_delete.connect(post_save_cache, sender=AuthorType) class Author(FullSearch): SLUG = "author" PARENT_SEARCH_VECTORS = ['person'] uuid = models.UUIDField(default=uuid.uuid4) person = models.ForeignKey(Person, verbose_name=_("Person"), related_name='author') author_type = models.ForeignKey(AuthorType, verbose_name=_("Author type")) cached_label = models.TextField(_("Cached name"), null=True, blank=True, db_index=True) objects = UUIDModelManager() class Meta: verbose_name = _("Author") verbose_name_plural = _("Authors") ordering = ('author_type__order', 'person__name') permissions = ( ("view_author", "Can view all Authors"), ("view_own_author", "Can view own Author"), ("add_own_author", "Can add own Author"), ("change_own_author", "Can change own Author"), ("delete_own_author", "Can delete own Author"), ) def __str__(self): return self.cached_label or "" def natural_key(self): return self.uuid, def _generate_cached_label(self): return str(self.person) + settings.JOINT + \ str(self.author_type) def fancy_str(self): return self.person.fancy_str() + settings.JOINT + \ str(self.author_type) def related_sources(self): return list(self.treatmentsource_related.all()) + \ list(self.operationsource_related.all()) + \ list(self.findsource_related.all()) + \ list(self.contextrecordsource_related.all()) def public_representation(self): return { "type": str(self.author_type), "person": str(self.person) } def merge(self, item, keep_old=False): merge_model_objects(self, item, keep_old=keep_old) def author_post_save(sender, **kwargs): if not kwargs.get('instance'): return cached_label_changed(sender, **kwargs) instance = kwargs.get('instance') q = Author.objects.filter(person=instance.person, author_type=instance.author_type) if q.count() <= 1: return authors = list(q.all()) for author in authors[1:]: authors[0].merge(author) post_save.connect(author_post_save, sender=Author) class SourceType(HierarchicalType): class Meta: verbose_name = _("Source type") verbose_name_plural = _("Source types") ordering = ['label'] post_save.connect(post_save_cache, sender=SourceType) post_delete.connect(post_save_cache, sender=SourceType) class SupportType(GeneralType): class Meta: verbose_name = _("Support type") verbose_name_plural = _("Support types") post_save.connect(post_save_cache, sender=SupportType) post_delete.connect(post_save_cache, sender=SupportType) class Format(GeneralType): class Meta: verbose_name = _("Format type") verbose_name_plural = _("Format types") ordering = ['label'] post_save.connect(post_save_cache, sender=Format) post_delete.connect(post_save_cache, sender=Format) class LicenseType(GeneralType): url = models.URLField(_("URL"), blank=True, null=True) class Meta: verbose_name = _("License type") verbose_name_plural = _("License types") ordering = ('label',) post_save.connect(post_save_cache, sender=LicenseType) post_delete.connect(post_save_cache, sender=LicenseType) class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, ValueGetter, MainItem): APP = "ishtar-common" MODEL = "document" EXTERNAL_ID_KEY = 'document_external_id' DELETE_URL = 'delete-document' # order is important: put the image in the first match found # other will be symbolic links RELATED_MODELS = [ 'treatment_files', 'treatments', 'finds', 'context_records', 'operations', 'sites', 'warehouses', 'containers', 'files' ] # same fields but in order for forms RELATED_MODELS_ALT = [ 'finds', 'context_records', 'operations', 'sites', 'files', 'warehouses', 'containers', 'treatments', 'treatment_files', ] SLUG = 'document' LINK_SPLIT = "<||>" GET_VALUES_EXCLUDE_FIELDS = ValueGetter.GET_VALUES_EXCLUDE_FIELDS + [ "warehouses", "operations", "treatments", "files", "treatment_files", "id", "associated_links", "source_type_id", "history_creator_id", "containers", "sites", "main_image_warehouses", "main_image_operations", "main_image_treatments", "main_image_files", "main_image_treatment_files", "main_image_id", "main_image_associated_links", "main_image_source_type_id", "main_image_history_creator_id", "main_image_containers", "main_image_sites", ] TABLE_COLS = ['title', 'source_type', 'cache_related_label', 'authors__cached_label', 'associated_url'] COL_LINK = ['associated_url'] BASE_SEARCH_VECTORS = [ SearchVectorConfig("title"), SearchVectorConfig("source_type__label"), SearchVectorConfig("external_id"), SearchVectorConfig("reference"), SearchVectorConfig("description", "local"), SearchVectorConfig("comment", "local"), SearchVectorConfig("additional_information", "local"), ] PARENT_SEARCH_VECTORS = ['authors', ] BOOL_FIELDS = ['duplicate'] COL_LABELS = {"authors__cached_label": _("Authors")} CACHED_LABELS = ['cache_related_label'] EXTRA_REQUEST_KEYS = { "operations": "operations__pk", "context_records": "context_records__pk", "context_records__operation": "context_records__operation__pk", "finds": "finds__pk", "finds__base_finds__context_record": "finds__base_finds__context_record__pk", "finds__base_finds__context_record__operation": "finds__base_finds__context_record__operation__pk", 'authors__cached_label': 'authors__cached_label', 'authors__person__pk': 'authors__person__pk', } # alternative names of fields for searches ALT_NAMES = { 'authors': SearchAltName( pgettext_lazy("key for text search", "author"), 'authors__cached_label__iexact' ), 'title': SearchAltName( pgettext_lazy("key for text search", "title"), 'title__iexact' ), 'source_type': SearchAltName( pgettext_lazy("key for text search", "type"), 'source_type__label__iexact' ), 'reference': SearchAltName( pgettext_lazy("key for text search", "reference"), 'reference__iexact' ), 'internal_reference': SearchAltName( pgettext_lazy("key for text search", "internal-reference"), 'internal_reference__iexact' ), 'description': SearchAltName( pgettext_lazy("key for text search", "description"), 'description__iexact' ), 'comment': SearchAltName( pgettext_lazy("key for text search", "comment"), 'comment__iexact' ), 'additional_information': SearchAltName( pgettext_lazy("key for text search", "additional-information"), 'additional_information__iexact' ), 'duplicate': SearchAltName( pgettext_lazy("key for text search", "has-duplicate"), 'duplicate' ), 'operation': SearchAltName( pgettext_lazy("key for text search", "operation"), 'operations__cached_label__iexact' ), 'context_record': SearchAltName( pgettext_lazy("key for text search", "context-record"), 'context_records__cached_label__iexact' ), 'find_basket': SearchAltName( pgettext_lazy("key for text search", "basket-finds"), 'finds__basket__label__iexact' ), 'find': SearchAltName( pgettext_lazy("key for text search", "find"), 'finds__cached_label__iexact' ), 'find__denomination': SearchAltName( pgettext_lazy("key for text search", "find-denomination"), 'finds__denomination__iexact' ), 'file': SearchAltName( pgettext_lazy("key for text search", "file"), 'files__cached_label__iexact' ), 'container': SearchAltName( pgettext_lazy("key for text search", "container"), 'containers__cached_label__iexact' ), 'site': SearchAltName( pgettext_lazy("key for text search", "site"), 'sites__cached_label__iexact' ), 'warehouse': SearchAltName( pgettext_lazy("key for text search", "warehouse"), 'warehouses__name__iexact' ), 'image__isnull': SearchAltName( pgettext_lazy("key for text search", "has-image"), 'image__isnull'), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) # search parameters REVERSED_BOOL_FIELDS = ['image__isnull'] objects = ExternalIdManager() RELATIVE_SESSION_NAMES = [ ('find', 'finds__pk'), ('contextrecord', 'context_records__pk'), ('operation', 'operations__pk'), ('site', 'sites__pk'), ('file', 'files__pk'), ('warehouse', 'warehouses__pk'), ('treatment', 'treatments__pk'), ('treatmentfile', 'treatment_files__pk'), ] UP_MODEL_QUERY = { "operation": (pgettext_lazy("key for text search", "operation"), 'cached_label'), "contextrecord": (pgettext_lazy("key for text search", "context-record"), 'cached_label'), "file": (pgettext_lazy("key for text search", "file"), 'cached_label'), "find": (pgettext_lazy("key for text search", "find"), 'cached_label'), "site": (pgettext_lazy("key for text search", "site"), 'cached_label'), "warehouse": (pgettext_lazy("key for text search", "warehouse"), 'cached_label'), "treatment": (pgettext_lazy("key for text search", "treatment"), 'cached_label'), "treatmentfile": (pgettext_lazy("key for text search", "treatment-file"), 'cached_label'), } QA_EDIT = QuickAction( url="document-qa-bulk-update", icon_class="fa fa-pencil", text=_(u"Bulk update"), target="many", rights=['change_document', 'change_own_document']) QUICK_ACTIONS = [ QA_EDIT, QuickAction( url="document-qa-duplicate", icon_class="fa fa-clone", text=_("Duplicate"), target="one", rights=['change_document', 'change_own_document']), ] SERIALIZATION_FILES = ["image", "thumbnail", "associated_file"] title = models.TextField(_("Title"), blank=True, default='') associated_file = models.FileField( upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) index = models.IntegerField(verbose_name=_("Index"), blank=True, null=True) external_id = models.TextField(_("External ID"), null=True, blank=True) reference = models.TextField(_("Ref."), null=True, blank=True) internal_reference = models.TextField(_("Internal ref."), null=True, blank=True) source_type = models.ForeignKey(SourceType, verbose_name=_("Type"), on_delete=models.SET_NULL, null=True, blank=True) licenses = models.ManyToManyField(LicenseType, verbose_name=_("License"), blank=True) support_type = models.ForeignKey(SupportType, verbose_name=_("Support"), on_delete=models.SET_NULL, blank=True, null=True, ) format_type = models.ForeignKey(Format, verbose_name=_("Format"), on_delete=models.SET_NULL, blank=True, null=True) scale = models.CharField(_("Scale"), max_length=30, null=True, blank=True) authors = models.ManyToManyField(Author, verbose_name=_("Authors"), related_name="documents") authors_raw = models.CharField(verbose_name=_("Authors (raw)"), blank=True, null=True, max_length=250) associated_url = models.URLField( blank=True, null=True, max_length=1000, verbose_name=_("Numerical ressource (web address)")) receipt_date = models.DateField(blank=True, null=True, verbose_name=_("Receipt date")) creation_date = models.DateField(blank=True, null=True, verbose_name=_("Creation date")) receipt_date_in_documentation = models.DateField( blank=True, null=True, verbose_name=_("Receipt date in documentation")) item_number = models.IntegerField(_("Number of items"), default=1) description = models.TextField(_("Description"), blank=True, null=True) comment = models.TextField(_("Comment"), blank=True, null=True) additional_information = models.TextField(_("Additional information"), blank=True, null=True) duplicate = models.NullBooleanField(_("Has a duplicate"), blank=True, null=True) associated_links = models.TextField(_("Symbolic links"), blank=True, null=True) cache_related_label = models.TextField( _("Related"), blank=True, null=True, db_index=True, help_text=_("Cached value - do not edit")) class Meta: verbose_name = _("Document") verbose_name_plural = _("Documents") ordering = ('title',) permissions = ( ("view_document", ugettext("Can view all Documents")), ("view_own_document", ugettext("Can view own Document")), ("add_own_document", ugettext("Can add own Document")), ("change_own_document", ugettext("Can change own Document")), ("delete_own_document", ugettext("Can delete own Document")), ) indexes = [ GinIndex(fields=['data']), ] def __str__(self): return self.title def natural_key(self): return (self.external_id,) """ @property def code(self): if not self.index: return "{}-".format(self.operation.code_patriarche or '') return "{}-{:04d}".format(self.operation.code_patriarche or '', self.index) """ def duplicate_item(self, user=None, data=None): return duplicate_item(self, user, data) def public_representation(self): site = Site.objects.get_current() if settings.ISHTAR_SECURE: scheme = "https" else: scheme = "http" base_url = scheme + "://" + site.domain + "/" image = None if self.image: image = self.image.url if not image.startswith("http"): if not image.startswith("/"): image = "/" + image image = base_url + image thumbnail = None if self.thumbnail: thumbnail = self.thumbnail.url if not thumbnail.startswith("http"): if not thumbnail.startswith("/"): thumbnail = "/" + thumbnail thumbnail = base_url + thumbnail return { "title": self.title, "reference": self.reference, "type": self.source_type and str(self.source_type), "authors": [a.public_representation() for a in self.authors.all()], "image": image, "thumbnail": thumbnail, } def get_extra_actions(self, request): """ For sheet template """ # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(Document, self).get_extra_actions(request) # is_locked = self.is_locked(request.user) can_edit_document = self.can_do(request, 'change_document') if can_edit_document: actions += [ (reverse("document-qa-duplicate", args=[self.pk]), _("Duplicate"), "fa fa-clone", "", "", True), ] return actions @property def thumbnail_path(self): if not self.thumbnail: return "" return self.thumbnail.path @property def image_path(self): if not self.image: return "" return self.image.path def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super(Document, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs) if not filtr or prefix + "image_path" in filtr: values[prefix + "image_path"] = self.image_path if not filtr or prefix + "thumbnail_path" in filtr: values[prefix + "thumbnail_path"] = self.thumbnail_path return values @property def images_without_main_image(self): return [] @property def associated_file_name(self): if not self.associated_file: return "" return os.path.basename(self.associated_file.name) @property def images(self): # mimic a queryset pointing to himself return Document.objects.filter( pk=self.pk, image__isnull=False).exclude(image='') @property def main_image(self): if self.images.count(): return self.images.all()[0] @property def has_related(self): for rel in self.RELATED_MODELS: if getattr(self, rel).count(): return True return False @classmethod def get_query_owns(cls, ishtaruser): query_own_list = [] for rel_model in cls.RELATED_MODELS: klass = getattr(cls, rel_model).rel.related_model q_own_dct = klass._get_query_owns_dicts(ishtaruser) if q_own_dct: query_own_list.append( (rel_model + "__", q_own_dct) ) q = None for prefix, owns in query_own_list: subq = cls._construct_query_own(prefix, owns) if subq: if not q: q = subq else: q |= subq q |= cls._construct_query_own('', [ {'history_creator': ishtaruser.user_ptr} ]) return q def get_associated_operation(self): raise NotImplementedError() @property def associated_filename(self): values = [str(getattr(self, attr)) for attr in ('source_type', 'title') if getattr(self, attr)] return slugify("-".join(values)) def _get_base_image_paths(self): if self.pk: # m2m not available if not created... for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] yield item._get_base_image_path() def _get_base_image_path(self): for path in self._get_base_image_paths(): return path return "upload" def _get_available_filename(self, path, test_link=None): """ Get a filename not used If name already used - generate a name with schema: [base]-[current_number + 1].[suffix] :param path: base path :param test_link: test if an existing path match with this link :return: if test_link is not None, (new_path, link_match) otherwise the new_path """ file_split = path.split('.') suffix, base = "", "" if len(file_split) > 1: base = ".".join(file_split[0:-1]) suffix = file_split[-1] else: base = path base_split = base.split('-') current_nb = 0 if len(base_split) > 1: try: current_nb = int(base_split[-1]) base = "-".join(base_split[0:-1]) + "-" except ValueError: pass while os.path.exists(path): if test_link and os.path.islink(path) \ and os.readlink(path) == test_link: return path, True current_nb += 1 path = "{}-{}.{}".format(base, current_nb, suffix) if test_link: return path, False return path def _move_image(self): """ Move to the relevant path and create appropriate symbolic links :return: list of associated links """ if getattr(self, "_no_move", False): return reference_path = None initial_path = self.image.path filename = os.path.basename(initial_path) links = [] for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] base_path = item._get_base_image_path() new_path = base_path + "/" + filename if not reference_path: reference_path = settings.MEDIA_ROOT + new_path # correct path if initial_path == reference_path: continue if not os.path.exists(os.path.dirname(reference_path)): os.makedirs(os.path.dirname(reference_path)) reference_path = self._get_available_filename( reference_path) try: os.rename(initial_path, reference_path) os.rename(self.thumbnail.path, self._get_thumb_name(reference_path)) self.image.name = reference_path[ len(settings.MEDIA_ROOT):] self._no_move = True self.save(no_path_change=True) except OSError: # file probably not on HDD - will be cleaned pass continue # create a link new_path = settings.MEDIA_ROOT + new_path if not os.path.exists(os.path.dirname(new_path)): os.makedirs(os.path.dirname(new_path)) new_path, match = self._get_available_filename( new_path, test_link=reference_path) links.append(new_path) if match: # the current link is correct continue os.symlink(reference_path, new_path) return links def related_label(self): items = [] for rel_attr in reversed(self.RELATED_MODELS): for item in getattr(self, rel_attr).all(): items.append(str(item)) return " ; ".join(items) def _generate_cache_related_label(self): return self.related_label() @classmethod def get_next_index(cls): q = cls.objects.values('index').filter( index__isnull=False).order_by("-index") if not q.count(): return 1 cid = q.all()[0]['index'] if not cid: cid = 0 return cid + 1 def set_index(self): if self.index: return self.index = self.get_next_index() @classmethod @pre_importer_action def import_get_next_index(cls, context, value): context["index"] = cls.get_next_index() def save(self, *args, **kwargs): no_path_change = 'no_path_change' in kwargs \ and kwargs.pop('no_path_change') self.set_index() if not self.associated_url: self.associated_url = None super(Document, self).save(*args, **kwargs) if self.image and not no_path_change and \ not getattr(self, '_no_path_change', False): links = self._move_image() if not links: return links = self.LINK_SPLIT.join(links) if links != self.associated_links: self.associated_links = links self.save(no_path_change=True) def document_attached_changed(sender, **kwargs): # associate a default main image instance = kwargs.get("instance", None) model = kwargs.get("model", None) pk_set = kwargs.get("pk_set", None) if not instance or not model: return if hasattr(instance, "documents"): items = [instance] else: if not pk_set: return try: items = [model.objects.get(pk=pk) for pk in pk_set] except model.DoesNotExist: return for item in items: q = item.documents.filter( image__isnull=False).exclude(image='') if item.main_image: if q.filter(pk=item.main_image.pk).count(): return # the association has disappear not the main image anymore item.main_image = None item.skip_history_when_saving = True item.save() if not q.count(): return # by default get the lowest pk item.main_image = q.order_by('pk').all()[0] item.skip_history_when_saving = True item.save() post_save.connect(cached_label_changed, sender=Document) class OperationType(GeneralType): order = models.IntegerField(_("Order"), default=1) preventive = models.BooleanField(_("Is preventive"), default=True) judiciary = models.BooleanField(_("Is judiciary"), default=False) class Meta: verbose_name = _("Operation type") verbose_name_plural = _("Operation types") ordering = ['judiciary', '-preventive', 'order', 'label'] @classmethod def get_types(cls, dct=None, instances=False, exclude=None, empty_first=True, default=None, initial=None): dct = dct or {} exclude = exclude or [] initial = initial or [] tuples = [] dct['available'] = True if not instances and empty_first and not default: tuples.append(('', '--')) if default and not instances: try: default = cls.objects.get(txt_idx=default) tuples.append((default.pk, _(str(default)))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and not instances: exclude.append(default.txt_idx) if exclude: items = items.exclude(txt_idx__in=exclude) current_preventive, current_judiciary, current_lst = None, None, None item_list = list(items.order_by(*cls._meta.ordering).all()) new_vals = cls._get_initial_types(initial, [i.pk for i in item_list], instance=True) item_list += new_vals for item in item_list: item.rank = 0 if instances: return item_list for item in item_list: if not current_lst or item.preventive != current_preventive \ or item.judiciary != current_judiciary: if current_lst: tuples.append(current_lst) if item.judiciary: gp_lbl = _("Judiciary") elif item.preventive: gp_lbl = _("Preventive") else: gp_lbl = _("Research") current_lst = [gp_lbl, []] current_preventive = item.preventive current_judiciary = item.judiciary current_lst[1].append((item.pk, _(str(item)))) if current_lst: tuples.append(current_lst) return tuples @classmethod def is_preventive(cls, ope_type_id, key=''): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False if not key: return op_type.preventive return key == op_type.txt_idx @classmethod def is_judiciary(cls, ope_type_id): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False return op_type.judiciary post_save.connect(post_save_cache, sender=OperationType) post_delete.connect(post_save_cache, sender=OperationType) class AdministrationScript(models.Model): path = models.CharField(_("Filename"), max_length=30) name = models.TextField(_("Name"), null=True, blank=True) class Meta: verbose_name = _("Administration script") verbose_name_plural = _("Administration scripts") ordering = ['name'] def __str__(self): return str(self.name) SCRIPT_STATE = (("S", _("Scheduled")), ("P", _("In progress")), ("FE", _("Finished with errors")), ("F", _("Finished")), ) SCRIPT_STATE_DCT = dict(SCRIPT_STATE) class AdministrationTask(models.Model): script = models.ForeignKey(AdministrationScript) state = models.CharField(_("State"), max_length=2, choices=SCRIPT_STATE, default='S') creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) result = models.TextField(_("Result"), null=True, blank=True) class Meta: verbose_name = _("Administration task") verbose_name_plural = _("Administration tasks") ordering = ['script'] def __str__(self): state = _("Unknown") if self.state in SCRIPT_STATE_DCT: state = str(SCRIPT_STATE_DCT[self.state]) return "{} - {} - {}".format(self.script, self.creation_date, state) def execute(self): if self.state != 'S': return self.launch_date = datetime.datetime.now() script_dir = settings.ISHTAR_SCRIPT_DIR if not script_dir: self.result = str( _("ISHTAR_SCRIPT_DIR is not set in your " "local_settings. Contact your administrator.")) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return if '..' in script_dir: self.result = str( _("Your ISHTAR_SCRIPT_DIR is containing " "dots \"..\". As it can refer to relative " "paths, it can be a security issue and this is " "not allowed. Only put a full path.")) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return if not os.path.isdir(script_dir): self.result = str( _("Your ISHTAR_SCRIPT_DIR: \"{}\" is not a valid directory.") ).format(script_dir) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return script_name = None # only script inside the script directory can be executed for name in os.listdir(script_dir): if name == self.script.path: if os.path.isfile(os.path.join(script_dir, name)): script_name = os.path.join(script_dir, name) break if not script_name: self.result = str( _("Script \"{}\" is not available in your script directory. " "Check your configuration.") ).format(self.script.path) self.state = 'FE' self.finished_date = datetime.datetime.now() self.save() return self.state = 'P' self.save() self.finished_date = datetime.datetime.now() try: session = Popen([script_name], stdout=PIPE, stderr=PIPE) stdout, stderr = session.communicate() except OSError as e: self.state = 'FE' self.result = "Error executing \"{}\" script: {}".format( self.script.path, e) self.save() return self.finished_date = datetime.datetime.now() if stderr: self.state = 'FE' self.result = "Error: {}".format(stderr.decode('utf-8')) else: self.state = 'F' self.result = "{}".format(stdout.decode('utf-8')) self.save() ITEM_TYPES = ( ("O", _("Operation")), ("S", _("Archaeological site")), ("CR", _("Context record")), ("F", _("Find")), ("W", _("Warehouse")), ) EXPORT_STATE = (("C", _("Created")),) + SCRIPT_STATE class ExportTask(models.Model): filter_type = models.CharField( _("Filter on"), max_length=2, choices=ITEM_TYPES, null=True, blank=True ) filter_text = models.TextField( _("Filter query"), null=True, blank=True, help_text=_("Textual query on this item (try it on the main " "interface)")) geo = models.BooleanField( _("Export geographic data"), default=True, help_text=_("Geographic data can represent large volume of " "information. Geographic data can be excluded from the " "export")) state = models.CharField(_("State"), max_length=2, choices=EXPORT_STATE, default='C') put_locks = models.BooleanField(_("Put locks on associated items"), default=False) lock_user = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_("Lock user"), blank=True, null=True, help_text=_("Owner of the lock if item are locked. Warning: if no " "user is provided the locks can be remove by any user " "with the permission to edit.") ) export_types = models.BooleanField(_("Export types"), default=True) export_conf = models.BooleanField(_("Export configuration"), default=True) export_importers = models.BooleanField(_("Export importers"), default=True) export_geo = models.BooleanField(_("Export towns, areas..."), default=True) export_dir = models.BooleanField(_("Export directory"), default=True) export_docs = models.BooleanField(_("Export documents"), default=True) export_items = models.BooleanField(_("Export main items"), default=True) creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) result = models.FileField(_("Result"), null=True, blank=True, upload_to="exports/%Y/%m/") result_info = models.TextField(_("Result information"), null=True, blank=True) class Meta: verbose_name = _("Archive - Export") verbose_name_plural = _("Archive - Exports") ordering = ['creation_date'] def __str__(self): state = _("Unknown") if self.state in SCRIPT_STATE_DCT: state = str(SCRIPT_STATE_DCT[self.state]) return "Export - {} - {}".format(self.creation_date, state) @property def label(self): fltr = _("Whole database") if self.filter_type and self.filter_text: dct = dict(ITEM_TYPES) if self.filter_type in dct: fltr = '{} "{}"'.format(dct[self.filter_type], self.filter_text) return "{} - {}".format(fltr, self.creation_date) def clean(self): if (self.filter_text and not self.filter_type) or ( self.filter_type and not self.filter_text): raise ValidationError( _("To filter filter type and filter text must be filled.") ) class ImportTask(models.Model): creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) import_user = models.ForeignKey( User, related_name='+', on_delete=models.SET_NULL, verbose_name=_("Import user"), blank=True, null=True, help_text=_("If set the \"Import user\" will be the editor for last " "version. If the field is left empty no history will be " "recorded.") ) state = models.CharField(_("State"), max_length=2, choices=EXPORT_STATE, default='C') delete_before = models.BooleanField( _("Delete before adding"), default=False, help_text=_("Delete existing items before adding")) releasing_locks = models.BooleanField( _("Releasing locks on associated items"), default=False) source = models.FileField(_("Source"), upload_to="imports/%Y/%m/") class Meta: verbose_name = _("Archive - Import") verbose_name_plural = _("Archive - Imports") ordering = ['creation_date'] def __str__(self): state = _("Unknown") if self.state in SCRIPT_STATE_DCT: state = str(SCRIPT_STATE_DCT[self.state]) return "Import - {} - {}".format(self.creation_date, state)