author     Étienne Loks <etienne.loks@iggdrasil.net>  2020-10-07 19:09:30 +0200
committer  Étienne Loks <etienne.loks@iggdrasil.net>  2021-02-28 12:15:21 +0100
commit     9d5f0791187ff6b18d3ffa4db4d593fe96834e8d (patch)
tree       9cd21bf7e51d271b958a9a4b2b85367adbb97992 /ishtar_common/models_common.py
parent     e5c0a159929fc64d63db37ebd85a5a810faf2534 (diff)
download   Ishtar-9d5f0791187ff6b18d3ffa4db4d593fe96834e8d.tar.bz2
           Ishtar-9d5f0791187ff6b18d3ffa4db4d593fe96834e8d.zip
Refactoring of models. Document container - declare only id
Diffstat (limited to 'ishtar_common/models_common.py')
-rw-r--r--  ishtar_common/models_common.py  2777
1 file changed, 2777 insertions, 0 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
new file mode 100644
index 000000000..b7685b8b5
--- /dev/null
+++ b/ishtar_common/models_common.py
@@ -0,0 +1,2777 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Generic models and tools for models
+"""
+
+"""
+from ishtar_common.models import GeneralType, get_external_id, \
+ LightHistorizedItem, OwnPerms, Address, post_save_cache, \
+ DashboardFormItem, document_attached_changed, SearchAltName, \
+ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
+ QuickAction, MainItem, Merge
+
+
+"""
+
+import copy
+from collections import OrderedDict
+import datetime
+import json
+import logging
+import os
+import pyqrcode
+import shutil
+import tempfile
+import time
+
+from django import forms
+from django.apps import apps
+from django.conf import settings
+from django.contrib.auth.models import User, Group
+from django.contrib.contenttypes.models import ContentType
+from django.contrib.gis.db import models
+from django.contrib.postgres.fields import JSONField
+from django.contrib.postgres.search import SearchVectorField, SearchVector
+from django.contrib.sites.models import Site
+from django.core.cache import cache
+from django.core.exceptions import ObjectDoesNotExist
+from django.core.files import File
+from django.core.serializers import serialize
+from django.core.urlresolvers import reverse, NoReverseMatch
+from django.core.validators import validate_slug
+from django.db import connection
+from django.db.models import Q, Count
+from django.db.models.signals import post_save, post_delete, m2m_changed
+from django.template.defaultfilters import slugify
+from django.utils.safestring import SafeText, mark_safe
+from django.utils.translation import activate, deactivate
+from ishtar_common.utils import ugettext_lazy as _, \
+ pgettext_lazy, get_image_path
+from simple_history.models import HistoricalRecords as BaseHistoricalRecords
+from simple_history.signals import post_create_historical_record, \
+ pre_create_historical_record
+from unidecode import unidecode
+
+from ishtar_common.model_managers import TypeManager
+from ishtar_common.model_merging import merge_model_objects
+from ishtar_common.models_imports import Import
+from ishtar_common.templatetags.link_to_window import simple_link_to_window
+from ishtar_common.utils import get_cache, disable_for_loaddata, \
+ get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \
+ task, duplicate_item, get_external_id, get_current_profile
+
+"""
+from ishtar_common.models import get_external_id, \
+ LightHistorizedItem, OwnPerms, Address, post_save_cache, \
+ DashboardFormItem, document_attached_changed, SearchAltName, \
+ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
+ QuickAction, MainItem, Merge
+
+
+"""
+
+logger = logging.getLogger(__name__)
+
+
+class CachedGen(object):
+ @classmethod
+ def refresh_cache(cls):
+ raise NotImplementedError()
+
+ @classmethod
+ def _add_cache_key_to_refresh(cls, keys):
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if type(current_keys) != list:
+ current_keys = []
+ if keys not in current_keys:
+ current_keys.append(keys)
+ cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
+
+
+class Cached(CachedGen):
+ slug_field = 'txt_idx'
+
+ @classmethod
+ def refresh_cache(cls):
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if not current_keys:
+ return
+ for keys in current_keys:
+ if len(keys) == 2 and keys[0] == '__slug':
+ cls.get_cache(keys[1], force=True)
+ elif keys[0] == '__get_types':
+ default = None
+ empty_first = True
+ exclude = []
+ if len(keys) >= 2:
+ default = keys.pop()
+ if len(keys) > 1:
+ empty_first = bool(keys.pop())
+ exclude = keys[1:]
+ cls.get_types(
+ exclude=exclude, empty_first=empty_first, default=default,
+ force=True)
+ elif keys[0] == '__get_help':
+ cls.get_help(force=True)
+
+ @classmethod
+ def _add_cache_key_to_refresh(cls, keys):
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if type(current_keys) != list:
+ current_keys = []
+ if keys not in current_keys:
+ current_keys.append(keys)
+ cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
+
+ @classmethod
+ def get_cache(cls, slug, force=False):
+ cache_key, value = get_cache(cls, ['__slug', slug])
+ if not force and value:
+ return value
+ try:
+ k = {cls.slug_field: slug}
+ obj = cls.objects.get(**k)
+ cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
+ return obj
+ except cls.DoesNotExist:
+ cache.set(cache_key, None, settings.CACHE_TIMEOUT)
+ return None
+
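+# A minimal usage sketch for ``Cached.get_cache`` (``OperationType`` is an
+# assumed concrete subclass identified by its ``txt_idx`` slug):
+#
+# >>> OperationType.get_cache("survey")              # cached lookup by slug
+# <OperationType: Survey>
+# >>> OperationType.get_cache("survey", force=True)  # bypass and refresh cache
+# <OperationType: Survey>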
+
+@disable_for_loaddata
+def post_save_cache(sender, **kwargs):
+ sender.refresh_cache()
+
+
+class GeneralType(Cached, models.Model):
+ """
+ Abstract class for "types"
+ """
+ label = models.TextField(_("Label"))
+ txt_idx = models.TextField(
+ _("Textual ID"), validators=[validate_slug],
+ unique=True,
+ help_text=_(
+ "The slug is the standardized version of the name. It contains "
+ "only lowercase letters, numbers and hyphens. Each slug must "
+ "be unique."))
+ comment = models.TextField(_("Comment"), blank=True, null=True)
+ available = models.BooleanField(_("Available"), default=True)
+ HELP_TEXT = ""
+ objects = TypeManager()
+
+ class Meta:
+ abstract = True
+
+ def __str__(self):
+ return self.label
+
+ def natural_key(self):
+ return (self.txt_idx,)
+
+ def history_compress(self):
+ return self.txt_idx
+
+ @classmethod
+ def history_decompress(cls, value, create=False):
+ if not value:
+ return []
+ res = []
+ for txt_idx in value:
+ try:
+ res.append(cls.objects.get(txt_idx=txt_idx))
+ except cls.DoesNotExist:
+ continue
+ return res
+
+ @property
+ def explicit_label(self):
+ return "{} ({})".format(self.label, self._meta.verbose_name)
+
+ @classmethod
+ def create_default_for_test(cls):
+ return [cls.objects.create(label='Test %d' % i) for i in range(5)]
+
+ @property
+ def short_label(self):
+ return self.label
+
+ @property
+ def name(self):
+ return self.label
+
+ @classmethod
+ def get_or_create(cls, slug, label=''):
+ """
+ Get or create a new item.
+
+ :param slug: textual id
+ :param label: label for initialization if the item doesn't exist (not
+ mandatory)
+
+    :return: instantiated item of the base class
+ """
+
+ item = cls.get_cache(slug)
+ if item:
+ return item
+ item, created = cls.objects.get_or_create(
+ txt_idx=slug, defaults={'label': label})
+ return item
+
+ @classmethod
+ def get_or_create_pk(cls, slug):
+ """
+ Get an id from a slug. Create the associated item if needed.
+
+ :param slug: textual id
+
+ :return: id of the item (string)
+ """
+ return str(cls.get_or_create(slug).pk)
+
+ @classmethod
+ def get_or_create_pks(cls, slugs):
+ """
+ Get and merge a list of ids from a slug list. Create the associated
+ items if needed.
+
+ :param slugs: textual ids
+
+ :return: string with ids separated by "_"
+ """
+ items = []
+ for slug in slugs:
+ items.append(str(cls.get_or_create(slug).pk))
+ return "_".join(items)
+
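+    # A minimal sketch of these slug helpers (``MaterialType`` being an
+    # assumed concrete subclass of GeneralType):
+    #
+    # >>> MaterialType.get_or_create("ceramic", label="Ceramic").txt_idx
+    # 'ceramic'
+    # >>> MaterialType.get_or_create_pks(["ceramic", "glass"])
+    # '1_2'  # primary keys joined with "_"; missing items are created
+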
+ @classmethod
+ def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ keys = ['__get_help']
+ keys += ["{}".format(ex) for ex in exclude]
+ keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
+ cache_key, value = get_cache(cls, keys)
+ if value and not force:
+ return mark_safe(value)
+ help_text = cls.HELP_TEXT
+ c_rank = -1
+ help_items = "\n"
+ for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
+ if hasattr(item, '__iter__'):
+ pk = item[0]
+ item = cls.objects.get(pk=pk)
+ item.rank = c_rank + 1
+ if hasattr(item, 'parent'):
+ c_item = item
+ parents = []
+ while c_item.parent:
+ parents.append(c_item.parent.label)
+ c_item = c_item.parent
+ parents.reverse()
+ parents.append(item.label)
+ item.label = " / ".join(parents)
+ if not item.comment:
+ continue
+ if c_rank > item.rank:
+ help_items += "</dl>\n"
+ elif c_rank < item.rank:
+ help_items += "<dl>\n"
+ c_rank = item.rank
+ help_items += "<dt>%s</dt><dd>%s</dd>" % (
+ item.label, "<br/>".join(item.comment.split('\n')))
+ c_rank += 1
+ if c_rank:
+ help_items += c_rank * "</dl>"
+ if help_text or help_items != u'\n':
+ help_text = help_text + help_items
+ else:
+ help_text = ""
+ cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
+ return mark_safe(help_text)
+
+ @classmethod
+ def _get_initial_types(cls, initial, type_pks, instance=False):
+ new_vals = []
+ if not initial:
+ return []
+ if type(initial) not in (list, tuple):
+ initial = [initial]
+ for value in initial:
+ try:
+ pk = int(value)
+ except (ValueError, TypeError):
+ continue
+ if pk in type_pks:
+ continue
+ try:
+ extra_type = cls.objects.get(pk=pk)
+ if instance:
+ new_vals.append(extra_type)
+ else:
+ new_vals.append((extra_type.pk, str(extra_type)))
+ except cls.DoesNotExist:
+ continue
+ return new_vals
+
+ @classmethod
+ def get_types(cls, dct=None, instances=False, exclude=None,
+ empty_first=True, default=None, initial=None, force=False,
+ full_hierarchy=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ types = []
+ if not instances and empty_first and not default:
+ types = [('', '--')]
+ types += cls._pre_get_types(dct, instances, exclude,
+ default, force,
+ get_full_hierarchy=full_hierarchy)
+ if not initial:
+ return types
+ new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types])
+ types += new_vals
+ return types
+
+ @classmethod
+ def _pre_get_types(cls, dct=None, instances=False, exclude=None,
+ default=None, force=False, get_full_hierarchy=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ # cache
+ cache_key = None
+ if not instances:
+ keys = ['__get_types']
+ keys += ["{}".format(ex) for ex in exclude] + \
+ ["{}".format(default)]
+ keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
+ cache_key, value = get_cache(cls, keys)
+ if value and not force:
+ return value
+ base_dct = dct.copy()
+ if hasattr(cls, 'parent'):
+ if not cache_key:
+ return cls._get_parent_types(
+ base_dct, instances, exclude=exclude,
+ default=default, get_full_hierarchy=get_full_hierarchy)
+ vals = [v for v in cls._get_parent_types(
+ base_dct, instances, exclude=exclude,
+ default=default, get_full_hierarchy=get_full_hierarchy)]
+ cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
+ return vals
+
+ if not cache_key:
+ return cls._get_types(base_dct, instances, exclude=exclude,
+ default=default)
+ vals = [
+ v for v in cls._get_types(base_dct, instances, exclude=exclude,
+ default=default)
+ ]
+ cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
+ return vals
+
+ @classmethod
+ def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ dct['available'] = True
+ if default:
+ try:
+ default = cls.objects.get(txt_idx=default)
+ yield (default.pk, _(str(default)))
+ except cls.DoesNotExist:
+ pass
+ items = cls.objects.filter(**dct)
+ if default and default != "None":
+ if hasattr(default, 'txt_idx'):
+ exclude.append(default.txt_idx)
+ else:
+ exclude.append(default)
+ if exclude:
+ items = items.exclude(txt_idx__in=exclude)
+ for item in items.order_by(*cls._meta.ordering).all():
+ if instances:
+ item.rank = 0
+ yield item
+ else:
+ yield (item.pk, _(str(item)) if item and str(item) else '')
+
+ @classmethod
+ def _get_childs_list(cls, dct=None, exclude=None, instances=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ if 'parent' in dct:
+ dct.pop('parent')
+ childs = cls.objects.filter(**dct)
+ if exclude:
+ childs = childs.exclude(txt_idx__in=exclude)
+ if hasattr(cls, 'order'):
+ childs = childs.order_by('order')
+ res = {}
+ if instances:
+ for item in childs.all():
+ parent_id = item.parent_id or 0
+ if parent_id not in res:
+ res[parent_id] = []
+ res[parent_id].append(item)
+ else:
+ for item in childs.values("id", "parent_id", "label").all():
+ parent_id = item["parent_id"] or 0
+ if item["id"] == item["parent_id"]:
+ parent_id = 0
+ if parent_id not in res:
+ res[parent_id] = []
+ res[parent_id].append((item["id"], item["label"]))
+ return res
+
+ PREFIX = "&#x2502; "
+ PREFIX_EMPTY = "&nbsp; "
+ PREFIX_MEDIUM = "&#x251C; "
+ PREFIX_LAST = "&#x2514; "
+ PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
+
+ @classmethod
+ def _get_childs(cls, item, child_list, prefix=0, instances=False,
+ is_last=False, last_of=None, get_full_hierarchy=False):
+ if not last_of:
+ last_of = []
+
+ prefix += 1
+ current_child_lst = []
+ if item in child_list:
+ current_child_lst = child_list[item]
+
+ lst = []
+ total = len(current_child_lst)
+ full_hierarchy_initial = get_full_hierarchy
+ for idx, child in enumerate(current_child_lst):
+ mylast_of = last_of[:]
+ p = ''
+ if instances:
+ child.rank = prefix
+ lst.append(child)
+ else:
+ if full_hierarchy_initial:
+ if isinstance(full_hierarchy_initial, str):
+ p = full_hierarchy_initial + " > "
+ else:
+ p = ""
+ else:
+ cprefix = prefix
+ while cprefix:
+ cprefix -= 1
+ if not cprefix:
+ if (idx + 1) == total:
+ p += cls.PREFIX_LAST
+ else:
+ p += cls.PREFIX_MEDIUM
+ elif is_last:
+ if mylast_of:
+ clast = mylast_of.pop(0)
+ if clast:
+ p += cls.PREFIX_EMPTY
+ else:
+ p += cls.PREFIX
+ else:
+ p += cls.PREFIX_EMPTY
+ else:
+ p += cls.PREFIX
+ lst.append((
+ child[0], SafeText(p + str(_(child[1])))
+ ))
+ clast_of = last_of[:]
+ clast_of.append(idx + 1 == total)
+ if instances:
+ child_id = child.id
+ else:
+ child_id = child[0]
+ if get_full_hierarchy:
+ if p:
+ if not p.endswith(" > "):
+ p += " > "
+ get_full_hierarchy = p + child[1]
+ else:
+ get_full_hierarchy = child[1]
+ for sub_child in cls._get_childs(
+ child_id, child_list, prefix, instances,
+ is_last=((idx + 1) == total), last_of=clast_of,
+ get_full_hierarchy=get_full_hierarchy):
+ lst.append(sub_child)
+ return lst
+
+ @classmethod
+ def _get_parent_types(cls, dct=None, instances=False, exclude=None,
+ default=None, get_full_hierarchy=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ dct['available'] = True
+ child_list = cls._get_childs_list(dct, exclude, instances)
+
+ if 0 in child_list:
+ for item in child_list[0]:
+ if instances:
+ item.rank = 0
+ item_id = item.pk
+ yield item
+ else:
+ item_id = item[0]
+ yield item
+ if get_full_hierarchy:
+ get_full_hierarchy = item[1]
+ for child in cls._get_childs(
+ item_id, child_list, instances=instances,
+ get_full_hierarchy=get_full_hierarchy):
+ yield child
+
+ def save(self, *args, **kwargs):
+ ItemKey = apps.get_model("ishtar_common", "ItemKey")
+ if not self.id and not self.label:
+ txt_idx = self.txt_idx
+ if isinstance(txt_idx, list):
+ txt_idx = txt_idx[0]
+ self.txt_idx = txt_idx
+ self.label = " ".join(" ".join(self.txt_idx.split('-'))
+ .split('_')).title()
+ if not self.txt_idx:
+ self.txt_idx = slugify(self.label)[:100]
+
+ # clean old keys
+ if self.pk:
+ old = self.__class__.objects.get(pk=self.pk)
+ content_type = ContentType.objects.get_for_model(self.__class__)
+ if slugify(self.label) != slugify(old.label):
+ ItemKey.objects.filter(
+ object_id=self.pk, key=slugify(old.label),
+ content_type=content_type).delete()
+ if self.txt_idx != old.txt_idx:
+ ItemKey.objects.filter(
+ object_id=self.pk, key=old.txt_idx,
+ content_type=content_type).delete()
+
+ obj = super(GeneralType, self).save(*args, **kwargs)
+ self.generate_key(force=True)
+ return obj
+
+ def add_key(self, key, force=False, importer=None, group=None,
+ user=None):
+ ItemKey = apps.get_model("ishtar_common", "ItemKey")
+ content_type = ContentType.objects.get_for_model(self.__class__)
+ if not importer and not force and ItemKey.objects.filter(
+ key=key, content_type=content_type).count():
+ return
+ filtr = {'key': key, 'content_type': content_type}
+ if group:
+ filtr['group'] = group
+ elif user:
+ filtr['user'] = user
+ else:
+ filtr['importer'] = importer
+ if force:
+ ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
+ filtr['object_id'] = self.pk
+ ItemKey.objects.get_or_create(**filtr)
+
+ def generate_key(self, force=False):
+ for key in (slugify(self.label), self.txt_idx):
+ self.add_key(key)
+
+ def get_keys(self, importer):
+ ItemKey = apps.get_model("ishtar_common", "ItemKey")
+ keys = [self.txt_idx]
+ content_type = ContentType.objects.get_for_model(self.__class__)
+ base_q = Q(content_type=content_type, object_id=self.pk)
+ subquery = Q(importer__isnull=True, user__isnull=True,
+ group__isnull=True)
+ subquery |= Q(user__isnull=True, group__isnull=True,
+ importer=importer)
+ if importer.user:
+ subquery |= Q(user=importer.user, group__isnull=True,
+ importer=importer)
+ if importer.associated_group:
+ subquery |= Q(user__isnull=True, group=importer.associated_group,
+ importer=importer)
+ q = ItemKey.objects.filter(base_q & subquery)
+ for ik in q.exclude(key=self.txt_idx).all():
+ keys.append(ik.key)
+ return keys
+
+ @classmethod
+ def generate_keys(cls):
+ # content_type = ContentType.objects.get_for_model(cls)
+ for item in cls.objects.all():
+ item.generate_key()
+
+
+class HierarchicalType(GeneralType):
+ parent = models.ForeignKey('self', blank=True, null=True,
+ on_delete=models.SET_NULL,
+ verbose_name=_("Parent"))
+
+ class Meta:
+ abstract = True
+
+ def full_label(self):
+ lbls = [self.label]
+ item = self
+ while item.parent:
+ item = item.parent
+ lbls.append(item.label)
+ return " > ".join(reversed(lbls))
+
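+# ``HierarchicalType.full_label`` walks up the ``parent`` chain; with a
+# hypothetical ``PeriodType`` hierarchy:
+#
+# >>> PeriodType.objects.get(txt_idx="early-bronze-age").full_label()
+# 'Protohistory > Bronze Age > Early Bronze Age'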
+
+class StatisticItem:
+ STATISTIC_MODALITIES = [] # example: "year", "operation_type__label"
+ STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example:
+ # OrderedDict([('year', _("Year")),
+ # ("operation_type__label", _("Operation type"))])
+ STATISTIC_SUM_VARIABLE = OrderedDict(
+ (("pk", (_("Number"), 1)),)
+ ) # example: "Price", "Volume" - the number is a multiplier
+
+
+class TemplateItem:
+ @classmethod
+ def _label_templates_q(cls):
+ model_name = "{}.{}".format(
+ cls.__module__, cls.__name__)
+ q = Q(associated_model__klass=model_name,
+ for_labels=True, available=True)
+ alt_model_name = model_name.replace(
+ "models_finds", "models").replace(
+ "models_treatments", "models")
+ if alt_model_name != model_name:
+            q |= Q(associated_model__klass=alt_model_name,
+ for_labels=True, available=True)
+ DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
+ return DocumentTemplate.objects.filter(q)
+
+ @classmethod
+ def has_label_templates(cls):
+ return cls._label_templates_q().count()
+
+ @classmethod
+ def label_templates(cls):
+ return cls._label_templates_q()
+
+ def get_extra_templates(self, request):
+ cls = self.__class__
+ templates = []
+ name = str(cls.__name__)
+ module = str(cls.__module__)
+ if "archaeological_finds" in module:
+ if "models_finds" in name or "models_treatments" in name:
+ names = [
+ name,
+ name.replace("models_finds", "models"
+ ).replace("models_treatments", "models")
+ ]
+ else:
+ names = [name, name.replace("models", "models_finds"),
+ name.replace("models", "models_treatments")]
+ else:
+ names = [name]
+ model_names = [
+ "{}.{}".format(module, name) for name in names
+ ]
+ DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
+ q = DocumentTemplate.objects.filter(
+ associated_model__klass__in=model_names,
+ for_labels=False, available=True)
+ for template in q.all():
+ urlname = "generate-document"
+ templates.append(
+ (template.name, reverse(
+ urlname, args=[template.slug, self.pk]))
+ )
+ return templates
+
+
+class FullSearch(models.Model):
+ search_vector = SearchVectorField(_("Search vector"), blank=True, null=True,
+ help_text=_("Auto filled at save"))
+
+ EXTRA_REQUEST_KEYS = {}
+ DYNAMIC_REQUESTS = {}
+ ALT_NAMES = {}
+
+ BASE_SEARCH_VECTORS = []
+ PROPERTY_SEARCH_VECTORS = []
+ INT_SEARCH_VECTORS = []
+ M2M_SEARCH_VECTORS = []
+ PARENT_SEARCH_VECTORS = []
+ # prevent circular dependency
+ PARENT_ONLY_SEARCH_VECTORS = []
+
+ class Meta:
+ abstract = True
+
+ @classmethod
+ def general_types(cls):
+ for k in get_all_field_names(cls):
+ field = cls._meta.get_field(k)
+ if not hasattr(field, 'rel') or not field.rel:
+ continue
+ rel_model = field.rel.to
+ if issubclass(rel_model, (GeneralType, HierarchicalType)):
+ yield k
+
+ @classmethod
+ def get_alt_names(cls):
+ alt_names = cls.ALT_NAMES.copy()
+ for dr_k in cls.DYNAMIC_REQUESTS:
+ alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names())
+ return alt_names
+
+ @classmethod
+ def get_query_parameters(cls):
+ query_parameters = {}
+ for v in cls.get_alt_names().values():
+ for language_code, language_lbl in settings.LANGUAGES:
+ activate(language_code)
+ query_parameters[str(v.search_key)] = v
+ deactivate()
+ return query_parameters
+
+ def _update_search_field(self, search_vector_conf, search_vectors, data):
+ for value in search_vector_conf.format(data):
+ with connection.cursor() as cursor:
+ cursor.execute("SELECT to_tsvector(%s, %s)", [
+ search_vector_conf.language, value])
+ row = cursor.fetchone()
+ search_vectors.append(row[0])
+
+ def _update_search_number_field(self, search_vectors, val):
+ search_vectors.append("'{}':1".format(val))
+
+ def update_search_vector(self, save=True, exclude_parent=False):
+ """
+ Update the search vector
+ :param save: True if you want to save the object immediately
+ :return: True if modified
+ """
+ if not hasattr(self, 'search_vector'):
+ return
+ if not self.pk:
+ # logger.warning("Cannot update search vector before save or "
+ # "after deletion.")
+ return
+ if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \
+ and not self.INT_SEARCH_VECTORS \
+ and not self.PROPERTY_SEARCH_VECTORS \
+ and not self.PARENT_SEARCH_VECTORS:
+ logger.warning("No search_vectors defined for {}".format(
+ self.__class__))
+ return
+ if getattr(self, '_search_updated', None):
+ return
+ JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
+ self._search_updated = True
+
+ old_search = ""
+ if self.search_vector:
+ old_search = self.search_vector[:]
+ search_vectors = []
+ base_q = self.__class__.objects.filter(pk=self.pk)
+
+        # many-to-many fields have to be queried one by one, otherwise only one is fetched
+ for m2m_search_vector in self.M2M_SEARCH_VECTORS:
+ key = m2m_search_vector.key.split('__')[0]
+ rel_key = getattr(self, key)
+ for item in rel_key.values('pk').all():
+ query_dct = {key + "__pk": item['pk']}
+ q = copy.copy(base_q).filter(**query_dct)
+ q = q.annotate(
+ search=SearchVector(
+ m2m_search_vector.key,
+ config=m2m_search_vector.language)
+ ).values('search')
+ search_vectors.append(q.all()[0]['search'])
+
+ # int/float are not well managed by the SearchVector
+ for int_search_vector in self.INT_SEARCH_VECTORS:
+ q = base_q.values(int_search_vector.key)
+ for val in int_search_vector.format(
+ q.all()[0][int_search_vector.key]):
+ self._update_search_number_field(search_vectors, val)
+
+ if not exclude_parent:
+ # copy parent vector fields
+ for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
+ parent = getattr(self, PARENT_SEARCH_VECTOR)
+ if hasattr(parent, 'all'): # m2m
+ for p in parent.all():
+ search_vectors.append(p.search_vector)
+ elif parent:
+ search_vectors.append(parent.search_vector)
+
+ for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
+ parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
+ if hasattr(parent, 'all'): # m2m
+ for p in parent.all():
+ search_vectors.append(
+ p.update_search_vector(save=False, exclude_parent=True)
+ )
+ elif parent:
+ search_vectors.append(
+ parent.update_search_vector(save=False, exclude_parent=True)
+ )
+
+ if self.BASE_SEARCH_VECTORS:
+ # query "simple" fields
+ q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
+ res = q.all()[0]
+ for base_search_vector in self.BASE_SEARCH_VECTORS:
+ data = res[base_search_vector.key]
+ data = unidecode(str(data))
+ self._update_search_field(base_search_vector,
+ search_vectors, data)
+
+ if self.PROPERTY_SEARCH_VECTORS:
+ for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
+ data = getattr(self, property_search_vector.key)
+ if callable(data):
+ data = data()
+ if not data:
+ continue
+ data = str(data)
+ self._update_search_field(property_search_vector,
+ search_vectors, data)
+
+ if hasattr(self, 'data') and self.data:
+ content_type = ContentType.objects.get_for_model(self)
+ for json_field in JsonDataField.objects.filter(
+ content_type=content_type,
+ search_index=True).all():
+ data = copy.deepcopy(self.data)
+ no_data = False
+ for key in json_field.key.split('__'):
+ if key not in data:
+ no_data = True
+ break
+ data = data[key]
+ if no_data or not data:
+ continue
+
+ if json_field.value_type == 'B':
+ if data is True:
+ data = json_field.name
+ else:
+ continue
+ elif json_field.value_type in ('I', 'F'):
+ self._update_search_number_field(search_vectors, data)
+ continue
+ elif json_field.value_type == 'D':
+ # only index year
+ self._update_search_number_field(search_vectors, data.year)
+ continue
+ for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
+ with connection.cursor() as cursor:
+ cursor.execute("SELECT to_tsvector(%s, %s)",
+ [lang, data])
+ row = cursor.fetchone()
+ search_vectors.append(row[0])
+ new_search_vector = merge_tsvectors(search_vectors)
+ changed = old_search != new_search_vector
+ self.search_vector = new_search_vector
+ if save and changed:
+ self.__class__.objects.filter(pk=self.pk).update(
+ search_vector=new_search_vector)
+ elif not save:
+ return new_search_vector
+ return changed
+
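+# Concrete models declare which fields feed their search vector. A hedged,
+# hypothetical configuration (``SearchVectorConfig`` is defined elsewhere in
+# ishtar_common):
+#
+# class Operation(FullSearch):
+#     BASE_SEARCH_VECTORS = [SearchVectorConfig("common_name"),
+#                            SearchVectorConfig("comment")]
+#     INT_SEARCH_VECTORS = [SearchVectorConfig("year")]
+#     M2M_SEARCH_VECTORS = [SearchVectorConfig("towns__name")]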
+
+class Imported(models.Model):
+ imports = models.ManyToManyField(
+ Import, blank=True,
+ related_name="imported_%(app_label)s_%(class)s")
+
+ class Meta:
+ abstract = True
+
+
+class JsonData(models.Model, CachedGen):
+ data = JSONField(default={}, blank=True)
+
+ class Meta:
+ abstract = True
+
+ def pre_save(self):
+ if not self.data:
+ self.data = {}
+
+ @property
+ def json_sections(self):
+ sections = []
+ try:
+ content_type = ContentType.objects.get_for_model(self)
+        except ContentType.DoesNotExist:
+ return sections
+ JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
+ fields = list(JsonDataField.objects.filter(
+ content_type=content_type, display=True, section__isnull=True
+ ).all()) # no section fields
+
+ fields += list(JsonDataField.objects.filter(
+ content_type=content_type, display=True, section__isnull=False
+ ).order_by('section__order', 'order').all())
+
+ for field in fields:
+ value = None
+ data = self.data.copy()
+ for key in field.key.split('__'):
+ if key in data:
+ value = copy.copy(data[key])
+ data = data[key]
+ else:
+ value = None
+ break
+ if value is None:
+ continue
+ if type(value) in (list, tuple):
+ value = " ; ".join([str(v) for v in value])
+ section_name = field.section.name if field.section else None
+ if not sections or section_name != sections[-1][0]:
+                # same section name as the previous one: stay in the same section
+ sections.append((section_name, []))
+ sections[-1][1].append((field.name, value))
+ return sections
+
+ @classmethod
+ def refresh_cache(cls):
+ __, refreshed = get_cache(cls, ['cache_refreshed'])
+ if refreshed and time.time() - refreshed < 1:
+ return
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if not current_keys:
+ return
+ for keys in current_keys:
+ if keys[0] == '__get_dynamic_choices':
+ cls._get_dynamic_choices(keys[1], force=True)
+
+ @classmethod
+ def _get_dynamic_choices(cls, key, force=False):
+ """
+ Get choice from existing values
+ :param key: data key
+ :param force: if set to True do not use cache
+ :return: tuple of choices (id, value)
+ """
+ cache_key, value = get_cache(cls, ['__get_dynamic_choices', key])
+ if not force and value:
+ return value
+ choices = set()
+ splitted_key = key[len('data__'):].split('__')
+ q = cls.objects.filter(
+ data__has_key=key[len('data__'):]).values_list('data', flat=True)
+ for value in q.all():
+ for k in splitted_key:
+ value = value[k]
+ choices.add(value)
+ choices = [('', '')] + [(v, v) for v in sorted(list(choices))]
+ cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT)
+ return choices
+
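+# ``_get_dynamic_choices`` builds a choice list from values already stored in
+# the ``data`` JSON field; a hedged example with a hypothetical key:
+#
+# >>> Operation._get_dynamic_choices("data__method")
+# [('', ''), ('manual', 'manual'), ('remote sensing', 'remote sensing')]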
+
+class FixAssociated:
+ ASSOCIATED = {}
+
+ def fix_associated(self):
+ for key in self.ASSOCIATED:
+ item = getattr(self, key)
+ if not item:
+ continue
+ dct = self.ASSOCIATED[key]
+ for dct_key in dct:
+ subkey, ctype = dct_key
+ expected_values = dct[dct_key]
+ if not isinstance(expected_values, (list, tuple)):
+ expected_values = [expected_values]
+ if hasattr(ctype, "txt_idx"):
+ try:
+ expected_values = [ctype.objects.get(txt_idx=v)
+ for v in expected_values]
+ except ctype.DoesNotExist:
+ # type not yet initialized
+ return
+ current_vals = getattr(item, subkey)
+ is_many = False
+ if hasattr(current_vals, "all"):
+ is_many = True
+ current_vals = current_vals.all()
+ else:
+ current_vals = [current_vals]
+ is_ok = False
+ for current_val in current_vals:
+ if current_val in expected_values:
+ is_ok = True
+ break
+ if is_ok:
+ continue
+ # the first value is used
+ new_value = expected_values[0]
+ if is_many:
+ getattr(item, subkey).add(new_value)
+ else:
+ setattr(item, subkey, new_value)
+
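+# ``ASSOCIATED`` maps an attribute name to a dict keyed by
+# (sub-attribute, type class) with the expected value(s); a hedged,
+# hypothetical declaration:
+#
+# class Find(FixAssociated):
+#     ASSOCIATED = {
+#         "container": {
+#             ("location_type", WarehouseType): ["storage", "exhibition"],
+#         },
+#     }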
+
+class CascasdeUpdate:
+ DOWN_MODEL_UPDATE = []
+
+ def cascade_update(self):
+ for down_model in self.DOWN_MODEL_UPDATE:
+ if not settings.USE_BACKGROUND_TASK:
+ rel = getattr(self, down_model)
+ if hasattr(rel.model, "need_update"):
+ rel.update(need_update=True)
+ continue
+ for item in getattr(self, down_model).all():
+ cached_label_changed(item.__class__, instance=item)
+ if hasattr(item, "point_2d"):
+ post_save_geo(item.__class__, instance=item)
+
+
+class SearchAltName(object):
+ def __init__(self, search_key, search_query, extra_query=None,
+ distinct_query=False):
+ self.search_key = search_key
+ self.search_query = search_query
+ self.extra_query = extra_query or {}
+ self.distinct_query = distinct_query
+
+
+class HistoryError(Exception):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+
+class HistoricalRecords(BaseHistoricalRecords):
+ def _save_historic(self, manager, instance, history_date, history_type,
+ history_user, history_change_reason, using, attrs):
+ history_instance = manager.model(
+ history_date=history_date,
+ history_type=history_type,
+ history_user=history_user,
+ history_change_reason=history_change_reason,
+ **attrs
+ )
+
+ pre_create_historical_record.send(
+ sender=manager.model,
+ instance=instance,
+ history_date=history_date,
+ history_user=history_user,
+ history_change_reason=history_change_reason,
+ history_instance=history_instance,
+ using=using,
+ )
+
+ history_instance.save(using=using)
+
+ post_create_historical_record.send(
+ sender=manager.model,
+ instance=instance,
+ history_instance=history_instance,
+ history_date=history_date,
+ history_user=history_user,
+ history_change_reason=history_change_reason,
+ using=using,
+ )
+
+ def create_historical_record(self, instance, history_type, using=None):
+ try:
+ history_modifier = getattr(instance, 'history_modifier', None)
+ assert history_modifier
+ except (User.DoesNotExist, AssertionError):
+ # on batch removing of users, user could have disappeared
+ return
+ history_date = getattr(instance, "_history_date",
+ datetime.datetime.now())
+ history_change_reason = getattr(instance, "changeReason", None)
+ force = getattr(instance, "_force_history", False)
+ manager = getattr(instance, self.manager_name)
+ attrs = {}
+ for field in instance._meta.fields:
+ attrs[field.attname] = getattr(instance, field.attname)
+ q_history = instance.history \
+ .filter(history_modifier_id=history_modifier.pk) \
+ .order_by('-history_date', '-history_id')
+ # instance.skip_history_when_saving = True
+ if not q_history.count():
+ if force:
+ delattr(instance, '_force_history')
+ self._save_historic(
+ manager, instance, history_date, history_type, history_modifier,
+ history_change_reason, using, attrs)
+ return
+ old_instance = q_history.all()[0]
+        # multiple saves by the same user in a very short time are generally
+        # caused by post_save signals; it is not relevant to keep them
+ min_history_date = datetime.datetime.now() \
+ - datetime.timedelta(seconds=5)
+ q = q_history.filter(history_date__isnull=False,
+ history_date__gt=min_history_date) \
+ .order_by('-history_date', '-history_id')
+ if not force and q.count():
+ return
+
+ if force:
+ delattr(instance, '_force_history')
+
+ # record a new version only if data have been changed
+ for field in instance._meta.fields:
+ if getattr(old_instance, field.attname) != attrs[field.attname]:
+ self._save_historic(manager, instance, history_date,
+ history_type, history_modifier,
+ history_change_reason, using, attrs)
+ return
+
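+# Revision policy sketch: a new historical record is only written when the
+# data actually changed and no revision by the same user exists in the last
+# five seconds, unless forced (``item`` being any historized instance):
+#
+# >>> item.history_modifier = request.user
+# >>> item.save()               # may be skipped as a near-duplicate revision
+# >>> item._force_history = True
+# >>> item.save()               # forces the historical record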
+
+class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
+ JsonData, FixAssociated, CascasdeUpdate):
+ """
+ Historized item with external ID management.
+ All historized items are searchable and have a data json field.
+    Historized items can be "locked" to prevent editing.
+ """
+ IS_BASKET = False
+ SHOW_URL = None
+ EXTERNAL_ID_KEY = ''
+ EXTERNAL_ID_DEPENDENCIES = []
+ HISTORICAL_M2M = []
+
+ history_modifier = models.ForeignKey(
+ User, related_name='+', on_delete=models.SET_NULL,
+ verbose_name=_("Last editor"), blank=True, null=True)
+ history_creator = models.ForeignKey(
+ User, related_name='+', on_delete=models.SET_NULL,
+ verbose_name=_("Creator"), blank=True, null=True)
+ last_modified = models.DateTimeField(auto_now=True)
+ history_m2m = JSONField(default={}, blank=True)
+ need_update = models.BooleanField(
+ verbose_name=_("Need update"), default=False)
+ locked = models.BooleanField(
+ verbose_name=_("Item locked for edition"), default=False)
+ lock_user = models.ForeignKey(
+ User, related_name='+', on_delete=models.SET_NULL,
+ verbose_name=_("Locked by"), blank=True, null=True)
+
+ ALT_NAMES = {
+ 'history_creator': SearchAltName(
+ pgettext_lazy("key for text search", u"created-by"),
+ 'history_creator__ishtaruser__person__cached_label__iexact'
+ ),
+ 'history_modifier': SearchAltName(
+ pgettext_lazy("key for text search", u"modified-by"),
+ 'history_modifier__ishtaruser__person__cached_label__iexact'
+ ),
+ 'modified_before': SearchAltName(
+ pgettext_lazy("key for text search", "modified-before"),
+ 'last_modified__lte'
+ ),
+ 'modified_after': SearchAltName(
+ pgettext_lazy("key for text search", "modified-after"),
+ 'last_modified__gte'
+ ),
+ }
+
+ class Meta:
+ abstract = True
+
+ @classmethod
+ def get_verbose_name(cls):
+ return cls._meta.verbose_name
+
+ def is_locked(self, user=None):
+ if not user:
+ return self.locked
+ return self.locked and (not self.lock_user or self.lock_user != user)
+
+ def merge(self, item, keep_old=False):
+ merge_model_objects(self, item, keep_old=keep_old)
+
+ def public_representation(self):
+ return {}
+
+ def duplicate(self, user=None, data=None):
+ return duplicate_item(self, user, data)
+
+ def update_external_id(self, save=False):
+ if not self.EXTERNAL_ID_KEY or (
+ self.external_id and
+ not getattr(self, 'auto_external_id', False)):
+ return
+ external_id = get_external_id(self.EXTERNAL_ID_KEY, self)
+ if external_id == self.external_id:
+ return
+ self.auto_external_id = True
+ self.external_id = external_id
+ self._cached_label_checked = False
+ if save:
+ self.skip_history_when_saving = True
+ self.save()
+ return external_id
+
+ def get_last_history_date(self):
+ q = self.history.values("history_date").order_by('-history_date')
+ if not q.count():
+ return
+ return q.all()[0]['history_date']
+
+ def get_previous(self, step=None, date=None, strict=False):
+ """
+        Get the state of the item "step" revisions back (or at a given date)
+ """
+ assert step or date
+ historized = self.history.all()
+ item = None
+ if step:
+ if len(historized) <= step:
+ # silently return the last step if too far in the history
+ item = historized[len(historized) - 1]
+ else:
+ item = historized[step]
+ else:
+ for step, item in enumerate(historized):
+ if item.history_date == date:
+ break
+ # ended with no match
+ if item.history_date != date:
+ return
+ item._step = step
+ if len(historized) != (step + 1):
+ item._previous = historized[step + 1].history_date
+ else:
+ item._previous = None
+ if step > 0:
+ item._next = historized[step - 1].history_date
+ else:
+ item._next = None
+ item.history_date = historized[step].history_date
+ model = self.__class__
+ for k in get_all_field_names(model):
+ field = model._meta.get_field(k)
+ if hasattr(field, 'rel') and field.rel:
+ if not hasattr(item, k + '_id'):
+ setattr(item, k, getattr(self, k))
+ continue
+ val = getattr(item, k + '_id')
+ if not val:
+ setattr(item, k, None)
+ continue
+ try:
+ val = field.rel.to.objects.get(pk=val)
+ setattr(item, k, val)
+ except ObjectDoesNotExist:
+ if strict:
+ raise HistoryError("The class %s has no pk %d" % (
+ str(field.rel.to), val))
+ setattr(item, k, None)
+ item.pk = self.pk
+ return item
+
+ @property
+ def last_edition_date(self):
+ try:
+ return self.history.order_by('-history_date').all()[0].history_date
+ except (AttributeError, IndexError):
+ return
+
+ @property
+ def history_creation_date(self):
+ try:
+ return self.history.order_by('history_date').all()[0].history_date
+ except (AttributeError, IndexError):
+ return
+
+ def rollback(self, date):
+ """
+ Rollback to a previous state
+ """
+ to_del, new_item = [], None
+ for item in self.history.all():
+ if item.history_date == date:
+ new_item = item
+ break
+ to_del.append(item)
+ if not new_item:
+ raise HistoryError("The date to rollback to doesn't exist.")
+ try:
+ field_keys = [f.name for f in self._meta.fields]
+ for k in field_keys:
+ if k != 'id' and hasattr(self, k):
+ if not hasattr(new_item, k):
+ k = k + "_id"
+ setattr(self, k, getattr(new_item, k))
+
+ try:
+ self.history_modifier = User.objects.get(
+ pk=new_item.history_modifier_id)
+ except User.ObjectDoesNotExist:
+ pass
+ self.save()
+ saved_m2m = new_item.history_m2m.copy()
+ for hist_key in self.HISTORICAL_M2M:
+                # after each association the m2m data is rewritten - force
+                # the original to be reset
+ new_item.history_m2m = saved_m2m
+ values = new_item.m2m_listing(hist_key, create=True) or []
+ hist_field = getattr(self, hist_key)
+ hist_field.clear()
+ for val in values:
+ hist_field.add(val)
+ # force label regeneration
+ self._cached_label_checked = False
+ self.save()
+ except ObjectDoesNotExist:
+ raise HistoryError("The rollback has failed.")
+ # clean the obsolete history
+ for historized_item in to_del:
+ historized_item.delete()
+
+ def m2m_listing(self, key):
+ return getattr(self, key).all()
+
+ def values(self):
+ values = {}
+ for f in self._meta.fields:
+ k = f.name
+ if k != 'id':
+ values[k] = getattr(self, k)
+ return values
+
+ def get_absolute_url(self):
+ try:
+ return reverse('display-item', args=[self.SLUG, self.pk])
+ except NoReverseMatch:
+ return
+
+ def get_show_url(self):
+ show_url = self.SHOW_URL
+ if not show_url:
+ show_url = 'show-' + self.__class__.__name__.lower()
+ try:
+ return reverse(show_url, args=[self.pk, ''])
+ except NoReverseMatch:
+ return
+
+ @property
+ def associated_filename(self):
+ if [True for attr in ('get_town_label', 'get_department', 'reference',
+ 'short_class_name') if not hasattr(self, attr)]:
+ return ''
+ items = [slugify(self.get_department()),
+ slugify(self.get_town_label()).upper(),
+ slugify(self.short_class_name),
+ slugify(self.reference),
+ slugify(self.name or '').replace('-', '_').capitalize()]
+ last_edition_date = self.last_edition_date
+ if last_edition_date:
+ items.append(last_edition_date.strftime('%Y%m%d'))
+ else:
+ items.append('00000000')
+ return "-".join([str(item) for item in items])
+
+ def save(self, *args, **kwargs):
+ created = not self.pk
+ if not getattr(self, 'skip_history_when_saving', False):
+ assert hasattr(self, 'history_modifier')
+ if created:
+ self.history_creator = self.history_modifier
+        # the external ID can depend on related items not available before save
+ external_id_updated = kwargs.pop('external_id_updated') \
+ if 'external_id_updated' in kwargs else False
+ if not created and not external_id_updated:
+ self.update_external_id()
+ super(BaseHistorizedItem, self).save(*args, **kwargs)
+ if created and self.update_external_id():
+ # force resave for external ID creation
+ self.skip_history_when_saving = True
+ self._updated_id = True
+ return self.save(external_id_updated=True)
+ for dep in self.EXTERNAL_ID_DEPENDENCIES:
+ for obj in getattr(self, dep).all():
+ obj.update_external_id(save=True)
+ self.fix_associated()
+ return True
+
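+# History navigation sketch (hedged):
+#
+# >>> previous = item.get_previous(step=1)   # state one revision back
+# >>> item.rollback(previous.history_date)   # restore it and drop newer revisions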
+
+class LightHistorizedItem(BaseHistorizedItem):
+ history_date = models.DateTimeField(default=datetime.datetime.now)
+
+ class Meta:
+ abstract = True
+
+ def save(self, *args, **kwargs):
+ super(LightHistorizedItem, self).save(*args, **kwargs)
+ return self
+
+
+class OwnPerms(object):
+ """
+ Manage special permissions for object's owner
+ """
+
+ @classmethod
+ def get_query_owns(cls, ishtaruser):
+ """
+        Query object to get the items owned by the given user
+ """
+ return None # implement for each object
+
+ def can_view(self, request):
+ if hasattr(self, "LONG_SLUG"):
+ perm = "view_" + self.LONG_SLUG
+ else:
+ perm = "view_" + self.SLUG
+ return self.can_do(request, perm)
+
+ def can_do(self, request, action_name):
+ """
+ Check permission availability for the current object.
+ :param request: request object
+ :param action_name: action name eg: "change_find" - "own" variation is
+ checked
+ :return: boolean
+ """
+ if not getattr(request.user, 'ishtaruser', None):
+ return False
+ splited = action_name.split('_')
+ action_own_name = splited[0] + '_own_' + '_'.join(splited[1:])
+ user = request.user
+ if action_own_name == "view_own_findbasket":
+ action_own_name = "view_own_find"
+ return user.ishtaruser.has_right(action_name, request.session) or \
+ (user.ishtaruser.has_right(action_own_name, request.session)
+ and self.is_own(user.ishtaruser))
+
+ def is_own(self, user, alt_query_own=None):
+ """
+ Check if the current object is owned by the user
+ """
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, IshtarUser):
+ ishtaruser = user
+ elif hasattr(user, 'ishtaruser'):
+ ishtaruser = user.ishtaruser
+ else:
+ return False
+ if not alt_query_own:
+ query = self.get_query_owns(ishtaruser)
+ else:
+ query = getattr(self, alt_query_own)(ishtaruser)
+ if not query:
+ return False
+ query &= Q(pk=self.pk)
+ return self.__class__.objects.filter(query).count()
+
+ @classmethod
+ def has_item_of(cls, user):
+ """
+        Check if the user owns some items
+ """
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, IshtarUser):
+ ishtaruser = user
+ elif hasattr(user, 'ishtaruser'):
+ ishtaruser = user.ishtaruser
+ else:
+ return False
+ query = cls.get_query_owns(ishtaruser)
+ if not query:
+ return False
+ return cls.objects.filter(query).count()
+
+ @classmethod
+ def _return_get_owns(cls, owns, values, get_short_menu_class,
+ label_key='cached_label'):
+ if not owns:
+ return []
+ sorted_values = []
+ if hasattr(cls, 'BASKET_MODEL'):
+ owns_len = len(owns)
+ for idx, item in enumerate(reversed(owns)):
+ if get_short_menu_class:
+ item = item[0]
+ if type(item) == cls.BASKET_MODEL:
+ basket = owns.pop(owns_len - idx - 1)
+ sorted_values.append(basket)
+ sorted_values = list(reversed(sorted_values))
+ if not values:
+ if not get_short_menu_class:
+ return sorted_values + list(
+ sorted(owns, key=lambda x: getattr(x, label_key) or ""))
+ return sorted_values + list(
+ sorted(owns, key=lambda x: getattr(x[0], label_key) or ""))
+ if not get_short_menu_class:
+ return sorted_values + list(
+ sorted(owns, key=lambda x: x[label_key] or ""))
+ return sorted_values + list(
+ sorted(owns, key=lambda x: x[0][label_key] or ""))
+
+ @classmethod
+ def get_owns(cls, user, replace_query=None, limit=None, values=None,
+ get_short_menu_class=False, menu_filtr=None):
+ """
+        Get owned items
+ """
+ if not replace_query:
+ replace_query = {}
+ if hasattr(user, 'is_authenticated') and not user.is_authenticated():
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, User):
+ try:
+ ishtaruser = IshtarUser.objects.get(user_ptr=user)
+ except IshtarUser.DoesNotExist:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ elif isinstance(user, IshtarUser):
+ ishtaruser = user
+ else:
+ if values:
+ return []
+ return cls.objects.filter(pk__isnull=True)
+ items = []
+ if hasattr(cls, 'BASKET_MODEL'):
+ items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
+ query = cls.get_query_owns(ishtaruser)
+ if not query and not replace_query:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ if query:
+ q = cls.objects.filter(query)
+ else: # replace_query
+ q = cls.objects.filter(replace_query)
+ if values:
+ q = q.values(*values)
+ if limit:
+ items += list(q.order_by('-pk')[:limit])
+ else:
+ items += list(q.order_by(*cls._meta.ordering).all())
+ if get_short_menu_class:
+ if values:
+ if 'id' not in values:
+ raise NotImplementedError(
+ "Call of get_owns with get_short_menu_class option and"
+ " no 'id' in values is not implemented")
+ my_items = []
+ for i in items:
+ if hasattr(cls, 'BASKET_MODEL') and \
+ type(i) == cls.BASKET_MODEL:
+ dct = dict([(k, getattr(i, k)) for k in values])
+ my_items.append(
+ (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)))
+ else:
+ my_items.append((i, cls.get_short_menu_class(i['id'])))
+ items = my_items
+ else:
+ items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
+ return items
+
+ @classmethod
+ def _get_query_owns_dicts(cls, ishtaruser):
+ """
+        List of "query own" dicts used to construct the query.
+        Dicts are joined with an AND operator; inside each dict, key/value
+        pairs are joined with an OR operator.
+ """
+ return []
+
+ @classmethod
+ def _construct_query_own(cls, prefix, dct_list):
+ q = None
+ for subquery_dict in dct_list:
+ subquery = None
+ for k in subquery_dict:
+ subsubquery = Q(**{prefix + k: subquery_dict[k]})
+ if subquery:
+ subquery |= subsubquery
+ else:
+ subquery = subsubquery
+ if not subquery:
+ continue
+ if q:
+ q &= subquery
+ else:
+ q = subquery
+ return q
+
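+# ``_construct_query_own`` ORs the keys inside each dict and ANDs the dicts
+# together; a hedged illustration with hypothetical field names:
+#
+# >>> q = OwnPerms._construct_query_own("operation__", [
+# ...     {"history_creator": user, "collaborators": user},
+# ...     {"closing__isnull": True},
+# ... ])
+# # equivalent to: (Q(operation__history_creator=user) |
+# #                 Q(operation__collaborators=user)) &
+# #                Q(operation__closing__isnull=True)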
+
+class NumberManager(models.Manager):
+ def get_by_natural_key(self, number):
+ return self.get(number=number)
+
+
+class State(models.Model):
+ label = models.CharField(_("Label"), max_length=30)
+ number = models.CharField(_("Number"), unique=True, max_length=3)
+ objects = NumberManager()
+
+ class Meta:
+ verbose_name = _("State")
+ ordering = ['number']
+
+ def __str__(self):
+ return self.label
+
+ def natural_key(self):
+ return (self.number,)
+
+
+class Department(models.Model):
+ label = models.CharField(_("Label"), max_length=30)
+ number = models.CharField(_("Number"), unique=True, max_length=3)
+ state = models.ForeignKey(
+ 'State', verbose_name=_("State"), blank=True, null=True,
+ on_delete=models.SET_NULL,
+ )
+ objects = NumberManager()
+
+ class Meta:
+ verbose_name = _("Department")
+ verbose_name_plural = _("Departments")
+ ordering = ['number']
+
+ def __str__(self):
+ return self.label
+
+ def natural_key(self):
+ return (self.number,)
+
+ def history_compress(self):
+ return self.number
+
+ @classmethod
+ def history_decompress(cls, full_value, create=False):
+ if not full_value:
+ return []
+ res = []
+ for value in full_value:
+ try:
+ res.append(cls.objects.get(number=value))
+ except cls.DoesNotExist:
+ continue
+ return res
+
+
+class Arrondissement(models.Model):
+ name = models.CharField("Nom", max_length=30)
+ department = models.ForeignKey(Department, verbose_name="Département")
+
+ def __str__(self):
+ return settings.JOINT.join((self.name, str(self.department)))
+
+
+class Canton(models.Model):
+ name = models.CharField("Nom", max_length=30)
+ arrondissement = models.ForeignKey(Arrondissement,
+ verbose_name="Arrondissement")
+
+ def __str__(self):
+ return settings.JOINT.join(
+ (self.name, str(self.arrondissement)))
+
+
+class TownManager(models.GeoManager):
+ def get_by_natural_key(self, numero_insee, year):
+ return self.get(numero_insee=numero_insee, year=year)
+
+
+class Town(Imported, models.Model):
+ name = models.CharField(_("Name"), max_length=100)
+ surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
+ center = models.PointField(_("Localisation"), srid=settings.SRID,
+ blank=True, null=True)
+ limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
+ numero_insee = models.CharField("Code commune (numéro INSEE)",
+ max_length=120)
+ departement = models.ForeignKey(
+ Department, verbose_name=_("Department"),
+ on_delete=models.SET_NULL, null=True, blank=True)
+ year = models.IntegerField(
+ _("Year of creation"), null=True, blank=True,
+ help_text=_("Filling this field is relevant to distinguish old towns "
+ "from new towns."))
+ children = models.ManyToManyField(
+ 'Town', verbose_name=_("Town children"), blank=True,
+ related_name='parents')
+ cached_label = models.CharField(_("Cached name"), max_length=500,
+ null=True, blank=True, db_index=True)
+ objects = TownManager()
+
+ class Meta:
+ verbose_name = _("Town")
+ verbose_name_plural = _("Towns")
+ if settings.COUNTRY == 'fr':
+ ordering = ['numero_insee']
+ unique_together = (('numero_insee', 'year'),)
+
+ def natural_key(self):
+ return (self.numero_insee, self.year)
+
+ def history_compress(self):
+ values = {'numero_insee': self.numero_insee,
+ 'year': self.year or ""}
+ return values
+
+ def get_values(self, prefix='', no_values=False, no_base_finds=True):
+ return {
+ prefix or "label": str(self),
+ prefix + "name": self.name,
+ prefix + "numero_insee": self.numero_insee
+ }
+
+ @classmethod
+ def history_decompress(cls, full_value, create=False):
+ if not full_value:
+ return []
+ res = []
+ for value in full_value:
+ try:
+ res.append(
+ cls.objects.get(numero_insee=value['numero_insee'],
+ year=value['year'] or None))
+ except cls.DoesNotExist:
+ continue
+ return res
+
+ def __str__(self):
+ return self.cached_label or ""
+
+ @property
+ def label_with_areas(self):
+ label = [self.name]
+ if self.numero_insee:
+ label.append("({})".format(self.numero_insee))
+ for area in self.areas.all():
+ label.append(" - ")
+ label.append(area.full_label)
+ return " ".join(label)
+
+ def generate_geo(self, force=False):
+ force = self.generate_limit(force=force)
+ self.generate_center(force=force)
+ self.generate_area(force=force)
+
+ def generate_limit(self, force=False):
+ if not force and self.limit:
+ return
+ parents = None
+ if not self.parents.count():
+ return
+ for parent in self.parents.all():
+ if not parent.limit:
+ return
+ if not parents:
+ parents = parent.limit
+ else:
+ parents = parents.union(parent.limit)
+ # if union is a simple polygon make it a multi
+ if 'MULTI' not in parents.wkt:
+ parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")"
+ if not parents:
+ return
+ self.limit = parents
+ self.save()
+ return True
+
+ def generate_center(self, force=False):
+ if not force and (self.center or not self.limit):
+ return
+ self.center = self.limit.centroid
+ if not self.center:
+ return False
+ self.save()
+ return True
+
+ def generate_area(self, force=False):
+ if not force and (self.surface or not self.limit):
+ return
+ surface = self.limit.transform(settings.SURFACE_SRID,
+ clone=True).area
+ if surface > 214748364 or not surface:
+ return False
+ self.surface = surface
+ self.save()
+ return True
+
+ def update_town_code(self):
+ if not self.numero_insee or not self.children.count() or not self.year:
+ return
+ old_num = self.numero_insee[:]
+ numero = old_num.split('-')[0]
+ self.numero_insee = "{}-{}".format(numero, self.year)
+ if self.numero_insee != old_num:
+ return True
+
+ def _generate_cached_label(self):
+ cached_label = self.name
+ if settings.COUNTRY == "fr" and self.numero_insee:
+ dpt_len = 2
+ if self.numero_insee.startswith('97') or \
+ self.numero_insee.startswith('98') or \
+ self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5',
+ '6', '7', '8', '9'):
+ dpt_len = 3
+ cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
+ if self.year and self.children.count():
+ cached_label += " ({})".format(self.year)
+ return cached_label
+
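+# Cached label sketch for French towns - the INSEE code provides the
+# department prefix (hypothetical data):
+#
+# >>> town = Town(name="Rouen", numero_insee="76540")
+# >>> town._generate_cached_label()
+# 'Rouen - 76'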
+
+def post_save_town(sender, **kwargs):
+ cached_label_changed(sender, **kwargs)
+ town = kwargs['instance']
+ town.generate_geo()
+ if town.update_town_code():
+ town.save()
+
+
+post_save.connect(post_save_town, sender=Town)
+
+
+def town_child_changed(sender, **kwargs):
+ town = kwargs['instance']
+ if town.update_town_code():
+ town.save()
+
+
+m2m_changed.connect(town_child_changed, sender=Town.children.through)
+
+
+class Address(BaseHistorizedItem):
+ FIELDS = (
+ "address", "address_complement", "postal_code", "town",
+ "precise_town", "country",
+ "alt_address", "alt_address_complement", "alt_postal_code", "alt_town",
+ "alt_country",
+ "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3",
+ "raw_phone", "mobile_phone", "email", "alt_address_is_prefered"
+ )
+ address = models.TextField(_("Address"), null=True, blank=True)
+ address_complement = models.TextField(_("Address complement"), null=True,
+ blank=True)
+ postal_code = models.CharField(_("Postal code"), max_length=10, null=True,
+ blank=True)
+ town = models.CharField(_("Town (freeform)"), max_length=150, null=True,
+ blank=True)
+ precise_town = models.ForeignKey(
+ Town, verbose_name=_("Town (precise)"), null=True,
+ blank=True)
+ country = models.CharField(_("Country"), max_length=30, null=True,
+ blank=True)
+ alt_address = models.TextField(_("Other address: address"), null=True,
+ blank=True)
+ alt_address_complement = models.TextField(
+ _("Other address: address complement"), null=True, blank=True)
+ alt_postal_code = models.CharField(_("Other address: postal code"),
+ max_length=10, null=True, blank=True)
+ alt_town = models.CharField(_("Other address: town"), max_length=70,
+ null=True, blank=True)
+ alt_country = models.CharField(_("Other address: country"),
+ max_length=30, null=True, blank=True)
+ phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True)
+ phone_desc = models.CharField(_("Phone description"), max_length=300,
+ null=True, blank=True)
+    phone2 = models.CharField(_("Phone 2"), max_length=18,
+ null=True, blank=True)
+ phone_desc2 = models.CharField(_("Phone description 2"), max_length=300,
+ null=True, blank=True)
+ phone3 = models.CharField(_("Phone 3"), max_length=18, null=True,
+ blank=True)
+ phone_desc3 = models.CharField(_("Phone description 3"), max_length=300,
+ null=True, blank=True)
+ raw_phone = models.TextField(_("Raw phone"), blank=True, null=True)
+ mobile_phone = models.CharField(_("Mobile phone"), max_length=18,
+ null=True, blank=True)
+ email = models.EmailField(
+ _("Email"), max_length=300, blank=True, null=True)
+ alt_address_is_prefered = models.BooleanField(
+ _("Alternative address is prefered"), default=False)
+ history = HistoricalRecords(inherit=True)
+ SUB_ADDRESSES = []
+
+ class Meta:
+ abstract = True
+
+ def get_short_html_items(self):
+ items = []
+ if self.address:
+ items.append(
+ """<span class="subadress">{}</span>""".format(self.address))
+ if self.address_complement:
+ items.append(
+ """<span class="subadress-complement">{}</span>""".format(
+ self.address_complement))
+ if self.postal_code:
+ items.append(
+ """<span class="postal-code">{}</span>""".format(
+ self.postal_code))
+ if self.precise_town:
+ items.append(
+ """<span class="town">{}</span>""".format(
+ self.precise_town.name))
+ elif self.town:
+ items.append(
+ """<span class="town">{}</span>""".format(
+ self.town))
+ if self.country:
+ items.append(
+ """<span class="country">{}</span>""".format(
+ self.country))
+ return items
+
+ def get_short_html_detail(self):
+ html = """<div class="address">"""
+ items = self.get_short_html_items()
+ if not items:
+ items = [
+ "<span class='no-address'>{}</span>".format(
+ _("No associated address")
+ )
+ ]
+ html += "".join(items)
+ html += """</div>"""
+ return html
+
+ def get_town_centroid(self):
+ if self.precise_town:
+ return self.precise_town.center, self._meta.verbose_name
+ for sub_address in self.SUB_ADDRESSES:
+ sub_item = getattr(self, sub_address)
+ if sub_item and sub_item.precise_town:
+ return sub_item.precise_town.center, sub_item._meta.verbose_name
+
+ def get_town_polygons(self):
+ if self.precise_town:
+ return self.precise_town.limit, self._meta.verbose_name
+ for sub_address in self.SUB_ADDRESSES:
+ sub_item = getattr(self, sub_address)
+ if sub_item and sub_item.precise_town:
+ return sub_item.precise_town.limit, sub_item._meta.verbose_name
+
+ def get_attribute(self, attr):
+ if self.town or self.precise_town:
+ return getattr(self, attr)
+ for sub_address in self.SUB_ADDRESSES:
+ sub_item = getattr(self, sub_address)
+ if not sub_item:
+ continue
+ if sub_item.town or sub_item.precise_town:
+ return getattr(sub_item, attr)
+ return getattr(self, attr)
+
+ def get_address(self):
+ return self.get_attribute("address")
+
+ def get_address_complement(self):
+ return self.get_attribute("address_complement")
+
+ def get_postal_code(self):
+ return self.get_attribute("postal_code")
+
+ def get_town(self):
+ return self.get_attribute("town")
+
+ def get_precise_town(self):
+ return self.get_attribute("precise_town")
+
+ def get_country(self):
+ return self.get_attribute("country")
+
+ def simple_lbl(self):
+ return str(self)
+
+ def full_address(self):
+ lbl = self.simple_lbl()
+ if lbl:
+ lbl += "\n"
+ lbl += self.address_lbl()
+ return lbl
+
+ def address_lbl(self):
+        lbl = ''
+ prefix = ''
+ if self.alt_address_is_prefered:
+ prefix = 'alt_'
+ if getattr(self, prefix + 'address'):
+ lbl += getattr(self, prefix + 'address')
+ if getattr(self, prefix + 'address_complement'):
+ if lbl:
+ lbl += "\n"
+ lbl += getattr(self, prefix + 'address_complement')
+ postal_code = getattr(self, prefix + 'postal_code')
+ town = getattr(self, prefix + 'town')
+ if postal_code or town:
+ if lbl:
+ lbl += "\n"
+ lbl += "{}{}{}".format(
+ postal_code or '',
+ " " if postal_code and town else '',
+ town or '')
+ if self.phone:
+ if lbl:
+ lbl += "\n"
+ lbl += "{} {}".format(str(_("Tel: ")), self.phone)
+ if self.mobile_phone:
+ if lbl:
+ lbl += "\n"
+ lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone)
+ if self.email:
+ if lbl:
+ lbl += "\n"
+ lbl += "{} {}".format(str(_("Email: ")), self.email)
+ return lbl
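+
+    # Illustrative sketch (hypothetical values): for an Address with
+    # address="1 rue de la Paix", postal_code="75002", town="Paris" and
+    # phone="0102030405", address_lbl() would return something like:
+    #   1 rue de la Paix
+    #   75002 Paris
+    #   Tel: 0102030405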
+
+
+class Merge(models.Model):
+ merge_key = models.TextField(_("Merge key"), blank=True, null=True)
+ merge_candidate = models.ManyToManyField("self", blank=True)
+ merge_exclusion = models.ManyToManyField("self", blank=True)
+ archived = models.NullBooleanField(default=False,
+ blank=True, null=True)
+ # 1 for one word similarity, 2 for two word similarity, etc.
+ MERGE_CLEMENCY = None
+ EMPTY_MERGE_KEY = '--'
+ MERGE_ATTRIBUTE = "name"
+
+ class Meta:
+ abstract = True
+
+ def generate_merge_key(self):
+ if self.archived:
+ return
+ merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
+ self.merge_key = slugify(merge_attr if merge_attr else '')
+ if not self.merge_key:
+ self.merge_key = self.EMPTY_MERGE_KEY
+
+ def generate_merge_candidate(self):
+ if self.archived:
+ return
+ if not self.merge_key:
+ self.generate_merge_key()
+ self.save(merge_key_generated=True)
+ if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
+ return
+ q = self.__class__.objects \
+ .exclude(pk=self.pk) \
+ .exclude(merge_exclusion=self) \
+ .exclude(merge_candidate=self) \
+ .exclude(archived=True)
+ if not self.MERGE_CLEMENCY:
+ q = q.filter(merge_key=self.merge_key)
+ else:
+ subkeys_front = "-".join(
+ self.merge_key.split('-')[:self.MERGE_CLEMENCY])
+ subkeys_back = "-".join(
+ self.merge_key.split('-')[-self.MERGE_CLEMENCY:])
+ q = q.filter(Q(merge_key__istartswith=subkeys_front) |
+ Q(merge_key__iendswith=subkeys_back))
+ for item in q.all():
+ self.merge_candidate.add(item)
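+
+    # Illustrative sketch (hypothetical subclass): with MERGE_ATTRIBUTE =
+    # "name", an item named "Musée du Louvre" gets the merge_key
+    # "musee-du-louvre" and generate_merge_candidate() links every other
+    # non-archived item sharing that key. With MERGE_CLEMENCY = 2, items
+    # whose key starts with the first two slug words ("musee-du") or ends
+    # with the last two ("du-louvre") are also proposed as candidates.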
+
+ def save(self, *args, **kwargs):
+ # prevent circular save
+ merge_key_generated = False
+ if 'merge_key_generated' in kwargs:
+ merge_key_generated = kwargs.pop('merge_key_generated')
+ self.generate_merge_key()
+ item = super(Merge, self).save(*args, **kwargs)
+ if not merge_key_generated:
+ self.merge_candidate.clear()
+ self.generate_merge_candidate()
+ return item
+
+ def archive(self):
+ self.archived = True
+ self.save()
+ self.merge_candidate.clear()
+ self.merge_exclusion.clear()
+
+ def merge(self, item, keep_old=False, exclude_fields=None):
+ merge_model_objects(self, item, keep_old=keep_old,
+ exclude_fields=exclude_fields)
+ self.generate_merge_candidate()
+
+
+def __get_stats_cache_values(model_name, model_pk):
+ StatsCache = apps.get_model("ishtar_common", "StatsCache")
+ q = StatsCache.objects.filter(
+ model=model_name, model_pk=model_pk
+ )
+    nb = q.count()
+    if nb >= 1:
+        q = q.order_by("-id")
+        # keep the most recent entry, delete duplicates
+        sc = q.all()[0]
+        for extra in q.all()[1:]:
+            extra.delete()
+ else:
+ sc = StatsCache.objects.create(
+ model=model_name, model_pk=model_pk
+ )
+ values = sc.values
+ if not values:
+ values = {}
+ return sc, values
+
+
+@task()
+def _update_stats(app, model, model_pk, funcname):
+ model_name = app + "." + model
+ model = apps.get_model(app, model)
+ try:
+ item = model.objects.get(pk=model_pk)
+ except model.DoesNotExist:
+ return
+ value = getattr(item, funcname)()
+ sc, current_values = __get_stats_cache_values(model_name, model_pk)
+ current_values[funcname] = value
+ sc.values = current_values
+ sc.update_requested = None
+ sc.updated = datetime.datetime.now()
+ sc.save()
+
+
+def update_stats(statscache, item, funcname):
+ if not settings.USE_BACKGROUND_TASK:
+ current_values = statscache.values
+ if not current_values:
+ current_values = {}
+ value = getattr(item, funcname)()
+ current_values[funcname] = value
+ statscache.values = current_values
+ statscache.updated = datetime.datetime.now()
+ statscache.save()
+ return current_values
+
+ now = datetime.datetime.now()
+ app_name = item._meta.app_label
+ model_name = item._meta.model_name
+ statscache.update_requested = now.isoformat()
+ statscache.save()
+ _update_stats.delay(app_name, model_name, item.pk, funcname)
+ return statscache.values
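+
+# Note: with USE_BACKGROUND_TASK enabled, update_stats() only records the
+# request timestamp and delegates the computation to the _update_stats task,
+# so the values it returns at that point may still be the previously cached
+# ones.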
+
+
+class DashboardFormItem:
+ """
+ Provide methods to manage statistics
+ """
+
+ def last_stats_update(self):
+ model_name = self._meta.app_label + "." + self._meta.model_name
+ StatsCache = apps.get_model("ishtar_common", "StatsCache")
+ q = StatsCache.objects.filter(
+ model=model_name, model_pk=self.pk).order_by("-updated")
+ if not q.count():
+ return
+ return q.all()[0].updated
+
+ def _get_or_set_stats(self, funcname, update=False,
+ expected_type=None):
+ model_name = self._meta.app_label + "." + self._meta.model_name
+ StatsCache = apps.get_model("ishtar_common", "StatsCache")
+ sc, __ = StatsCache.objects.get_or_create(
+ model=model_name, model_pk=self.pk
+ )
+ if not update:
+ values = sc.values
+ if funcname not in values:
+ if expected_type is not None:
+ return expected_type()
+ return 0
+ else:
+ values = update_stats(sc, self, funcname)
+ if funcname in values:
+ values = values[funcname]
+ else:
+ values = 0
+ if expected_type is not None and not isinstance(values, expected_type):
+ return expected_type()
+ return values
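+
+    # Usage sketch (hypothetical ``get_finds_number`` method on the model):
+    #   self._get_or_set_stats("get_finds_number", update=True)
+    # computes get_finds_number(), stores the result in StatsCache and
+    # returns it; without update=True the cached value (or 0 /
+    # expected_type()) is returned.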
+
+ @classmethod
+    def get_periods(cls, slice='month', fltr=None, date_source='creation'):
+ date_var = date_source + '_date'
+ q = cls.objects.filter(**{date_var + '__isnull': False})
+ if fltr:
+ q = q.filter(**fltr)
+ if slice == 'year':
+ return [res[date_var].year for res in list(q.values(date_var)
+ .annotate(
+ Count("id")).order_by())]
+ elif slice == 'month':
+ return [(res[date_var].year, res[date_var].month)
+ for res in list(q.values(date_var)
+ .annotate(Count("id")).order_by())]
+ return []
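+
+    # For example (hypothetical data), get_periods(slice='year') may return
+    # [2019, 2019, 2020] and get_periods(slice='month')
+    # [(2019, 3), (2019, 11), (2020, 1)]: one entry per distinct
+    # creation_date value, so years and months can repeat.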
+
+ @classmethod
+    def get_by_year(cls, year, fltr=None, date_source='creation'):
+ date_var = date_source + '_date'
+ q = cls.objects.filter(**{date_var + '__isnull': False})
+ if fltr:
+ q = q.filter(**fltr)
+ return q.filter(
+ **{date_var + '__year': year}).order_by('pk').distinct('pk')
+
+ @classmethod
+    def get_by_month(cls, year, month, fltr=None, date_source='creation'):
+ date_var = date_source + '_date'
+ q = cls.objects.filter(**{date_var + '__isnull': False})
+ if fltr:
+ q = q.filter(**fltr)
+ q = q.filter(
+ **{date_var + '__year': year, date_var + '__month': month})
+ return q.order_by('pk').distinct('pk')
+
+ @classmethod
+ def get_total_number(cls, fltr=None):
+ q = cls.objects
+ if fltr:
+ q = q.filter(**fltr)
+ return q.order_by('pk').distinct('pk').count()
+
+
+class DocumentItem:
+ ALT_NAMES = {
+ 'documents__image__isnull':
+ SearchAltName(
+ pgettext_lazy("key for text search", "has-image"),
+ 'documents__image__isnull'),
+ 'documents__associated_url__isnull':
+ SearchAltName(
+ pgettext_lazy("key for text search", "has-url"),
+ 'documents__associated_url__isnull'),
+ 'documents__associated_file__isnull':
+ SearchAltName(
+ pgettext_lazy("key for text search", "has-attached-file"),
+ 'documents__associated_file__isnull'),
+ }
+
+ def public_representation(self):
+ images = []
+ if getattr(self, "main_image", None):
+ images.append(self.main_image.public_representation())
+ images += [
+ image.public_representation()
+ for image in self.images_without_main_image.all()
+ ]
+ return {"images": images}
+
+ @property
+ def images(self):
+ if not hasattr(self, 'documents'):
+ Document = apps.get_model("ishtar_common", "Document")
+ return Document.objects.none()
+ return self.documents.filter(
+ image__isnull=False).exclude(image="").order_by("pk")
+
+ @property
+ def images_without_main_image(self):
+ if not hasattr(self, 'main_image') or not hasattr(self, 'documents'):
+ return self.images
+ if not self.main_image:
+ return self.documents.filter(
+ image__isnull=False).exclude(
+ image="").order_by("pk")
+ return self.documents.filter(
+ image__isnull=False).exclude(
+ image="").exclude(pk=self.main_image.pk).order_by("pk")
+
+ def get_extra_actions(self, request):
+ """
+        For the sheet template: return the "Add document/image" action
+ """
+        # each action is a tuple: (url, base text, icon, extra text,
+        # extra CSS class, is it a quick action)
+ try:
+ actions = super(DocumentItem, self).get_extra_actions(request)
+ except AttributeError:
+ actions = []
+
+ if not hasattr(self, 'SLUG'):
+ return actions
+
+ can_add_doc = self.can_do(request, 'add_document')
+ if can_add_doc and (
+ not hasattr(self, "is_locked") or
+ not self.is_locked(request.user)):
+ actions += [
+ (
+ reverse("create-document") + "?{}={}".format(
+ self.SLUG, self.pk),
+ _("Add document/image"),
+ "fa fa-plus",
+ _("doc./image"),
+ "",
+ False
+ )
+ ]
+ return actions
+
+
+def document_attached_changed(sender, **kwargs):
+ # associate a default main image
+ instance = kwargs.get("instance", None)
+ model = kwargs.get("model", None)
+ pk_set = kwargs.get("pk_set", None)
+ if not instance or not model:
+ return
+
+ if hasattr(instance, "documents"):
+ items = [instance]
+ else:
+ if not pk_set:
+ return
+ try:
+ items = [model.objects.get(pk=pk) for pk in pk_set]
+ except model.DoesNotExist:
+ return
+
+ for item in items:
+ q = item.documents.filter(
+ image__isnull=False).exclude(image='')
+        if item.main_image:
+            if q.filter(pk=item.main_image.pk).count():
+                continue
+            # the association has disappeared: it is no longer the main
+            # image
+            item.main_image = None
+            item.skip_history_when_saving = True
+            item.save()
+        if not q.count():
+            continue
+ # by default get the lowest pk
+ item.main_image = q.order_by('pk').all()[0]
+ item.skip_history_when_saving = True
+ item.save()
+
+
+class QuickAction:
+ """
+ Quick action available from tables
+ """
+
+ def __init__(self, url, icon_class='', text='', target=None, rights=None,
+ module=None):
+ self.url = url
+ self.icon_class = icon_class
+ self.text = text
+ self.rights = rights
+ self.target = target
+ self.module = module
+ assert self.target in ('one', 'many', None)
+
+ def is_available(self, user, session=None, obj=None):
+ if self.module and not getattr(get_current_profile(), self.module):
+ return False
+ if not self.rights: # no restriction
+ return True
+ if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser:
+ return False
+ user = user.ishtaruser
+
+ for right in self.rights:
+ if user.has_perm(right, session=session, obj=obj):
+ return True
+ return False
+
+ @property
+ def rendered_icon(self):
+ if not self.icon_class:
+ return ""
+ return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class)
+
+ @property
+ def base_url(self):
+ if self.target is None:
+ url = reverse(self.url)
+ else:
+ # put arbitrary pk for the target
+ url = reverse(self.url, args=[0])
+            # all quick action urls have to end with the pk of the selected
+            # item and a "/": strip the placeholder "0/" added above
+            url = url[:-2]
+ return url
+
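+# Declaration sketch (hypothetical URL name and permission): a model could
+# expose
+#   QUICK_ACTIONS = [
+#       QuickAction(url="person-merge", icon_class="fa fa-compress",
+#                   text=_("Merge"), target="many",
+#                   rights=["change_person"]),
+#   ]
+# base_url then resolves "person-merge" with a placeholder pk and strips the
+# trailing "0/" so the caller can append the pk of the selected item.
+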
+
+class DynamicRequest:
+ def __init__(self, label, app_name, model_name, form_key, search_key,
+ type_query, search_query):
+ self.label = label
+ self.form_key = form_key
+ self.search_key = search_key
+ self.app_name = app_name
+ self.model_name = model_name
+ self.type_query = type_query
+ self.search_query = search_query
+
+ def get_all_types(self):
+ model = apps.get_app_config(self.app_name).get_model(self.model_name)
+ return model.objects.filter(available=True)
+
+ def get_form_fields(self):
+ fields = {}
+ for item in self.get_all_types().all():
+ fields[self.form_key + "-" + item.txt_idx] = forms.CharField(
+ label=str(self.label) + " " + str(item),
+ required=False
+ )
+ return fields
+
+ def get_extra_query(self, slug):
+ return {
+ self.type_query: slug
+ }
+
+ def get_alt_names(self):
+ alt_names = {}
+ for item in self.get_all_types().all():
+ alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName(
+ self.search_key + "-" + item.txt_idx, self.search_query,
+ self.get_extra_query(item.txt_idx), distinct_query=True
+ )
+ return alt_names
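+
+    # Sketch (hypothetical type model): for a DynamicRequest with
+    # form_key="treatment" whose type model exposes txt_idx values
+    # "restoration" and "cleaning", get_form_fields() yields CharFields keyed
+    # "treatment-restoration" and "treatment-cleaning", and get_alt_names()
+    # builds the matching SearchAltName entries for the text search engine.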
+
+
+class SpatialReferenceSystem(GeneralType):
+ order = models.IntegerField(_("Order"), default=10)
+ auth_name = models.CharField(
+ _("Authority name"), default=u'EPSG', max_length=256)
+ srid = models.IntegerField(_("Authority SRID"))
+
+ class Meta:
+ verbose_name = _("Spatial reference system")
+ verbose_name_plural = _("Spatial reference systems")
+ ordering = ('label',)
+
+
+post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
+post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
+
+
+class GeoItem(models.Model):
+ GEO_SOURCE = (
+ ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon"))
+ )
+
+ # gis
+    x = models.FloatField(_('X'), blank=True, null=True)
+    y = models.FloatField(_('Y'), blank=True, null=True)
+    z = models.FloatField(_('Z'), blank=True, null=True)
+    estimated_error_x = models.FloatField(_('Estimated error for X'),
+                                          blank=True, null=True)
+    estimated_error_y = models.FloatField(_('Estimated error for Y'),
+                                          blank=True, null=True)
+    estimated_error_z = models.FloatField(_('Estimated error for Z'),
+                                          blank=True, null=True)
+ spatial_reference_system = models.ForeignKey(
+ SpatialReferenceSystem, verbose_name=_("Spatial Reference System"),
+ blank=True, null=True)
+ point = models.PointField(_("Point"), blank=True, null=True, dim=3)
+ point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
+ point_source = models.CharField(
+ _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True,
+ null=True)
+ point_source_item = models.CharField(
+ _("Point source item"), max_length=100, blank=True, null=True)
+ multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True,
+ null=True)
+ multi_polygon_source = models.CharField(
+ _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1,
+ blank=True, null=True)
+ multi_polygon_source_item = models.CharField(
+ _("Multi polygon source item"), max_length=100, blank=True, null=True)
+
+ GEO_LABEL = ""
+
+ class Meta:
+ abstract = True
+
+ def get_town_centroid(self):
+ raise NotImplementedError
+
+ def get_town_polygons(self):
+ raise NotImplementedError
+
+ @property
+ def display_coordinates(self):
+ if not self.point_2d:
+ return ""
+ profile = get_current_profile()
+ if not profile.display_srs or not profile.display_srs.srid:
+ return self.x, self.y
+ point = self.point_2d.transform(profile.display_srs.srid, clone=True)
+ return round(point.x, 5), round(point.y, 5)
+
+ @property
+ def display_spatial_reference_system(self):
+ profile = get_current_profile()
+ if not profile.display_srs or not profile.display_srs.srid:
+ return self.spatial_reference_system
+ return profile.display_srs
+
+ def get_precise_points(self):
+ if self.point_source == 'P' and self.point_2d:
+ return self.point_2d, self.point, self.point_source_item
+
+ def get_precise_polygons(self):
+ if self.multi_polygon_source == 'P' and self.multi_polygon:
+ return self.multi_polygon, self.multi_polygon_source_item
+
+ def most_precise_geo(self):
+ if self.point_source == 'M':
+ return 'multi_polygon'
+ current_source = str(self.__class__._meta.verbose_name)
+ if self.multi_polygon_source_item == current_source \
+ and (self.multi_polygon_source == "P" or
+ self.point_source_item != current_source):
+ return 'multi_polygon'
+ if self.point_source_item == current_source \
+ and self.point_source == 'P':
+ return 'point'
+ if self.multi_polygon_source == 'P':
+ return 'multi_polygon'
+ if self.point_source == 'P':
+ return 'point'
+ if self.multi_polygon:
+ return 'multi_polygon'
+ if self.point_2d:
+ return 'point'
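+
+    # Rough priority (descriptive note): a point derived from a polygon
+    # ("M") falls back to the multi-polygon; geometries recorded on this
+    # very item come next, then any precise ("P") geometry, then whatever
+    # multi_polygon or point_2d is available; None when nothing is set.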
+
+ def geo_point_source(self):
+ if not self.point_source:
+ return ""
+ src = "{} - {}".format(
+ dict(self.GEO_SOURCE)[self.point_source],
+ self.point_source_item
+ )
+ return src
+
+ def geo_polygon_source(self):
+ if not self.multi_polygon_source:
+ return ""
+ src = "{} - {}".format(
+ dict(self.GEO_SOURCE)[self.multi_polygon_source],
+ self.multi_polygon_source_item
+ )
+ return src
+
+ def _geojson_serialize(self, geom_attr):
+ if not hasattr(self, geom_attr):
+ return ""
+ cached_label_key = 'cached_label'
+ if self.GEO_LABEL:
+ cached_label_key = self.GEO_LABEL
+ if getattr(self, "CACHED_LABELS", None):
+ cached_label_key = self.CACHED_LABELS[-1]
+ geojson = serialize(
+ 'geojson',
+ self.__class__.objects.filter(pk=self.pk),
+ geometry_field=geom_attr, fields=(cached_label_key,))
+ geojson_dct = json.loads(geojson)
+ profile = get_current_profile()
+ precision = profile.point_precision
+
+ features = geojson_dct.pop('features')
+        for feature in features:
+ lbl = feature['properties'].pop(cached_label_key)
+ feature['properties']['name'] = lbl
+ feature['properties']['id'] = self.pk
+ if precision is not None:
+ geom_type = feature["geometry"].get("type", None)
+ if geom_type == "Point":
+ feature["geometry"]["coordinates"] = [
+ round(coord, precision)
+ for coord in feature["geometry"]["coordinates"]
+ ]
+ geojson_dct['features'] = features
+ geojson_dct['link_template'] = simple_link_to_window(self).replace(
+ '999999', '<pk>'
+ )
+ geojson = json.dumps(geojson_dct)
+ return geojson
+
+ @property
+ def point_2d_geojson(self):
+ return self._geojson_serialize('point_2d')
+
+ @property
+ def multi_polygon_geojson(self):
+ return self._geojson_serialize('multi_polygon')
+
+
+class ImageContainerModel:
+ def _get_image_path(self, filename):
+ return "{}/{}".format(self._get_base_image_path(), filename)
+
+ def _get_base_image_path(self):
+ n = datetime.datetime.now()
+ return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day)
+
+
+class QRCodeItem(models.Model, ImageContainerModel):
+ HAS_QR_CODE = True
+ qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True,
+ max_length=255)
+
+ class Meta:
+ abstract = True
+
+ @property
+ def qrcode_path(self):
+ if not self.qrcode:
+ self.generate_qrcode()
+ if not self.qrcode: # error on qrcode generation
+ return ""
+ return self.qrcode.path
+
+ def generate_qrcode(self, request=None, secure=True, tmpdir=None):
+ url = self.get_absolute_url()
+ site = Site.objects.get_current()
+ if request:
+ scheme = request.scheme
+ else:
+ if secure:
+ scheme = "https"
+ else:
+ scheme = "http"
+ url = scheme + "://" + site.domain + url
+ TinyUrl = apps.get_model("ishtar_common", "TinyUrl")
+ tiny_url = TinyUrl()
+ tiny_url.link = url
+ tiny_url.save()
+ short_url = scheme + "://" + site.domain + reverse(
+ 'tiny-redirect', args=[tiny_url.get_short_id()])
+ qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
+ tmpdir_created = False
+ if not tmpdir:
+ tmpdir = tempfile.mkdtemp("-qrcode")
+ tmpdir_created = True
+ filename = tmpdir + os.sep + 'qrcode.png'
+ qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
+ with open(filename, 'rb') as qrfile:
+ self.qrcode.save("qrcode.png", File(qrfile))
+ self.skip_history_when_saving = True
+ self._no_move = True
+ self.save()
+ if tmpdir_created:
+ shutil.rmtree(tmpdir)
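+
+    # generate_qrcode() first registers a TinyUrl pointing to the item's
+    # absolute URL and encodes the short redirect URL, presumably to keep
+    # the payload small for the configured ISHTAR_QRCODE_VERSION.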
+
+
+class SearchVectorConfig:
+ def __init__(self, key, language=None, func=None):
+ self.key = key
+ if language:
+ self.language = language
+ if language == "local":
+ self.language = settings.ISHTAR_SEARCH_LANGUAGE
+ else:
+ self.language = "simple"
+ self.func = func
+
+ def format(self, value):
+ if value == 'None':
+ value = ''
+ if not self.func:
+ return [value]
+ return self.func(value)
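+
+    # Usage sketch (hypothetical key): SearchVectorConfig("town__name",
+    # "local") indexes the related town name with the language set in
+    # ISHTAR_SEARCH_LANGUAGE, while the default falls back to the "simple"
+    # PostgreSQL search configuration; an optional func lets format() split
+    # or normalise the raw value before indexing.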
+
+
+class ShortMenuItem:
+ """
+ Item available in the short menu
+ """
+ UP_MODEL_QUERY = {}
+
+ @classmethod
+ def get_short_menu_class(cls, pk):
+ return ''
+
+ @property
+ def short_class_name(self):
+ return ""
+
+
+class MainItem(ShortMenuItem):
+ """
+ Item with quick actions available from tables
+ Extra actions are available from sheets
+ """
+ QUICK_ACTIONS = []
+
+ @classmethod
+ def get_quick_actions(cls, user, session=None, obj=None):
+ """
+        Get a list of (url, title, icon, target) actions for a user
+ """
+ qas = []
+ for action in cls.QUICK_ACTIONS:
+ if not action.is_available(user, session=session, obj=obj):
+ continue
+ qas.append([action.base_url,
+ mark_safe(action.text),
+ mark_safe(action.rendered_icon),
+ action.target or ""])
+ return qas
+
+ @classmethod
+ def get_quick_action_by_url(cls, url):
+ for action in cls.QUICK_ACTIONS:
+ if action.url == url:
+ return action
+
+ def regenerate_external_id(self):
+ if not hasattr(self, "external_id"):
+ return
+ self.skip_history_when_saving = True
+ self._no_move = True
+ if hasattr(self, "auto_external_id"):
+ self.external_id = None
+ self.save()
+
+ def get_extra_actions(self, request):
+ if not hasattr(self, 'SLUG'):
+ return []
+
+ actions = []
+ if request.user.is_superuser and hasattr(self, "auto_external_id"):
+ actions += [
+ (
+ reverse("regenerate-external-id") + "?{}={}".format(
+ self.SLUG, self.pk),
+ _("Regenerate ID"),
+ "fa fa-key",
+ _("regen."),
+ "",
+ True
+ )
+ ]
+
+ return actions