diff options
Diffstat (limited to 'ishtar_common/models.py')
-rw-r--r-- | ishtar_common/models.py | 2890 |
1 files changed, 54 insertions, 2836 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 001c1894f..f23cab926 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -21,7 +21,6 @@ Models description """ import copy -from collections import OrderedDict import datetime import inspect from importlib import import_module @@ -29,9 +28,6 @@ from jinja2 import TemplateSyntaxError, UndefinedError import json import logging import os -import pyqrcode -import re -import shutil import string import tempfile import time @@ -46,7 +42,6 @@ import zipfile from urllib.parse import urlencode from xml.etree import ElementTree as ET -from django import forms from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group @@ -54,32 +49,22 @@ from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import JSONField -from django.contrib.postgres.search import SearchVectorField, SearchVector from django.contrib.postgres.indexes import GinIndex from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError, \ MultipleObjectsReturned from django.core.files.uploadedfile import SimpleUploadedFile -from django.core.files import File -from django.core.serializers import serialize from django.core.urlresolvers import reverse, NoReverseMatch -from django.core.validators import validate_slug -from django.db import connection from django.db.models import Q, Max, Count, F from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template.defaultfilters import slugify from django.utils.functional import lazy -from django.utils.safestring import SafeText, mark_safe -from django.utils.translation import activate, deactivate from ishtar_common.utils import ugettext_lazy as _, ugettext, \ - pgettext_lazy + pgettext_lazy, get_external_id, get_current_profile, duplicate_item, \ + get_image_path from ishtar_common.utils_secretary import IshtarSecretaryRenderer -from simple_history.models import HistoricalRecords as BaseHistoricalRecords -from simple_history.signals import post_create_historical_record, \ - pre_create_historical_record -from unidecode import unidecode from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ ALTERNATE_CONFIGS_CHOICES @@ -87,16 +72,24 @@ from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ from ishtar_common.data_importer import pre_importer_action from ishtar_common.model_managers import SlugModelManager, ExternalIdManager, \ - TypeManager, UUIDModelManager + UUIDModelManager from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ImporterModel, ImporterType, \ ImporterDefault, ImporterDefaultValues, ImporterColumn, \ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ Import, TargetKeyGroup, ValueFormater -from ishtar_common.templatetags.link_to_window import simple_link_to_window -from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \ - get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \ - generate_relation_graph, max_size_help, task +from ishtar_common.utils import get_cache, create_slug, \ + get_all_field_names, cached_label_changed, \ + generate_relation_graph, max_size_help + +from ishtar_common.models_common import GeneralType, HierarchicalType, \ + BaseHistorizedItem, LightHistorizedItem, FullSearch, Imported, \ + FixAssociated, SearchAltName, HistoryError, OwnPerms, Cached, \ + Address, post_save_cache, TemplateItem, SpatialReferenceSystem, \ + DashboardFormItem, document_attached_changed, SearchAltName, \ + DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ + QuickAction, MainItem, Merge, ShortMenuItem, Town, ImageContainerModel, \ + StatisticItem, CachedGen, CascasdeUpdate, Department, State __all__ = [ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', @@ -107,7 +100,8 @@ __all__ = [ 'LightHistorizedItem', 'OwnPerms', 'Address', 'post_save_cache', 'DashboardFormItem', 'ShortMenuItem', 'document_attached_changed', 'SearchAltName', 'DynamicRequest', 'GeoItem', 'QRCodeItem', - 'SearchVectorConfig', 'DocumentItem' + 'SearchVectorConfig', 'DocumentItem', 'CachedGen', 'StatisticItem', + 'CascasdeUpdate', 'Department', 'State' ] logger = logging.getLogger(__name__) @@ -262,88 +256,6 @@ class HistoryModel(models.Model): create=create) -class HistoricalRecords(BaseHistoricalRecords): - def _save_historic(self, manager, instance, history_date, history_type, - history_user, history_change_reason, using, attrs): - history_instance = manager.model( - history_date=history_date, - history_type=history_type, - history_user=history_user, - history_change_reason=history_change_reason, - **attrs - ) - - pre_create_historical_record.send( - sender=manager.model, - instance=instance, - history_date=history_date, - history_user=history_user, - history_change_reason=history_change_reason, - history_instance=history_instance, - using=using, - ) - - history_instance.save(using=using) - - post_create_historical_record.send( - sender=manager.model, - instance=instance, - history_instance=history_instance, - history_date=history_date, - history_user=history_user, - history_change_reason=history_change_reason, - using=using, - ) - - def create_historical_record(self, instance, history_type, using=None): - try: - history_modifier = getattr(instance, 'history_modifier', None) - assert history_modifier - except (User.DoesNotExist, AssertionError): - # on batch removing of users, user could have disappeared - return - history_date = getattr(instance, "_history_date", - datetime.datetime.now()) - history_change_reason = getattr(instance, "changeReason", None) - force = getattr(instance, "_force_history", False) - manager = getattr(instance, self.manager_name) - attrs = {} - for field in instance._meta.fields: - attrs[field.attname] = getattr(instance, field.attname) - q_history = instance.history \ - .filter(history_modifier_id=history_modifier.pk) \ - .order_by('-history_date', '-history_id') - # instance.skip_history_when_saving = True - if not q_history.count(): - if force: - delattr(instance, '_force_history') - self._save_historic( - manager, instance, history_date, history_type, history_modifier, - history_change_reason, using, attrs) - return - old_instance = q_history.all()[0] - # multiple saving by the same user in a very short time are generaly - # caused by post_save signals it is not relevant to keep them - min_history_date = datetime.datetime.now() \ - - datetime.timedelta(seconds=5) - q = q_history.filter(history_date__isnull=False, - history_date__gt=min_history_date) \ - .order_by('-history_date', '-history_id') - if not force and q.count(): - return - - if force: - delattr(instance, '_force_history') - - # record a new version only if data have been changed - for field in instance._meta.fields: - if getattr(old_instance, field.attname) != attrs[field.attname]: - self._save_historic(manager, instance, history_date, - history_type, history_modifier, - history_change_reason, using, attrs) - return - - def valid_id(cls): # valid ID validator for models def func(value): @@ -383,741 +295,6 @@ def is_unique(cls, field): return func -class OwnPerms(object): - """ - Manage special permissions for object's owner - """ - - @classmethod - def get_query_owns(cls, ishtaruser): - """ - Query object to get own items - """ - return None # implement for each object - - def can_view(self, request): - if hasattr(self, "LONG_SLUG"): - perm = "view_" + self.LONG_SLUG - else: - perm = "view_" + self.SLUG - return self.can_do(request, perm) - - def can_do(self, request, action_name): - """ - Check permission availability for the current object. - :param request: request object - :param action_name: action name eg: "change_find" - "own" variation is - checked - :return: boolean - """ - if not getattr(request.user, 'ishtaruser', None): - return False - splited = action_name.split('_') - action_own_name = splited[0] + '_own_' + '_'.join(splited[1:]) - user = request.user - if action_own_name == "view_own_findbasket": - action_own_name = "view_own_find" - return user.ishtaruser.has_right(action_name, request.session) or \ - (user.ishtaruser.has_right(action_own_name, request.session) - and self.is_own(user.ishtaruser)) - - def is_own(self, user, alt_query_own=None): - """ - Check if the current object is owned by the user - """ - if isinstance(user, IshtarUser): - ishtaruser = user - elif hasattr(user, 'ishtaruser'): - ishtaruser = user.ishtaruser - else: - return False - if not alt_query_own: - query = self.get_query_owns(ishtaruser) - else: - query = getattr(self, alt_query_own)(ishtaruser) - if not query: - return False - query &= Q(pk=self.pk) - return self.__class__.objects.filter(query).count() - - @classmethod - def has_item_of(cls, user): - """ - Check if the user own some items - """ - if isinstance(user, IshtarUser): - ishtaruser = user - elif hasattr(user, 'ishtaruser'): - ishtaruser = user.ishtaruser - else: - return False - query = cls.get_query_owns(ishtaruser) - if not query: - return False - return cls.objects.filter(query).count() - - @classmethod - def _return_get_owns(cls, owns, values, get_short_menu_class, - label_key='cached_label'): - if not owns: - return [] - sorted_values = [] - if hasattr(cls, 'BASKET_MODEL'): - owns_len = len(owns) - for idx, item in enumerate(reversed(owns)): - if get_short_menu_class: - item = item[0] - if type(item) == cls.BASKET_MODEL: - basket = owns.pop(owns_len - idx - 1) - sorted_values.append(basket) - sorted_values = list(reversed(sorted_values)) - if not values: - if not get_short_menu_class: - return sorted_values + list( - sorted(owns, key=lambda x: getattr(x, label_key) or "")) - return sorted_values + list( - sorted(owns, key=lambda x: getattr(x[0], label_key) or "")) - if not get_short_menu_class: - return sorted_values + list( - sorted(owns, key=lambda x: x[label_key] or "")) - return sorted_values + list( - sorted(owns, key=lambda x: x[0][label_key] or "")) - - @classmethod - def get_owns(cls, user, replace_query=None, limit=None, values=None, - get_short_menu_class=False, menu_filtr=None): - """ - Get Own items - """ - if not replace_query: - replace_query = {} - if hasattr(user, 'is_authenticated') and not user.is_authenticated(): - returned = cls.objects.filter(pk__isnull=True) - if values: - returned = [] - return returned - if isinstance(user, User): - try: - ishtaruser = IshtarUser.objects.get(user_ptr=user) - except IshtarUser.DoesNotExist: - returned = cls.objects.filter(pk__isnull=True) - if values: - returned = [] - return returned - elif isinstance(user, IshtarUser): - ishtaruser = user - else: - if values: - return [] - return cls.objects.filter(pk__isnull=True) - items = [] - if hasattr(cls, 'BASKET_MODEL'): - items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) - query = cls.get_query_owns(ishtaruser) - if not query and not replace_query: - returned = cls.objects.filter(pk__isnull=True) - if values: - returned = [] - return returned - if query: - q = cls.objects.filter(query) - else: # replace_query - q = cls.objects.filter(replace_query) - if values: - q = q.values(*values) - if limit: - items += list(q.order_by('-pk')[:limit]) - else: - items += list(q.order_by(*cls._meta.ordering).all()) - if get_short_menu_class: - if values: - if 'id' not in values: - raise NotImplementedError( - "Call of get_owns with get_short_menu_class option and" - " no 'id' in values is not implemented") - my_items = [] - for i in items: - if hasattr(cls, 'BASKET_MODEL') and \ - type(i) == cls.BASKET_MODEL: - dct = dict([(k, getattr(i, k)) for k in values]) - my_items.append( - (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) - else: - my_items.append((i, cls.get_short_menu_class(i['id']))) - items = my_items - else: - items = [(i, cls.get_short_menu_class(i.pk)) for i in items] - return items - - @classmethod - def _get_query_owns_dicts(cls, ishtaruser): - """ - List of query own dict to construct the query. - Each dict are join with an AND operator, each dict key, values are - joined with OR operator - """ - return [] - - @classmethod - def _construct_query_own(cls, prefix, dct_list): - q = None - for subquery_dict in dct_list: - subquery = None - for k in subquery_dict: - subsubquery = Q(**{prefix + k: subquery_dict[k]}) - if subquery: - subquery |= subsubquery - else: - subquery = subsubquery - if not subquery: - continue - if q: - q &= subquery - else: - q = subquery - return q - - -class CachedGen(object): - @classmethod - def refresh_cache(cls): - raise NotImplementedError() - - @classmethod - def _add_cache_key_to_refresh(cls, keys): - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if type(current_keys) != list: - current_keys = [] - if keys not in current_keys: - current_keys.append(keys) - cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) - - -class Cached(CachedGen): - slug_field = 'txt_idx' - - @classmethod - def refresh_cache(cls): - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if not current_keys: - return - for keys in current_keys: - if len(keys) == 2 and keys[0] == '__slug': - cls.get_cache(keys[1], force=True) - elif keys[0] == '__get_types': - default = None - empty_first = True - exclude = [] - if len(keys) >= 2: - default = keys.pop() - if len(keys) > 1: - empty_first = bool(keys.pop()) - exclude = keys[1:] - cls.get_types( - exclude=exclude, empty_first=empty_first, default=default, - force=True) - elif keys[0] == '__get_help': - cls.get_help(force=True) - - @classmethod - def _add_cache_key_to_refresh(cls, keys): - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if type(current_keys) != list: - current_keys = [] - if keys not in current_keys: - current_keys.append(keys) - cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) - - @classmethod - def get_cache(cls, slug, force=False): - cache_key, value = get_cache(cls, ['__slug', slug]) - if not force and value: - return value - try: - k = {cls.slug_field: slug} - obj = cls.objects.get(**k) - cache.set(cache_key, obj, settings.CACHE_TIMEOUT) - return obj - except cls.DoesNotExist: - cache.set(cache_key, None, settings.CACHE_TIMEOUT) - return None - - -@disable_for_loaddata -def post_save_cache(sender, **kwargs): - sender.refresh_cache() - - -class GeneralType(Cached, models.Model): - """ - Abstract class for "types" - """ - label = models.TextField(_("Label")) - txt_idx = models.TextField( - _("Textual ID"), validators=[validate_slug], - unique=True, - help_text=_( - "The slug is the standardized version of the name. It contains " - "only lowercase letters, numbers and hyphens. Each slug must " - "be unique.")) - comment = models.TextField(_("Comment"), blank=True, null=True) - available = models.BooleanField(_("Available"), default=True) - HELP_TEXT = "" - objects = TypeManager() - - class Meta: - abstract = True - - def __str__(self): - return self.label - - def natural_key(self): - return (self.txt_idx,) - - def history_compress(self): - return self.txt_idx - - @classmethod - def history_decompress(cls, value, create=False): - if not value: - return [] - res = [] - for txt_idx in value: - try: - res.append(cls.objects.get(txt_idx=txt_idx)) - except cls.DoesNotExist: - continue - return res - - @property - def explicit_label(self): - return "{} ({})".format(self.label, self._meta.verbose_name) - - @classmethod - def create_default_for_test(cls): - return [cls.objects.create(label='Test %d' % i) for i in range(5)] - - @property - def short_label(self): - return self.label - - @property - def name(self): - return self.label - - @classmethod - def get_or_create(cls, slug, label=''): - """ - Get or create a new item. - - :param slug: textual id - :param label: label for initialization if the item doesn't exist (not - mandatory) - - :return: instancied item of the base class - """ - - item = cls.get_cache(slug) - if item: - return item - if isinstance(slug, list): - slug = slug[0] - item, __ = cls.objects.get_or_create( - txt_idx=slug, defaults={'label': label}) - return item - - @classmethod - def get_or_create_pk(cls, slug): - """ - Get an id from a slug. Create the associated item if needed. - - :param slug: textual id - - :return: id of the item (string) - """ - return str(cls.get_or_create(slug).pk) - - @classmethod - def get_or_create_pks(cls, slugs): - """ - Get and merge a list of ids from a slug list. Create the associated - items if needed. - - :param slugs: textual ids - - :return: string with ids separated by "_" - """ - items = [] - for slug in slugs: - items.append(str(cls.get_or_create(slug).pk)) - return "_".join(items) - - @classmethod - def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): - if not dct: - dct = {} - if not exclude: - exclude = [] - keys = ['__get_help'] - keys += ["{}".format(ex) for ex in exclude] - keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] - cache_key, value = get_cache(cls, keys) - if value and not force: - return mark_safe(value) - help_text = cls.HELP_TEXT - c_rank = -1 - help_items = "\n" - for item in cls.get_types(dct=dct, instances=True, exclude=exclude): - if hasattr(item, '__iter__'): - pk = item[0] - item = cls.objects.get(pk=pk) - item.rank = c_rank + 1 - if hasattr(item, 'parent'): - c_item = item - parents = [] - while c_item.parent: - parents.append(c_item.parent.label) - c_item = c_item.parent - parents.reverse() - parents.append(item.label) - item.label = " / ".join(parents) - if not item.comment: - continue - if c_rank > item.rank: - help_items += "</dl>\n" - elif c_rank < item.rank: - help_items += "<dl>\n" - c_rank = item.rank - help_items += "<dt>%s</dt><dd>%s</dd>" % ( - item.label, "<br/>".join(item.comment.split('\n'))) - c_rank += 1 - if c_rank: - help_items += c_rank * "</dl>" - if help_text or help_items != u'\n': - help_text = help_text + help_items - else: - help_text = "" - cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) - return mark_safe(help_text) - - @classmethod - def _get_initial_types(cls, initial, type_pks, instance=False): - new_vals = [] - if not initial: - return [] - if type(initial) not in (list, tuple): - initial = [initial] - for value in initial: - try: - pk = int(value) - except (ValueError, TypeError): - continue - if pk in type_pks: - continue - try: - extra_type = cls.objects.get(pk=pk) - if instance: - new_vals.append(extra_type) - else: - new_vals.append((extra_type.pk, str(extra_type))) - except cls.DoesNotExist: - continue - return new_vals - - @classmethod - def get_types(cls, dct=None, instances=False, exclude=None, - empty_first=True, default=None, initial=None, force=False, - full_hierarchy=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - types = [] - if not instances and empty_first and not default: - types = [('', '--')] - types += cls._pre_get_types(dct, instances, exclude, - default, force, - get_full_hierarchy=full_hierarchy) - if not initial: - return types - new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) - types += new_vals - return types - - @classmethod - def _pre_get_types(cls, dct=None, instances=False, exclude=None, - default=None, force=False, get_full_hierarchy=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - # cache - cache_key = None - if not instances: - keys = ['__get_types'] - keys += ["{}".format(ex) for ex in exclude] + \ - ["{}".format(default)] - keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] - cache_key, value = get_cache(cls, keys) - if value and not force: - return value - base_dct = dct.copy() - if hasattr(cls, 'parent'): - if not cache_key: - return cls._get_parent_types( - base_dct, instances, exclude=exclude, - default=default, get_full_hierarchy=get_full_hierarchy) - vals = [v for v in cls._get_parent_types( - base_dct, instances, exclude=exclude, - default=default, get_full_hierarchy=get_full_hierarchy)] - cache.set(cache_key, vals, settings.CACHE_TIMEOUT) - return vals - - if not cache_key: - return cls._get_types(base_dct, instances, exclude=exclude, - default=default) - vals = [ - v for v in cls._get_types(base_dct, instances, exclude=exclude, - default=default) - ] - cache.set(cache_key, vals, settings.CACHE_TIMEOUT) - return vals - - @classmethod - def _get_types(cls, dct=None, instances=False, exclude=None, default=None): - if not dct: - dct = {} - if not exclude: - exclude = [] - dct['available'] = True - if default: - try: - default = cls.objects.get(txt_idx=default) - yield (default.pk, _(str(default))) - except cls.DoesNotExist: - pass - items = cls.objects.filter(**dct) - if default and default != "None": - if hasattr(default, 'txt_idx'): - exclude.append(default.txt_idx) - else: - exclude.append(default) - if exclude: - items = items.exclude(txt_idx__in=exclude) - for item in items.order_by(*cls._meta.ordering).all(): - if instances: - item.rank = 0 - yield item - else: - yield (item.pk, _(str(item)) if item and str(item) else '') - - @classmethod - def _get_childs_list(cls, dct=None, exclude=None, instances=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - if 'parent' in dct: - dct.pop('parent') - childs = cls.objects.filter(**dct) - if exclude: - childs = childs.exclude(txt_idx__in=exclude) - if hasattr(cls, 'order'): - childs = childs.order_by('order') - res = {} - if instances: - for item in childs.all(): - parent_id = item.parent_id or 0 - if parent_id not in res: - res[parent_id] = [] - res[parent_id].append(item) - else: - for item in childs.values("id", "parent_id", "label").all(): - parent_id = item["parent_id"] or 0 - if item["id"] == item["parent_id"]: - parent_id = 0 - if parent_id not in res: - res[parent_id] = [] - res[parent_id].append((item["id"], item["label"])) - return res - - PREFIX = "│ " - PREFIX_EMPTY = " " - PREFIX_MEDIUM = "├ " - PREFIX_LAST = "└ " - PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] - - @classmethod - def _get_childs(cls, item, child_list, prefix=0, instances=False, - is_last=False, last_of=None, get_full_hierarchy=False): - if not last_of: - last_of = [] - - prefix += 1 - current_child_lst = [] - if item in child_list: - current_child_lst = child_list[item] - - lst = [] - total = len(current_child_lst) - full_hierarchy_initial = get_full_hierarchy - for idx, child in enumerate(current_child_lst): - mylast_of = last_of[:] - p = '' - if instances: - child.rank = prefix - lst.append(child) - else: - if full_hierarchy_initial: - if isinstance(full_hierarchy_initial, str): - p = full_hierarchy_initial + " > " - else: - p = "" - else: - cprefix = prefix - while cprefix: - cprefix -= 1 - if not cprefix: - if (idx + 1) == total: - p += cls.PREFIX_LAST - else: - p += cls.PREFIX_MEDIUM - elif is_last: - if mylast_of: - clast = mylast_of.pop(0) - if clast: - p += cls.PREFIX_EMPTY - else: - p += cls.PREFIX - else: - p += cls.PREFIX_EMPTY - else: - p += cls.PREFIX - lst.append(( - child[0], SafeText(p + str(_(child[1]))) - )) - clast_of = last_of[:] - clast_of.append(idx + 1 == total) - if instances: - child_id = child.id - else: - child_id = child[0] - if get_full_hierarchy: - if p: - if not p.endswith(" > "): - p += " > " - get_full_hierarchy = p + child[1] - else: - get_full_hierarchy = child[1] - for sub_child in cls._get_childs( - child_id, child_list, prefix, instances, - is_last=((idx + 1) == total), last_of=clast_of, - get_full_hierarchy=get_full_hierarchy): - lst.append(sub_child) - return lst - - @classmethod - def _get_parent_types(cls, dct=None, instances=False, exclude=None, - default=None, get_full_hierarchy=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - dct['available'] = True - child_list = cls._get_childs_list(dct, exclude, instances) - - if 0 in child_list: - for item in child_list[0]: - if instances: - item.rank = 0 - item_id = item.pk - yield item - else: - item_id = item[0] - yield item - if get_full_hierarchy: - get_full_hierarchy = item[1] - for child in cls._get_childs( - item_id, child_list, instances=instances, - get_full_hierarchy=get_full_hierarchy): - yield child - - def save(self, *args, **kwargs): - if not self.id and not self.label: - txt_idx = self.txt_idx - if isinstance(txt_idx, list): - txt_idx = txt_idx[0] - self.txt_idx = txt_idx - self.label = " ".join(" ".join(self.txt_idx.split('-')) - .split('_')).title() - if not self.txt_idx: - self.txt_idx = slugify(self.label)[:100] - - # clean old keys - if self.pk: - old = self.__class__.objects.get(pk=self.pk) - content_type = ContentType.objects.get_for_model(self.__class__) - if slugify(self.label) != slugify(old.label): - ItemKey.objects.filter( - object_id=self.pk, key=slugify(old.label), - content_type=content_type).delete() - if self.txt_idx != old.txt_idx: - ItemKey.objects.filter( - object_id=self.pk, key=old.txt_idx, - content_type=content_type).delete() - - obj = super(GeneralType, self).save(*args, **kwargs) - self.generate_key(force=True) - return obj - - def add_key(self, key, force=False, importer=None, group=None, - user=None): - content_type = ContentType.objects.get_for_model(self.__class__) - if not importer and not force and ItemKey.objects.filter( - key=key, content_type=content_type).count(): - return - filtr = {'key': key, 'content_type': content_type} - if group: - filtr['group'] = group - elif user: - filtr['user'] = user - else: - filtr['importer'] = importer - if force: - ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() - filtr['object_id'] = self.pk - ItemKey.objects.get_or_create(**filtr) - - def generate_key(self, force=False): - for key in (slugify(self.label), self.txt_idx): - self.add_key(key) - - def get_keys(self, importer): - keys = [self.txt_idx] - content_type = ContentType.objects.get_for_model(self.__class__) - base_q = Q(content_type=content_type, object_id=self.pk) - subquery = Q(importer__isnull=True, user__isnull=True, - group__isnull=True) - subquery |= Q(user__isnull=True, group__isnull=True, - importer=importer) - if importer.user: - subquery |= Q(user=importer.user, group__isnull=True, - importer=importer) - if importer.associated_group: - subquery |= Q(user__isnull=True, group=importer.associated_group, - importer=importer) - q = ItemKey.objects.filter(base_q & subquery) - for ik in q.exclude(key=self.txt_idx).all(): - keys.append(ik.key) - return keys - - @classmethod - def generate_keys(cls): - # content_type = ContentType.objects.get_for_model(cls) - for item in cls.objects.all(): - item.generate_key() - - def get_general_type_label(model, slug): obj = model.get_cache(slug) if not obj: @@ -1125,23 +302,6 @@ def get_general_type_label(model, slug): return str(obj) -class HierarchicalType(GeneralType): - parent = models.ForeignKey('self', blank=True, null=True, - on_delete=models.SET_NULL, - verbose_name=_("Parent")) - - class Meta: - abstract = True - - def full_label(self): - lbls = [self.label] - item = self - while item.parent: - item = item.parent - lbls.append(item.label) - return " > ".join(reversed(lbls)) - - class TinyUrl(models.Model): CHAR_MAP = string.ascii_letters + string.digits CHAR_MAP_LEN = len(CHAR_MAP) @@ -1183,24 +343,6 @@ class ItemKey(models.Model): return self.key -def get_image_path(instance, filename): - # when using migrations instance is not a real ImageModel instance - if not hasattr(instance, '_get_image_path'): - n = datetime.datetime.now() - return "upload/{}/{:02d}/{:02d}/{}".format( - n.year, n.month, n.day, filename) - return instance._get_image_path(filename) - - -class ImageContainerModel(object): - def _get_image_path(self, filename): - return "{}/{}".format(self._get_base_image_path(), filename) - - def _get_base_image_path(self): - n = datetime.datetime.now() - return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) - - class ImageModel(models.Model, ImageContainerModel): image = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) @@ -1286,17 +428,6 @@ class ImageModel(models.Model, ImageContainerModel): ) -class HistoryError(Exception): - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - -PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid') - - class BulkUpdatedItem(object): @classmethod def bulk_recursion(cls, transaction_id, extra_args): @@ -1456,1071 +587,6 @@ class JsonDataField(models.Model): _("Content types of the field and of the menu do not match")) -class JsonData(models.Model, CachedGen): - data = JSONField(default={}, blank=True) - - class Meta: - abstract = True - - def pre_save(self): - if not self.data: - self.data = {} - - @property - def json_sections(self): - sections = [] - try: - content_type = ContentType.objects.get_for_model(self) - except ContentType.DoesNotExists: - return sections - fields = list(JsonDataField.objects.filter( - content_type=content_type, display=True, section__isnull=True - ).all()) # no section fields - - fields += list(JsonDataField.objects.filter( - content_type=content_type, display=True, section__isnull=False - ).order_by('section__order', 'order').all()) - - for field in fields: - value = None - data = self.data.copy() - for key in field.key.split('__'): - if key in data: - value = copy.copy(data[key]) - data = data[key] - else: - value = None - break - if value is None: - continue - if type(value) in (list, tuple): - value = " ; ".join([str(v) for v in value]) - section_name = field.section.name if field.section else None - if not sections or section_name != sections[-1][0]: - # if section name is identical it is the same - sections.append((section_name, [])) - sections[-1][1].append((field.name, value)) - return sections - - @classmethod - def refresh_cache(cls): - __, refreshed = get_cache(cls, ['cache_refreshed']) - if refreshed and time.time() - refreshed < 1: - return - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if not current_keys: - return - for keys in current_keys: - if keys[0] == '__get_dynamic_choices': - cls._get_dynamic_choices(keys[1], force=True) - - @classmethod - def _get_dynamic_choices(cls, key, force=False): - """ - Get choice from existing values - :param key: data key - :param force: if set to True do not use cache - :return: tuple of choices (id, value) - """ - cache_key, value = get_cache(cls, ['__get_dynamic_choices', key]) - if not force and value: - return value - choices = set() - splitted_key = key[len('data__'):].split('__') - q = cls.objects.filter( - data__has_key=key[len('data__'):]).values_list('data', flat=True) - for value in q.all(): - for k in splitted_key: - value = value[k] - choices.add(value) - choices = [('', '')] + [(v, v) for v in sorted(list(choices))] - cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) - return choices - - -class Imported(models.Model): - imports = models.ManyToManyField( - Import, blank=True, - related_name="imported_%(app_label)s_%(class)s") - - class Meta: - abstract = True - - -class SearchAltName(object): - def __init__(self, search_key, search_query, extra_query=None, - distinct_query=False): - self.search_key = search_key - self.search_query = search_query - self.extra_query = extra_query or {} - self.distinct_query = distinct_query - - -class DynamicRequest(object): - def __init__(self, label, app_name, model_name, form_key, search_key, - type_query, search_query): - self.label = label - self.form_key = form_key - self.search_key = search_key - self.app_name = app_name - self.model_name = model_name - self.type_query = type_query - self.search_query = search_query - - def get_all_types(self): - model = apps.get_app_config(self.app_name).get_model(self.model_name) - return model.objects.filter(available=True) - - def get_form_fields(self): - fields = {} - for item in self.get_all_types().all(): - fields[self.form_key + "-" + item.txt_idx] = forms.CharField( - label=str(self.label) + " " + str(item), - required=False - ) - return fields - - def get_extra_query(self, slug): - return { - self.type_query: slug - } - - def get_alt_names(self): - alt_names = {} - for item in self.get_all_types().all(): - alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( - self.search_key + "-" + item.txt_idx, self.search_query, - self.get_extra_query(item.txt_idx), distinct_query=True - ) - return alt_names - - -class SearchVectorConfig(object): - def __init__(self, key, language=None, func=None): - self.key = key - if language: - self.language = language - if language == "local": - self.language = settings.ISHTAR_SEARCH_LANGUAGE - else: - self.language = "simple" - self.func = func - - def format(self, value): - if value == 'None': - value = '' - if not self.func: - return [value] - return self.func(value) - - -class FullSearch(models.Model): - search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, - help_text=_("Auto filled at save")) - - EXTRA_REQUEST_KEYS = {} - DYNAMIC_REQUESTS = {} - ALT_NAMES = {} - - BASE_SEARCH_VECTORS = [] - PROPERTY_SEARCH_VECTORS = [] - INT_SEARCH_VECTORS = [] - M2M_SEARCH_VECTORS = [] - PARENT_SEARCH_VECTORS = [] - # prevent circular dependency - PARENT_ONLY_SEARCH_VECTORS = [] - - class Meta: - abstract = True - - @classmethod - def general_types(cls): - for k in get_all_field_names(cls): - field = cls._meta.get_field(k) - if not hasattr(field, 'rel') or not field.rel: - continue - rel_model = field.rel.to - if issubclass(rel_model, (GeneralType, HierarchicalType)): - yield k - - @classmethod - def get_alt_names(cls): - alt_names = cls.ALT_NAMES.copy() - for dr_k in cls.DYNAMIC_REQUESTS: - alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) - return alt_names - - @classmethod - def get_query_parameters(cls): - query_parameters = {} - for v in cls.get_alt_names().values(): - for language_code, language_lbl in settings.LANGUAGES: - activate(language_code) - query_parameters[str(v.search_key)] = v - deactivate() - return query_parameters - - def _update_search_field(self, search_vector_conf, search_vectors, data): - for value in search_vector_conf.format(data): - with connection.cursor() as cursor: - cursor.execute("SELECT to_tsvector(%s, %s)", [ - search_vector_conf.language, value]) - row = cursor.fetchone() - search_vectors.append(row[0]) - - def _update_search_number_field(self, search_vectors, val): - search_vectors.append("'{}':1".format(val)) - - def update_search_vector(self, save=True, exclude_parent=False): - """ - Update the search vector - :param save: True if you want to save the object immediately - :return: True if modified - """ - if not hasattr(self, 'search_vector'): - return - if not self.pk: - # logger.warning("Cannot update search vector before save or " - # "after deletion.") - return - if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ - and not self.INT_SEARCH_VECTORS \ - and not self.PROPERTY_SEARCH_VECTORS \ - and not self.PARENT_SEARCH_VECTORS: - logger.warning("No search_vectors defined for {}".format( - self.__class__)) - return - if getattr(self, '_search_updated', None): - return - self._search_updated = True - - old_search = "" - if self.search_vector: - old_search = self.search_vector[:] - search_vectors = [] - base_q = self.__class__.objects.filter(pk=self.pk) - - # many to many have to be queried one by one otherwise only one is fetch - for m2m_search_vector in self.M2M_SEARCH_VECTORS: - key = m2m_search_vector.key.split('__')[0] - rel_key = getattr(self, key) - for item in rel_key.values('pk').all(): - query_dct = {key + "__pk": item['pk']} - q = copy.copy(base_q).filter(**query_dct) - q = q.annotate( - search=SearchVector( - m2m_search_vector.key, - config=m2m_search_vector.language) - ).values('search') - search_vectors.append(q.all()[0]['search']) - - # int/float are not well managed by the SearchVector - for int_search_vector in self.INT_SEARCH_VECTORS: - q = base_q.values(int_search_vector.key) - for val in int_search_vector.format( - q.all()[0][int_search_vector.key]): - self._update_search_number_field(search_vectors, val) - - if not exclude_parent: - # copy parent vector fields - for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: - parent = getattr(self, PARENT_SEARCH_VECTOR) - if hasattr(parent, 'all'): # m2m - for p in parent.all(): - search_vectors.append(p.search_vector) - elif parent: - search_vectors.append(parent.search_vector) - - for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: - parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) - if hasattr(parent, 'all'): # m2m - for p in parent.all(): - search_vectors.append( - p.update_search_vector(save=False, exclude_parent=True) - ) - elif parent: - search_vectors.append( - parent.update_search_vector(save=False, exclude_parent=True) - ) - - if self.BASE_SEARCH_VECTORS: - # query "simple" fields - q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) - res = q.all()[0] - for base_search_vector in self.BASE_SEARCH_VECTORS: - data = res[base_search_vector.key] - data = unidecode(str(data)) - self._update_search_field(base_search_vector, - search_vectors, data) - - if self.PROPERTY_SEARCH_VECTORS: - for property_search_vector in self.PROPERTY_SEARCH_VECTORS: - data = getattr(self, property_search_vector.key) - if callable(data): - data = data() - if not data: - continue - data = str(data) - self._update_search_field(property_search_vector, - search_vectors, data) - - if hasattr(self, 'data') and self.data: - content_type = ContentType.objects.get_for_model(self) - for json_field in JsonDataField.objects.filter( - content_type=content_type, - search_index=True).all(): - data = copy.deepcopy(self.data) - no_data = False - for key in json_field.key.split('__'): - if key not in data: - no_data = True - break - data = data[key] - if no_data or not data: - continue - - if json_field.value_type == 'B': - if data is True: - data = json_field.name - else: - continue - elif json_field.value_type in ('I', 'F'): - self._update_search_number_field(search_vectors, data) - continue - elif json_field.value_type == 'D': - # only index year - self._update_search_number_field(search_vectors, data.year) - continue - for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): - with connection.cursor() as cursor: - cursor.execute("SELECT to_tsvector(%s, %s)", - [lang, data]) - row = cursor.fetchone() - search_vectors.append(row[0]) - new_search_vector = merge_tsvectors(search_vectors) - changed = old_search != new_search_vector - self.search_vector = new_search_vector - if save and changed: - self.__class__.objects.filter(pk=self.pk).update( - search_vector=new_search_vector) - elif not save: - return new_search_vector - return changed - - -class FixAssociated(object): - ASSOCIATED = {} - - def fix_associated(self): - for key in self.ASSOCIATED: - item = getattr(self, key) - if not item: - continue - dct = self.ASSOCIATED[key] - for dct_key in dct: - subkey, ctype = dct_key - expected_values = dct[dct_key] - if not isinstance(expected_values, (list, tuple)): - expected_values = [expected_values] - if hasattr(ctype, "txt_idx"): - try: - expected_values = [ctype.objects.get(txt_idx=v) - for v in expected_values] - except ctype.DoesNotExist: - # type not yet initialized - return - current_vals = getattr(item, subkey) - is_many = False - if hasattr(current_vals, "all"): - is_many = True - current_vals = current_vals.all() - else: - current_vals = [current_vals] - is_ok = False - for current_val in current_vals: - if current_val in expected_values: - is_ok = True - break - if is_ok: - continue - # the first value is used - new_value = expected_values[0] - if is_many: - getattr(item, subkey).add(new_value) - else: - setattr(item, subkey, new_value) - - -class QRCodeItem(models.Model, ImageContainerModel): - HAS_QR_CODE = True - qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True, - max_length=255) - - class Meta: - abstract = True - - @property - def qrcode_path(self): - if not self.qrcode: - self.generate_qrcode() - if not self.qrcode: # error on qrcode generation - return "" - return self.qrcode.path - - def generate_qrcode(self, request=None, secure=True, tmpdir=None): - url = self.get_absolute_url() - site = Site.objects.get_current() - if request: - scheme = request.scheme - else: - if secure: - scheme = "https" - else: - scheme = "http" - url = scheme + "://" + site.domain + url - tiny_url = TinyUrl() - tiny_url.link = url - tiny_url.save() - short_url = scheme + "://" + site.domain + reverse( - 'tiny-redirect', args=[tiny_url.get_short_id()]) - qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) - tmpdir_created = False - if not tmpdir: - tmpdir = tempfile.mkdtemp("-qrcode") - tmpdir_created = True - filename = tmpdir + os.sep + 'qrcode.png' - qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) - with open(filename, 'rb') as qrfile: - self.qrcode.save("qrcode.png", File(qrfile)) - self.skip_history_when_saving = True - self._no_move = True - self.save() - if tmpdir_created: - shutil.rmtree(tmpdir) - - -class DocumentItem(object): - ALT_NAMES = { - 'documents__image__isnull': - SearchAltName( - pgettext_lazy("key for text search", "has-image"), - 'documents__image__isnull'), - 'documents__associated_url__isnull': - SearchAltName( - pgettext_lazy("key for text search", "has-url"), - 'documents__associated_url__isnull'), - 'documents__associated_file__isnull': - SearchAltName( - pgettext_lazy("key for text search", "has-attached-file"), - 'documents__associated_file__isnull'), - } - - def public_representation(self): - images = [] - if getattr(self, "main_image", None): - images.append(self.main_image.public_representation()) - images += [ - image.public_representation() - for image in self.images_without_main_image.all() - ] - return {"images": images} - - @property - def images(self): - if not hasattr(self, 'documents'): - return Document.objects.none() - return self.documents.filter( - image__isnull=False).exclude(image="").order_by("pk") - - @property - def images_without_main_image(self): - if not hasattr(self, 'main_image') or not hasattr(self, 'documents'): - return self.images - if not self.main_image: - return self.documents.filter( - image__isnull=False).exclude( - image="").order_by("pk") - return self.documents.filter( - image__isnull=False).exclude( - image="").exclude(pk=self.main_image.pk).order_by("pk") - - def get_extra_actions(self, request): - """ - For sheet template: return "Add document / image" action - """ - # url, base_text, icon, extra_text, extra css class, is a quick action - try: - actions = super(DocumentItem, self).get_extra_actions(request) - except AttributeError: - actions = [] - - if not hasattr(self, 'SLUG'): - return actions - - can_add_doc = self.can_do(request, 'add_document') - if can_add_doc and ( - not hasattr(self, "is_locked") or - not self.is_locked(request.user)): - actions += [ - ( - reverse("create-document") + "?{}={}".format( - self.SLUG, self.pk), - _("Add document/image"), - "fa fa-plus", - _("doc./image"), - "", - False - ) - ] - return actions - - -class SpatialReferenceSystem(GeneralType): - order = models.IntegerField(_("Order"), default=10) - auth_name = models.CharField( - _("Authority name"), default=u'EPSG', max_length=256) - srid = models.IntegerField(_("Authority SRID")) - - class Meta: - verbose_name = _("Spatial reference system") - verbose_name_plural = _("Spatial reference systems") - ordering = ('label',) - - -post_save.connect(post_save_cache, sender=SpatialReferenceSystem) -post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) - - -class GeoItem(models.Model): - GEO_SOURCE = ( - ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon")) - ) - - # gis - x = models.FloatField(_(u'X'), blank=True, null=True) - y = models.FloatField(_(u'Y'), blank=True, null=True) - z = models.FloatField(_(u'Z'), blank=True, null=True) - estimated_error_x = models.FloatField(_(u'Estimated error for X'), - blank=True, null=True) - estimated_error_y = models.FloatField(_(u'Estimated error for Y'), - blank=True, null=True) - estimated_error_z = models.FloatField(_(u'Estimated error for Z'), - blank=True, null=True) - spatial_reference_system = models.ForeignKey( - SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), - blank=True, null=True) - point = models.PointField(_("Point"), blank=True, null=True, dim=3) - point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) - point_source = models.CharField( - _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, - null=True) - point_source_item = models.CharField( - _("Point source item"), max_length=100, blank=True, null=True) - multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, - null=True) - multi_polygon_source = models.CharField( - _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, - blank=True, null=True) - multi_polygon_source_item = models.CharField( - _("Multi polygon source item"), max_length=100, blank=True, null=True) - - GEO_LABEL = "" - - class Meta: - abstract = True - - def get_town_centroid(self): - raise NotImplementedError - - def get_town_polygons(self): - raise NotImplementedError - - @property - def display_coordinates(self): - if not self.point_2d: - return "" - profile = get_current_profile() - if not profile.display_srs or not profile.display_srs.srid: - return self.x, self.y - point = self.point_2d.transform(profile.display_srs.srid, clone=True) - return round(point.x, 5), round(point.y, 5) - - @property - def display_spatial_reference_system(self): - profile = get_current_profile() - if not profile.display_srs or not profile.display_srs.srid: - return self.spatial_reference_system - return profile.display_srs - - def get_precise_points(self): - if self.point_source == 'P' and self.point_2d: - return self.point_2d, self.point, self.point_source_item - - def get_precise_polygons(self): - if self.multi_polygon_source == 'P' and self.multi_polygon: - return self.multi_polygon, self.multi_polygon_source_item - - def most_precise_geo(self): - if self.point_source == 'M': - return 'multi_polygon' - current_source = str(self.__class__._meta.verbose_name) - if self.multi_polygon_source_item == current_source \ - and (self.multi_polygon_source == "P" or - self.point_source_item != current_source): - return 'multi_polygon' - if self.point_source_item == current_source\ - and self.point_source == 'P': - return 'point' - if self.multi_polygon_source == 'P': - return 'multi_polygon' - if self.point_source == 'P': - return 'point' - if self.multi_polygon: - return 'multi_polygon' - if self.point_2d: - return 'point' - - def geo_point_source(self): - if not self.point_source: - return "" - src = "{} - {}".format( - dict(self.GEO_SOURCE)[self.point_source], - self.point_source_item - ) - return src - - def geo_polygon_source(self): - if not self.multi_polygon_source: - return "" - src = "{} - {}".format( - dict(self.GEO_SOURCE)[self.multi_polygon_source], - self.multi_polygon_source_item - ) - return src - - def _geojson_serialize(self, geom_attr): - if not hasattr(self, geom_attr): - return "" - cached_label_key = 'cached_label' - if self.GEO_LABEL: - cached_label_key = self.GEO_LABEL - if getattr(self, "CACHED_LABELS", None): - cached_label_key = self.CACHED_LABELS[-1] - geojson = serialize( - 'geojson', - self.__class__.objects.filter(pk=self.pk), - geometry_field=geom_attr, fields=(cached_label_key,)) - geojson_dct = json.loads(geojson) - profile = get_current_profile() - precision = profile.point_precision - - features = geojson_dct.pop('features') - for idx in range(len(features)): - feature = features[idx] - lbl = feature['properties'].pop(cached_label_key) - feature['properties']['name'] = lbl - feature['properties']['id'] = self.pk - if precision is not None: - geom_type = feature["geometry"].get("type", None) - if geom_type == "Point": - feature["geometry"]["coordinates"] = [ - round(coord, precision) - for coord in feature["geometry"]["coordinates"] - ] - geojson_dct['features'] = features - geojson_dct['link_template'] = simple_link_to_window(self).replace( - '999999', '<pk>' - ) - geojson = json.dumps(geojson_dct) - return geojson - - @property - def point_2d_geojson(self): - return self._geojson_serialize('point_2d') - - @property - def multi_polygon_geojson(self): - return self._geojson_serialize('multi_polygon') - - -class TemplateItem: - @classmethod - def _label_templates_q(cls): - model_name = "{}.{}".format( - cls.__module__, cls.__name__) - q = Q(associated_model__klass=model_name, - for_labels=True, available=True) - alt_model_name = model_name.replace( - "models_finds", "models").replace( - "models_treatments", "models") - if alt_model_name != model_name: - q |= Q(associated_model__klass=model_name, - for_labels=True, available=True) - return DocumentTemplate.objects.filter(q) - - @classmethod - def has_label_templates(cls): - return cls._label_templates_q().count() - - @classmethod - def label_templates(cls): - return cls._label_templates_q() - - def get_extra_templates(self, request): - cls = self.__class__ - templates = [] - name = str(cls.__name__) - module = str(cls.__module__) - if "archaeological_finds" in module: - if "models_finds" in name or "models_treatments" in name: - names = [ - name, - name.replace("models_finds", "models" - ).replace("models_treatments", "models") - ] - else: - names = [name, name.replace("models", "models_finds"), - name.replace("models", "models_treatments")] - else: - names = [name] - model_names = [ - "{}.{}".format(module, name) for name in names - ] - q = DocumentTemplate.objects.filter( - associated_model__klass__in=model_names, - for_labels=False, available=True) - for template in q.all(): - urlname = "generate-document" - templates.append( - (template.name, reverse( - urlname, args=[template.slug, self.pk])) - ) - return templates - - -class StatisticItem: - STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" - STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: - # OrderedDict([('year', _("Year")), - # ("operation_type__label", _("Operation type"))]) - STATISTIC_SUM_VARIABLE = OrderedDict( - (("pk", (_("Number"), 1)),) - ) # example: "Price", "Volume" - the number is a multiplier - - -class CascasdeUpdate: - DOWN_MODEL_UPDATE = [] - - def cascade_update(self): - for down_model in self.DOWN_MODEL_UPDATE: - if not settings.USE_BACKGROUND_TASK: - rel = getattr(self, down_model) - if hasattr(rel.model, "need_update"): - rel.update(need_update=True) - continue - for item in getattr(self, down_model).all(): - cached_label_changed(item.__class__, instance=item) - if hasattr(item, "point_2d"): - post_save_geo(item.__class__, instance=item) - - -def duplicate_item(item, user=None, data=None): - model = item.__class__ - new = model.objects.get(pk=item.pk) - - for field in model._meta.fields: - # pk is in PRIVATE_FIELDS so: new.pk = None and a new - # item will be created on save - if field.name == "uuid": - new.uuid = uuid.uuid4() - elif field.name in PRIVATE_FIELDS: - setattr(new, field.name, None) - if user: - new.history_user = user - if data: - for k in data: - setattr(new, k, data[k]) - new.save() - - # m2m fields - m2m = [field.name for field in model._meta.many_to_many - if field.name not in PRIVATE_FIELDS] - for field in m2m: - for val in getattr(item, field).all(): - if val not in getattr(new, field).all(): - getattr(new, field).add(val) - return new - - -class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported, - JsonData, FixAssociated, CascasdeUpdate): - """ - Historized item with external ID management. - All historized items are searchable and have a data json field. - Historized items can be "locked" for edition. - """ - IS_BASKET = False - SHOW_URL = None - EXTERNAL_ID_KEY = '' - EXTERNAL_ID_DEPENDENCIES = [] - HISTORICAL_M2M = [] - - history_modifier = models.ForeignKey( - User, related_name='+', on_delete=models.SET_NULL, - verbose_name=_("Last editor"), blank=True, null=True) - history_creator = models.ForeignKey( - User, related_name='+', on_delete=models.SET_NULL, - verbose_name=_("Creator"), blank=True, null=True) - last_modified = models.DateTimeField(auto_now=True) - history_m2m = JSONField(default={}, blank=True) - need_update = models.BooleanField( - verbose_name=_("Need update"), default=False) - locked = models.BooleanField( - verbose_name=_("Item locked for edition"), default=False) - lock_user = models.ForeignKey( - User, related_name='+', on_delete=models.SET_NULL, - verbose_name=_("Locked by"), blank=True, null=True) - - ALT_NAMES = { - 'history_creator': SearchAltName( - pgettext_lazy("key for text search", u"created-by"), - 'history_creator__ishtaruser__person__cached_label__iexact' - ), - 'history_modifier': SearchAltName( - pgettext_lazy("key for text search", u"modified-by"), - 'history_modifier__ishtaruser__person__cached_label__iexact' - ), - 'modified_before': SearchAltName( - pgettext_lazy("key for text search", "modified-before"), - 'last_modified__lte' - ), - 'modified_after': SearchAltName( - pgettext_lazy("key for text search", "modified-after"), - 'last_modified__gte' - ), - } - - class Meta: - abstract = True - - @classmethod - def get_verbose_name(cls): - return cls._meta.verbose_name - - def is_locked(self, user=None): - if not user: - return self.locked - return self.locked and (not self.lock_user or self.lock_user != user) - - def merge(self, item, keep_old=False): - merge_model_objects(self, item, keep_old=keep_old) - - def public_representation(self): - return {} - - def duplicate(self, user=None, data=None): - return duplicate_item(self, user, data) - - def update_external_id(self, save=False): - if not self.EXTERNAL_ID_KEY or ( - self.external_id and - not getattr(self, 'auto_external_id', False)): - return - external_id = get_external_id(self.EXTERNAL_ID_KEY, self) - if external_id == self.external_id: - return - self.auto_external_id = True - self.external_id = external_id - self._cached_label_checked = False - if save: - self.skip_history_when_saving = True - self.save() - return external_id - - def get_last_history_date(self): - q = self.history.values("history_date").order_by('-history_date') - if not q.count(): - return - return q.all()[0]['history_date'] - - def get_previous(self, step=None, date=None, strict=False): - """ - Get a "step" previous state of the item - """ - assert step or date - historized = self.history.all() - item = None - if step: - if len(historized) <= step: - # silently return the last step if too far in the history - item = historized[len(historized) - 1] - else: - item = historized[step] - else: - for step, item in enumerate(historized): - if item.history_date == date: - break - # ended with no match - if item.history_date != date: - return - item._step = step - if len(historized) != (step + 1): - item._previous = historized[step + 1].history_date - else: - item._previous = None - if step > 0: - item._next = historized[step - 1].history_date - else: - item._next = None - item.history_date = historized[step].history_date - model = self.__class__ - for k in get_all_field_names(model): - field = model._meta.get_field(k) - if hasattr(field, 'rel') and field.rel: - if not hasattr(item, k + '_id'): - setattr(item, k, getattr(self, k)) - continue - val = getattr(item, k + '_id') - if not val: - setattr(item, k, None) - continue - try: - val = field.rel.to.objects.get(pk=val) - setattr(item, k, val) - except ObjectDoesNotExist: - if strict: - raise HistoryError("The class %s has no pk %d" % ( - str(field.rel.to), val)) - setattr(item, k, None) - item.pk = self.pk - return item - - @property - def last_edition_date(self): - try: - return self.history.order_by('-history_date').all()[0].history_date - except (AttributeError, IndexError): - return - - @property - def history_creation_date(self): - try: - return self.history.order_by('history_date').all()[0].history_date - except (AttributeError, IndexError): - return - - def rollback(self, date): - """ - Rollback to a previous state - """ - to_del, new_item = [], None - for item in self.history.all(): - if item.history_date == date: - new_item = item - break - to_del.append(item) - if not new_item: - raise HistoryError("The date to rollback to doesn't exist.") - try: - field_keys = [f.name for f in self._meta.fields] - for k in field_keys: - if k != 'id' and hasattr(self, k): - if not hasattr(new_item, k): - k = k + "_id" - setattr(self, k, getattr(new_item, k)) - - try: - self.history_modifier = User.objects.get( - pk=new_item.history_modifier_id) - except User.ObjectDoesNotExist: - pass - self.save() - saved_m2m = new_item.history_m2m.copy() - for hist_key in self.HISTORICAL_M2M: - # after each association m2m is rewrite - force the original - # to be reset - new_item.history_m2m = saved_m2m - values = new_item.m2m_listing(hist_key, create=True) or [] - hist_field = getattr(self, hist_key) - hist_field.clear() - for val in values: - hist_field.add(val) - # force label regeneration - self._cached_label_checked = False - self.save() - except ObjectDoesNotExist: - raise HistoryError("The rollback has failed.") - # clean the obsolete history - for historized_item in to_del: - historized_item.delete() - - def m2m_listing(self, key): - return getattr(self, key).all() - - def values(self): - values = {} - for f in self._meta.fields: - k = f.name - if k != 'id': - values[k] = getattr(self, k) - return values - - def get_absolute_url(self): - try: - return reverse('display-item', args=[self.SLUG, self.pk]) - except NoReverseMatch: - return - - def get_show_url(self): - show_url = self.SHOW_URL - if not show_url: - show_url = 'show-' + self.__class__.__name__.lower() - try: - return reverse(show_url, args=[self.pk, '']) - except NoReverseMatch: - return - - @property - def associated_filename(self): - if [True for attr in ('get_town_label', 'get_department', 'reference', - 'short_class_name') if not hasattr(self, attr)]: - return '' - items = [slugify(self.get_department()), - slugify(self.get_town_label()).upper(), - slugify(self.short_class_name), - slugify(self.reference), - slugify(self.name or '').replace('-', '_').capitalize()] - last_edition_date = self.last_edition_date - if last_edition_date: - items.append(last_edition_date.strftime('%Y%m%d')) - else: - items.append('00000000') - return "-".join([str(item) for item in items]) - - def save(self, *args, **kwargs): - created = not self.pk - if not getattr(self, 'skip_history_when_saving', False): - assert hasattr(self, 'history_modifier') - if created: - self.history_creator = self.history_modifier - # external ID can have related item not available before save - external_id_updated = kwargs.pop('external_id_updated') \ - if 'external_id_updated' in kwargs else False - if not created and not external_id_updated: - self.update_external_id() - super(BaseHistorizedItem, self).save(*args, **kwargs) - if created and self.update_external_id(): - # force resave for external ID creation - self.skip_history_when_saving = True - self._updated_id = True - return self.save(external_id_updated=True) - for dep in self.EXTERNAL_ID_DEPENDENCIES: - for obj in getattr(self, dep).all(): - obj.update_external_id(save=True) - self.fix_associated() - return True - - LOGICAL_TYPES = ( ('above', _("Above")), ('below', _("Below")), @@ -2623,207 +689,10 @@ class SearchQuery(models.Model): return str(self.label) -class ShortMenuItem(object): - """ - Item available in the short menu - """ - UP_MODEL_QUERY = {} - - @classmethod - def get_short_menu_class(cls, pk): - return '' - - @property - def short_class_name(self): - return "" - - -class QuickAction(object): - """ - Quick action available from tables - """ - - def __init__(self, url, icon_class='', text='', target=None, rights=None, - module=None): - self.url = url - self.icon_class = icon_class - self.text = text - self.rights = rights - self.target = target - self.module = module - assert self.target in ('one', 'many', None) - - def is_available(self, user, session=None, obj=None): - if self.module and not getattr(get_current_profile(), self.module): - return False - if not self.rights: # no restriction - return True - if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser: - return False - user = user.ishtaruser - - for right in self.rights: - if user.has_perm(right, session=session, obj=obj): - return True - return False - - @property - def rendered_icon(self): - if not self.icon_class: - return "" - return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class) - - @property - def base_url(self): - if self.target is None: - url = reverse(self.url) - else: - # put arbitrary pk for the target - url = reverse(self.url, args=[0]) - url = url[:-2] # all quick action url have to finish with the - # pk of the selected item and a "/" - return url - - -class MainItem(ShortMenuItem): - """ - Item with quick actions available from tables - Extra actions are available from sheets - """ - QUICK_ACTIONS = [] - - @classmethod - def get_quick_actions(cls, user, session=None, obj=None): - """ - Get a list of (url, title, icon, target) actions for an user - """ - qas = [] - for action in cls.QUICK_ACTIONS: - if not action.is_available(user, session=session, obj=obj): - continue - qas.append([action.base_url, - mark_safe(action.text), - mark_safe(action.rendered_icon), - action.target or ""]) - return qas - - @classmethod - def get_quick_action_by_url(cls, url): - for action in cls.QUICK_ACTIONS: - if action.url == url: - return action - - def regenerate_external_id(self): - if not hasattr(self, "external_id"): - return - self.skip_history_when_saving = True - self._no_move = True - if hasattr(self, "auto_external_id"): - self.external_id = None - self.save() - - def get_extra_actions(self, request): - if not hasattr(self, 'SLUG'): - return [] - - actions = [] - if request.user.is_superuser and hasattr(self, "auto_external_id"): - actions += [ - ( - reverse("regenerate-external-id") + "?{}={}".format( - self.SLUG, self.pk), - _("Regenerate ID"), - "fa fa-key", - _("regen."), - "", - True - ) - ] - - return actions - - -class LightHistorizedItem(BaseHistorizedItem): - history_date = models.DateTimeField(default=datetime.datetime.now) - - class Meta: - abstract = True - - def save(self, *args, **kwargs): - super(LightHistorizedItem, self).save(*args, **kwargs) - return self - - -PARSE_FORMULA = re.compile("{([^}]*)}") - - -def _deduplicate(value): - new_values = [] - for v in value.split(u'-'): - if v not in new_values: - new_values.append(v) - return u'-'.join(new_values) - - -FORMULA_FILTERS = { - 'upper': lambda x: x.upper(), - 'lower': lambda x: x.lower(), - 'capitalize': lambda x: x.capitalize(), - 'slug': lambda x: slugify(x), - 'deduplicate': _deduplicate -} - - -def get_external_id(key, item): - profile = get_current_profile() - if not hasattr(profile, key): - return - formula = getattr(profile, key) - dct = {} - for fkey in PARSE_FORMULA.findall(formula): - filtered = fkey.split('|') - initial_key = fkey[:] - fkey = filtered[0] - filters = [] - for filtr in filtered[1:]: - if filtr in FORMULA_FILTERS: - filters.append(FORMULA_FILTERS[filtr]) - if fkey.startswith('settings__'): - dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' - continue - obj = item - for k in fkey.split('__'): - try: - obj = getattr(obj, k) - except ObjectDoesNotExist: - obj = None - if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager - if not obj.count(): - break - obj = obj.all()[0] - elif callable(obj): - obj = obj() - if obj is None: - break - if obj is None: - dct[initial_key] = '' - else: - dct[initial_key] = str(obj) - for filtr in filters: - dct[initial_key] = filtr(dct[initial_key]) - values = formula.format(**dct).split('||') - value = values[0] - for filtr in values[1:]: - if filtr not in FORMULA_FILTERS: - value += '||' + filtr - continue - value = FORMULA_FILTERS[filtr](value) - return value - - class Language(GeneralType): iso_code = models.CharField(_("ISO code"), null=True, blank=True, max_length=2) + class Meta: verbose_name = _("Language") verbose_name_plural = _("Languages") @@ -3109,10 +978,6 @@ class IshtarSiteProfile(models.Model, Cached): return obj -def get_current_profile(force=False): - return IshtarSiteProfile.get_current_profile(force=force) - - def _profile_mapping(): return get_current_profile().mapping @@ -3357,141 +1222,6 @@ class StatsCache(models.Model): verbose_name_plural = _("Caches for stats") -def update_stats(statscache, item, funcname): - if not settings.USE_BACKGROUND_TASK: - current_values = statscache.values - if not current_values: - current_values = {} - value = getattr(item, funcname)() - current_values[funcname] = value - statscache.values = current_values - statscache.updated = datetime.datetime.now() - statscache.save() - return current_values - - now = datetime.datetime.now() - app_name = item._meta.app_label - model_name = item._meta.model_name - statscache.update_requested = now.isoformat() - statscache.save() - _update_stats.delay(app_name, model_name, item.pk, funcname) - return statscache.values - - -def __get_stats_cache_values(model_name, model_pk): - q = StatsCache.objects.filter( - model=model_name, model_pk=model_pk - ) - nb = q.count() - if nb >= 1: - sc = q.all()[0] - for extra in q.order_by("-id").all()[1:]: - extra.delete() - else: - sc = StatsCache.objects.create( - model=model_name, model_pk=model_pk - ) - values = sc.values - if not values: - values = {} - return sc, values - - -@task() -def _update_stats(app, model, model_pk, funcname): - model_name = app + "." + model - model = apps.get_model(app, model) - try: - item = model.objects.get(pk=model_pk) - except model.DoesNotExist: - return - value = getattr(item, funcname)() - sc, current_values = __get_stats_cache_values(model_name, model_pk) - current_values[funcname] = value - sc.values = current_values - sc.update_requested = None - sc.updated = datetime.datetime.now() - sc.save() - - -class DashboardFormItem(object): - """ - Provide methods to manage statistics - """ - - def last_stats_update(self): - model_name = self._meta.app_label + "." + self._meta.model_name - q = StatsCache.objects.filter( - model=model_name, model_pk=self.pk).order_by("-updated") - if not q.count(): - return - return q.all()[0].updated - - def _get_or_set_stats(self, funcname, update=False, - expected_type=None): - model_name = self._meta.app_label + "." + self._meta.model_name - sc, __ = StatsCache.objects.get_or_create( - model=model_name, model_pk=self.pk - ) - if not update: - values = sc.values - if funcname not in values: - if expected_type is not None: - return expected_type() - return 0 - else: - values = update_stats(sc, self, funcname) - if funcname in values: - values = values[funcname] - else: - values = 0 - if expected_type is not None and not isinstance(values, expected_type): - return expected_type() - return values - - @classmethod - def get_periods(cls, slice='month', fltr={}, date_source='creation'): - date_var = date_source + '_date' - q = cls.objects.filter(**{date_var + '__isnull': False}) - if fltr: - q = q.filter(**fltr) - if slice == 'year': - return [res[date_var].year for res in list(q.values(date_var) - .annotate( - Count("id")).order_by())] - elif slice == 'month': - return [(res[date_var].year, res[date_var].month) - for res in list(q.values(date_var) - .annotate(Count("id")).order_by())] - return [] - - @classmethod - def get_by_year(cls, year, fltr={}, date_source='creation'): - date_var = date_source + '_date' - q = cls.objects.filter(**{date_var + '__isnull': False}) - if fltr: - q = q.filter(**fltr) - return q.filter( - **{date_var + '__year': year}).order_by('pk').distinct('pk') - - @classmethod - def get_by_month(cls, year, month, fltr={}, date_source='creation'): - date_var = date_source + '_date' - q = cls.objects.filter(**{date_var + '__isnull': False}) - if fltr: - q = q.filter(**fltr) - q = q.filter( - **{date_var + '__year': year, date_var + '__month': month}) - return q.order_by('pk').distinct('pk') - - @classmethod - def get_total_number(cls, fltr=None): - q = cls.objects - if fltr: - q = q.filter(**fltr) - return q.order_by('pk').distinct('pk').count() - - class Dashboard(object): def __init__(self, model, slice='year', date_source=None, show_detail=None, fltr=None): @@ -3780,254 +1510,6 @@ class DocumentTemplate(models.Model): return output_name -class NumberManager(models.Manager): - def get_by_natural_key(self, number): - return self.get(number=number) - - -class State(models.Model): - label = models.CharField(_("Label"), max_length=30) - number = models.CharField(_("Number"), unique=True, max_length=3) - objects = NumberManager() - - class Meta: - verbose_name = _("State") - ordering = ['number'] - - def __str__(self): - return self.label - - def natural_key(self): - return (self.number,) - - -class Department(models.Model): - label = models.CharField(_("Label"), max_length=30) - number = models.CharField(_("Number"), unique=True, max_length=3) - state = models.ForeignKey( - 'State', verbose_name=_("State"), blank=True, null=True, - on_delete=models.SET_NULL, - ) - objects = NumberManager() - - class Meta: - verbose_name = _("Department") - verbose_name_plural = _("Departments") - ordering = ['number'] - - def __str__(self): - return self.label - - def natural_key(self): - return (self.number,) - - def history_compress(self): - return self.number - - @classmethod - def history_decompress(cls, full_value, create=False): - if not full_value: - return [] - res = [] - for value in full_value: - try: - res.append(cls.objects.get(number=value)) - except cls.DoesNotExist: - continue - return res - - -class Arrondissement(models.Model): - name = models.CharField("Nom", max_length=30) - department = models.ForeignKey(Department, verbose_name="Département") - - def __str__(self): - return settings.JOINT.join((self.name, str(self.department))) - - -class Canton(models.Model): - name = models.CharField("Nom", max_length=30) - arrondissement = models.ForeignKey(Arrondissement, - verbose_name="Arrondissement") - - def __str__(self): - return settings.JOINT.join( - (self.name, str(self.arrondissement))) - - -class TownManager(models.GeoManager): - def get_by_natural_key(self, numero_insee, year): - return self.get(numero_insee=numero_insee, year=year) - - -class Town(Imported, models.Model): - name = models.CharField(_("Name"), max_length=100) - surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) - center = models.PointField(_("Localisation"), srid=settings.SRID, - blank=True, null=True) - limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) - numero_insee = models.CharField("Code commune (numéro INSEE)", - max_length=120) - departement = models.ForeignKey( - Department, verbose_name=_("Department"), - on_delete=models.SET_NULL, null=True, blank=True) - year = models.IntegerField( - _("Year of creation"), null=True, blank=True, - help_text=_("Filling this field is relevant to distinguish old towns " - "from new towns.")) - children = models.ManyToManyField( - 'Town', verbose_name=_("Town children"), blank=True, - related_name='parents') - cached_label = models.CharField(_("Cached name"), max_length=500, - null=True, blank=True, db_index=True) - objects = TownManager() - - class Meta: - verbose_name = _("Town") - verbose_name_plural = _("Towns") - if settings.COUNTRY == 'fr': - ordering = ['numero_insee'] - unique_together = (('numero_insee', 'year'),) - - def natural_key(self): - return (self.numero_insee, self.year) - - def history_compress(self): - values = {'numero_insee': self.numero_insee, - 'year': self.year or ""} - return values - - def get_values(self, prefix='', no_values=False, filtr=None, **kwargs): - values = {} - if not filtr or prefix in filtr or "label" in filtr: - if prefix: - values[prefix] = str(self) - else: - values['label'] = str(self) - if not filtr or prefix + "name" in filtr: - values[prefix + "name"] = self.name - if not filtr or prefix + "numero_insee" in filtr: - values[prefix + "numero_insee"] = self.numero_insee - return values - - @classmethod - def history_decompress(cls, full_value, create=False): - if not full_value: - return [] - res = [] - for value in full_value: - try: - res.append( - cls.objects.get(numero_insee=value['numero_insee'], - year=value['year'] or None)) - except cls.DoesNotExist: - continue - return res - - def __str__(self): - return self.cached_label or "" - - @property - def label_with_areas(self): - label = [self.name] - if self.numero_insee: - label.append("({})".format(self.numero_insee)) - for area in self.areas.all(): - label.append(" - ") - label.append(area.full_label) - return " ".join(label) - - def generate_geo(self, force=False): - force = self.generate_limit(force=force) - self.generate_center(force=force) - self.generate_area(force=force) - - def generate_limit(self, force=False): - if not force and self.limit: - return - parents = None - if not self.parents.count(): - return - for parent in self.parents.all(): - if not parent.limit: - return - if not parents: - parents = parent.limit - else: - parents = parents.union(parent.limit) - # if union is a simple polygon make it a multi - if 'MULTI' not in parents.wkt: - parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" - if not parents: - return - self.limit = parents - self.save() - return True - - def generate_center(self, force=False): - if not force and (self.center or not self.limit): - return - self.center = self.limit.centroid - if not self.center: - return False - self.save() - return True - - def generate_area(self, force=False): - if not force and (self.surface or not self.limit): - return - surface = self.limit.transform(settings.SURFACE_SRID, - clone=True).area - if surface > 214748364 or not surface: - return False - self.surface = surface - self.save() - return True - - def update_town_code(self): - if not self.numero_insee or not self.children.count() or not self.year: - return - old_num = self.numero_insee[:] - numero = old_num.split('-')[0] - self.numero_insee = "{}-{}".format(numero, self.year) - if self.numero_insee != old_num: - return True - - def _generate_cached_label(self): - cached_label = self.name - if settings.COUNTRY == "fr" and self.numero_insee: - dpt_len = 2 - if self.numero_insee.startswith('97') or \ - self.numero_insee.startswith('98') or \ - self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', - '6', '7', '8', '9'): - dpt_len = 3 - cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) - if self.year and self.children.count(): - cached_label += " ({})".format(self.year) - return cached_label - - -def post_save_town(sender, **kwargs): - cached_label_changed(sender, **kwargs) - town = kwargs['instance'] - town.generate_geo() - if town.update_town_code(): - town.save() - - -post_save.connect(post_save_town, sender=Town) - - -def town_child_changed(sender, **kwargs): - town = kwargs['instance'] - if town.update_town_code(): - town.save() - - -m2m_changed.connect(town_child_changed, sender=Town.children.through) - - class Area(HierarchicalType): towns = models.ManyToManyField(Town, verbose_name=_("Towns"), blank=True, related_name='areas') @@ -4057,261 +1539,6 @@ class Area(HierarchicalType): return " / ".join(label) -class Address(BaseHistorizedItem): - FIELDS = ( - "address", "address_complement", "postal_code", "town", - "precise_town", "country", - "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", - "alt_country", - "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", - "raw_phone", "mobile_phone", "email", "alt_address_is_prefered" - ) - address = models.TextField(_("Address"), null=True, blank=True) - address_complement = models.TextField(_("Address complement"), null=True, - blank=True) - postal_code = models.CharField(_("Postal code"), max_length=10, null=True, - blank=True) - town = models.CharField(_("Town (freeform)"), max_length=150, null=True, - blank=True) - precise_town = models.ForeignKey( - Town, verbose_name=_("Town (precise)"), null=True, blank=True) - country = models.CharField(_("Country"), max_length=30, null=True, - blank=True) - alt_address = models.TextField(_("Other address: address"), null=True, - blank=True) - alt_address_complement = models.TextField( - _("Other address: address complement"), null=True, blank=True) - alt_postal_code = models.CharField(_("Other address: postal code"), - max_length=10, null=True, blank=True) - alt_town = models.CharField(_("Other address: town"), max_length=70, - null=True, blank=True) - alt_country = models.CharField(_("Other address: country"), - max_length=30, null=True, blank=True) - phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True) - phone_desc = models.CharField(_("Phone description"), max_length=300, - null=True, blank=True) - phone2 = models.CharField(_("Phone description 2"), max_length=18, - null=True, blank=True) - phone_desc2 = models.CharField(_("Phone description 2"), max_length=300, - null=True, blank=True) - phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, - blank=True) - phone_desc3 = models.CharField(_("Phone description 3"), max_length=300, - null=True, blank=True) - raw_phone = models.TextField(_("Raw phone"), blank=True, null=True) - mobile_phone = models.CharField(_("Mobile phone"), max_length=18, - null=True, blank=True) - email = models.EmailField( - _("Email"), max_length=300, blank=True, null=True) - alt_address_is_prefered = models.BooleanField( - _("Alternative address is prefered"), default=False) - SUB_ADDRESSES = [] - - class Meta: - abstract = True - - def get_short_html_items(self): - items = [] - if self.address: - items.append( - """<span class="subadress">{}</span>""".format(self.address)) - if self.address_complement: - items.append( - """<span class="subadress-complement">{}</span>""".format( - self.address_complement)) - if self.postal_code: - items.append( - """<span class="postal-code">{}</span>""".format( - self.postal_code)) - if self.precise_town: - items.append( - """<span class="town">{}</span>""".format( - self.precise_town.name)) - elif self.town: - items.append( - """<span class="town">{}</span>""".format( - self.town)) - if self.country: - items.append( - """<span class="country">{}</span>""".format( - self.country)) - return items - - def get_short_html_detail(self): - html = """<div class="address">""" - items = self.get_short_html_items() - if not items: - items = [ - "<span class='no-address'>{}</span>".format( - _("No associated address") - ) - ] - html += "".join(items) - html += """</div>""" - return html - - def get_town_centroid(self): - if self.precise_town: - return self.precise_town.center, self._meta.verbose_name - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if sub_item and sub_item.precise_town: - return sub_item.precise_town.center, sub_item._meta.verbose_name - - def get_town_polygons(self): - if self.precise_town: - return self.precise_town.limit, self._meta.verbose_name - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if sub_item and sub_item.precise_town: - return sub_item.precise_town.limit, sub_item._meta.verbose_name - - def get_attribute(self, attr): - if self.town or self.precise_town: - return getattr(self, attr) - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if not sub_item: - continue - if sub_item.town or sub_item.precise_town: - return getattr(sub_item, attr) - return getattr(self, attr) - - def get_address(self): - return self.get_attribute("address") - - def get_address_complement(self): - return self.get_attribute("address_complement") - - def get_postal_code(self): - return self.get_attribute("postal_code") - - def get_town(self): - return self.get_attribute("town") - - def get_precise_town(self): - return self.get_attribute("precise_town") - - def get_country(self): - return self.get_attribute("country") - - def simple_lbl(self): - return str(self) - - def full_address(self): - lbl = self.simple_lbl() - if lbl: - lbl += "\n" - lbl += self.address_lbl() - return lbl - - def address_lbl(self): - lbl = u'' - prefix = '' - if self.alt_address_is_prefered: - prefix = 'alt_' - if getattr(self, prefix + 'address'): - lbl += getattr(self, prefix + 'address') - if getattr(self, prefix + 'address_complement'): - if lbl: - lbl += "\n" - lbl += getattr(self, prefix + 'address_complement') - postal_code = getattr(self, prefix + 'postal_code') - town = getattr(self, prefix + 'town') - if postal_code or town: - if lbl: - lbl += "\n" - lbl += "{}{}{}".format( - postal_code or '', - " " if postal_code and town else '', - town or '') - if self.phone: - if lbl: - lbl += "\n" - lbl += "{} {}".format(str(_("Tel: ")), self.phone) - if self.mobile_phone: - if lbl: - lbl += "\n" - lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone) - if self.email: - if lbl: - lbl += "\n" - lbl += "{} {}".format(str(_("Email: ")), self.email) - return lbl - - -class Merge(models.Model): - merge_key = models.TextField(_("Merge key"), blank=True, null=True) - merge_candidate = models.ManyToManyField("self", blank=True) - merge_exclusion = models.ManyToManyField("self", blank=True) - archived = models.NullBooleanField(default=False, - blank=True, null=True) - # 1 for one word similarity, 2 for two word similarity, etc. - MERGE_CLEMENCY = None - EMPTY_MERGE_KEY = '--' - MERGE_ATTRIBUTE = "name" - - class Meta: - abstract = True - - def generate_merge_key(self): - if self.archived: - return - merge_attr = getattr(self, self.MERGE_ATTRIBUTE) - self.merge_key = slugify(merge_attr if merge_attr else '') - if not self.merge_key: - self.merge_key = self.EMPTY_MERGE_KEY - self.merge_key = self.merge_key - - def generate_merge_candidate(self): - if self.archived: - return - if not self.merge_key: - self.generate_merge_key() - self.save(merge_key_generated=True) - if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: - return - q = self.__class__.objects \ - .exclude(pk=self.pk) \ - .exclude(merge_exclusion=self) \ - .exclude(merge_candidate=self) \ - .exclude(archived=True) - if not self.MERGE_CLEMENCY: - q = q.filter(merge_key=self.merge_key) - else: - subkeys_front = "-".join( - self.merge_key.split('-')[:self.MERGE_CLEMENCY]) - subkeys_back = "-".join( - self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) - q = q.filter(Q(merge_key__istartswith=subkeys_front) | - Q(merge_key__iendswith=subkeys_back)) - for item in q.all(): - self.merge_candidate.add(item) - - def save(self, *args, **kwargs): - # prevent circular save - merge_key_generated = False - if 'merge_key_generated' in kwargs: - merge_key_generated = kwargs.pop('merge_key_generated') - self.generate_merge_key() - item = super(Merge, self).save(*args, **kwargs) - if not merge_key_generated: - self.merge_candidate.clear() - self.generate_merge_candidate() - return item - - def archive(self): - self.archived = True - self.save() - self.merge_candidate.clear() - self.merge_exclusion.clear() - - def merge(self, item, keep_old=False, exclude_fields=None): - merge_model_objects(self, item, keep_old=keep_old, - exclude_fields=exclude_fields) - self.generate_merge_candidate() - - class OrganizationType(GeneralType): class Meta: verbose_name = _("Organization type") @@ -5704,15 +2931,12 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, verbose_name=_("Receipt date in documentation")) item_number = models.IntegerField(_("Number of items"), default=1) description = models.TextField(_("Description"), blank=True, null=True) - container = models.ForeignKey( - "archaeological_warehouse.Container", verbose_name=_("Container"), - blank=True, null=True, related_name='contained_documents', - on_delete=models.SET_NULL) - container_ref = models.ForeignKey( - "archaeological_warehouse.Container", - verbose_name=_("Reference container"), - blank=True, null=True, - related_name='contained_documents_ref', on_delete=models.SET_NULL) + container_id = models.PositiveIntegerField( + verbose_name=_("Container ID"), blank=True, null=True) + # container = models.ForeignKey("archaeological_warehouse.Container") + container_ref_id = models.PositiveIntegerField( + verbose_name=_("Container ID"), blank=True, null=True) + # container_ref = models.ForeignKey("archaeological_warehouse.Container") comment = models.TextField(_("Comment"), blank=True, null=True) additional_information = models.TextField(_("Additional information"), blank=True, null=True) @@ -5750,6 +2974,26 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, def natural_key(self): return (self.external_id,) + @property + def container(self): + if not self.container_id: + return + Container = apps.get_model("archaeological_warehouse", "Container") + try: + return Container.objects.get(pk=self.container_id) + except Container.DoesNotExist: + return + + @property + def container_ref(self): + if not self.container_ref_id: + return + Container = apps.get_model("archaeological_warehouse", "Container") + try: + return Container.objects.get(pk=self.container_ref_id) + except Container.DoesNotExist: + return + """ @property def code(self): @@ -6177,6 +3421,16 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, self.set_index() if not self.associated_url: self.associated_url = None + container = self.container + if self.container_id and not container: + self.container_id = None + if container and container.pk != self.container_id: + self.container_id = container.pk + container_ref = self.container_ref + if self.container_ref_id and not container_ref: + self.container_ref_id = None + if container_ref and container_ref.pk != self.container_ref_id: + self.container_ref_id = container_ref.pk super(Document, self).save(*args, **kwargs) if self.image and not no_path_change and \ @@ -6190,42 +3444,6 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, self.save(no_path_change=True) -def document_attached_changed(sender, **kwargs): - # associate a default main image - instance = kwargs.get("instance", None) - model = kwargs.get("model", None) - pk_set = kwargs.get("pk_set", None) - if not instance or not model: - return - - if hasattr(instance, "documents"): - items = [instance] - else: - if not pk_set: - return - try: - items = [model.objects.get(pk=pk) for pk in pk_set] - except model.DoesNotExist: - return - - for item in items: - q = item.documents.filter( - image__isnull=False).exclude(image='') - if item.main_image: - if q.filter(pk=item.main_image.pk).count(): - return - # the association has disappear not the main image anymore - item.main_image = None - item.skip_history_when_saving = True - item.save() - if not q.count(): - return - # by default get the lowest pk - item.main_image = q.order_by('pk').all()[0] - item.skip_history_when_saving = True - item.save() - - post_save.connect(cached_label_changed, sender=Document) |