diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2020-10-07 19:09:30 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-02-28 12:15:21 +0100 |
commit | 1e3da04336b9095e4497d098ea19c3178bc74cf6 (patch) | |
tree | 9cd21bf7e51d271b958a9a4b2b85367adbb97992 /ishtar_common/models_common.py | |
parent | cf6139abc5a64a2ff52eccdcf0424870cff6d57c (diff) | |
download | Ishtar-1e3da04336b9095e4497d098ea19c3178bc74cf6.tar.bz2 Ishtar-1e3da04336b9095e4497d098ea19c3178bc74cf6.zip |
Refactoring of models. Document container - declare only id
Diffstat (limited to 'ishtar_common/models_common.py')
-rw-r--r-- | ishtar_common/models_common.py | 2777 |
1 files changed, 2777 insertions, 0 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py new file mode 100644 index 000000000..b7685b8b5 --- /dev/null +++ b/ishtar_common/models_common.py @@ -0,0 +1,2777 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Generic models and tools for models +""" + +""" +from ishtar_common.models import GeneralType, get_external_id, \ + LightHistorizedItem, OwnPerms, Address, post_save_cache, \ + DashboardFormItem, document_attached_changed, SearchAltName, \ + DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ + QuickAction, MainItem, Merge + + +""" + +import copy +from collections import OrderedDict +import datetime +import json +import logging +import os +import pyqrcode +import shutil +import tempfile +import time + +from django import forms +from django.apps import apps +from django.conf import settings +from django.contrib.auth.models import User, Group +from django.contrib.contenttypes.models import ContentType +from django.contrib.gis.db import models +from django.contrib.postgres.fields import JSONField +from django.contrib.postgres.search import SearchVectorField, SearchVector +from django.contrib.sites.models import Site +from django.core.cache import cache +from django.core.exceptions import ObjectDoesNotExist +from django.core.files import File +from django.core.serializers import serialize +from django.core.urlresolvers import reverse, NoReverseMatch +from django.core.validators import validate_slug +from django.db import connection +from django.db.models import Q, Count +from django.db.models.signals import post_save, post_delete, m2m_changed +from django.template.defaultfilters import slugify +from django.utils.safestring import SafeText, mark_safe +from django.utils.translation import activate, deactivate +from ishtar_common.utils import ugettext_lazy as _, \ + pgettext_lazy, get_image_path +from simple_history.models import HistoricalRecords as BaseHistoricalRecords +from simple_history.signals import post_create_historical_record, \ + pre_create_historical_record +from unidecode import unidecode + +from ishtar_common.model_managers import TypeManager +from ishtar_common.model_merging import merge_model_objects +from ishtar_common.models_imports import Import +from ishtar_common.templatetags.link_to_window import simple_link_to_window +from ishtar_common.utils import get_cache, disable_for_loaddata, \ + get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \ + task, duplicate_item, get_external_id, get_current_profile + +""" +from ishtar_common.models import get_external_id, \ + LightHistorizedItem, OwnPerms, Address, post_save_cache, \ + DashboardFormItem, document_attached_changed, SearchAltName, \ + DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ + QuickAction, MainItem, Merge + + +""" + +logger = logging.getLogger(__name__) + + +class CachedGen(object): + @classmethod + def refresh_cache(cls): + raise NotImplementedError() + + @classmethod + def _add_cache_key_to_refresh(cls, keys): + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if type(current_keys) != list: + current_keys = [] + if keys not in current_keys: + current_keys.append(keys) + cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) + + +class Cached(CachedGen): + slug_field = 'txt_idx' + + @classmethod + def refresh_cache(cls): + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if not current_keys: + return + for keys in current_keys: + if len(keys) == 2 and keys[0] == '__slug': + cls.get_cache(keys[1], force=True) + elif keys[0] == '__get_types': + default = None + empty_first = True + exclude = [] + if len(keys) >= 2: + default = keys.pop() + if len(keys) > 1: + empty_first = bool(keys.pop()) + exclude = keys[1:] + cls.get_types( + exclude=exclude, empty_first=empty_first, default=default, + force=True) + elif keys[0] == '__get_help': + cls.get_help(force=True) + + @classmethod + def _add_cache_key_to_refresh(cls, keys): + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if type(current_keys) != list: + current_keys = [] + if keys not in current_keys: + current_keys.append(keys) + cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) + + @classmethod + def get_cache(cls, slug, force=False): + cache_key, value = get_cache(cls, ['__slug', slug]) + if not force and value: + return value + try: + k = {cls.slug_field: slug} + obj = cls.objects.get(**k) + cache.set(cache_key, obj, settings.CACHE_TIMEOUT) + return obj + except cls.DoesNotExist: + cache.set(cache_key, None, settings.CACHE_TIMEOUT) + return None + + +@disable_for_loaddata +def post_save_cache(sender, **kwargs): + sender.refresh_cache() + + +class GeneralType(Cached, models.Model): + """ + Abstract class for "types" + """ + label = models.TextField(_("Label")) + txt_idx = models.TextField( + _("Textual ID"), validators=[validate_slug], + unique=True, + help_text=_( + "The slug is the standardized version of the name. It contains " + "only lowercase letters, numbers and hyphens. Each slug must " + "be unique.")) + comment = models.TextField(_("Comment"), blank=True, null=True) + available = models.BooleanField(_("Available"), default=True) + HELP_TEXT = "" + objects = TypeManager() + + class Meta: + abstract = True + + def __str__(self): + return self.label + + def natural_key(self): + return (self.txt_idx,) + + def history_compress(self): + return self.txt_idx + + @classmethod + def history_decompress(cls, value, create=False): + if not value: + return [] + res = [] + for txt_idx in value: + try: + res.append(cls.objects.get(txt_idx=txt_idx)) + except cls.DoesNotExist: + continue + return res + + @property + def explicit_label(self): + return "{} ({})".format(self.label, self._meta.verbose_name) + + @classmethod + def create_default_for_test(cls): + return [cls.objects.create(label='Test %d' % i) for i in range(5)] + + @property + def short_label(self): + return self.label + + @property + def name(self): + return self.label + + @classmethod + def get_or_create(cls, slug, label=''): + """ + Get or create a new item. + + :param slug: textual id + :param label: label for initialization if the item doesn't exist (not + mandatory) + + :return: instancied item of the base class + """ + + item = cls.get_cache(slug) + if item: + return item + item, created = cls.objects.get_or_create( + txt_idx=slug, defaults={'label': label}) + return item + + @classmethod + def get_or_create_pk(cls, slug): + """ + Get an id from a slug. Create the associated item if needed. + + :param slug: textual id + + :return: id of the item (string) + """ + return str(cls.get_or_create(slug).pk) + + @classmethod + def get_or_create_pks(cls, slugs): + """ + Get and merge a list of ids from a slug list. Create the associated + items if needed. + + :param slugs: textual ids + + :return: string with ids separated by "_" + """ + items = [] + for slug in slugs: + items.append(str(cls.get_or_create(slug).pk)) + return "_".join(items) + + @classmethod + def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): + if not dct: + dct = {} + if not exclude: + exclude = [] + keys = ['__get_help'] + keys += ["{}".format(ex) for ex in exclude] + keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] + cache_key, value = get_cache(cls, keys) + if value and not force: + return mark_safe(value) + help_text = cls.HELP_TEXT + c_rank = -1 + help_items = "\n" + for item in cls.get_types(dct=dct, instances=True, exclude=exclude): + if hasattr(item, '__iter__'): + pk = item[0] + item = cls.objects.get(pk=pk) + item.rank = c_rank + 1 + if hasattr(item, 'parent'): + c_item = item + parents = [] + while c_item.parent: + parents.append(c_item.parent.label) + c_item = c_item.parent + parents.reverse() + parents.append(item.label) + item.label = " / ".join(parents) + if not item.comment: + continue + if c_rank > item.rank: + help_items += "</dl>\n" + elif c_rank < item.rank: + help_items += "<dl>\n" + c_rank = item.rank + help_items += "<dt>%s</dt><dd>%s</dd>" % ( + item.label, "<br/>".join(item.comment.split('\n'))) + c_rank += 1 + if c_rank: + help_items += c_rank * "</dl>" + if help_text or help_items != u'\n': + help_text = help_text + help_items + else: + help_text = "" + cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) + return mark_safe(help_text) + + @classmethod + def _get_initial_types(cls, initial, type_pks, instance=False): + new_vals = [] + if not initial: + return [] + if type(initial) not in (list, tuple): + initial = [initial] + for value in initial: + try: + pk = int(value) + except (ValueError, TypeError): + continue + if pk in type_pks: + continue + try: + extra_type = cls.objects.get(pk=pk) + if instance: + new_vals.append(extra_type) + else: + new_vals.append((extra_type.pk, str(extra_type))) + except cls.DoesNotExist: + continue + return new_vals + + @classmethod + def get_types(cls, dct=None, instances=False, exclude=None, + empty_first=True, default=None, initial=None, force=False, + full_hierarchy=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + types = [] + if not instances and empty_first and not default: + types = [('', '--')] + types += cls._pre_get_types(dct, instances, exclude, + default, force, + get_full_hierarchy=full_hierarchy) + if not initial: + return types + new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) + types += new_vals + return types + + @classmethod + def _pre_get_types(cls, dct=None, instances=False, exclude=None, + default=None, force=False, get_full_hierarchy=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + # cache + cache_key = None + if not instances: + keys = ['__get_types'] + keys += ["{}".format(ex) for ex in exclude] + \ + ["{}".format(default)] + keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] + cache_key, value = get_cache(cls, keys) + if value and not force: + return value + base_dct = dct.copy() + if hasattr(cls, 'parent'): + if not cache_key: + return cls._get_parent_types( + base_dct, instances, exclude=exclude, + default=default, get_full_hierarchy=get_full_hierarchy) + vals = [v for v in cls._get_parent_types( + base_dct, instances, exclude=exclude, + default=default, get_full_hierarchy=get_full_hierarchy)] + cache.set(cache_key, vals, settings.CACHE_TIMEOUT) + return vals + + if not cache_key: + return cls._get_types(base_dct, instances, exclude=exclude, + default=default) + vals = [ + v for v in cls._get_types(base_dct, instances, exclude=exclude, + default=default) + ] + cache.set(cache_key, vals, settings.CACHE_TIMEOUT) + return vals + + @classmethod + def _get_types(cls, dct=None, instances=False, exclude=None, default=None): + if not dct: + dct = {} + if not exclude: + exclude = [] + dct['available'] = True + if default: + try: + default = cls.objects.get(txt_idx=default) + yield (default.pk, _(str(default))) + except cls.DoesNotExist: + pass + items = cls.objects.filter(**dct) + if default and default != "None": + if hasattr(default, 'txt_idx'): + exclude.append(default.txt_idx) + else: + exclude.append(default) + if exclude: + items = items.exclude(txt_idx__in=exclude) + for item in items.order_by(*cls._meta.ordering).all(): + if instances: + item.rank = 0 + yield item + else: + yield (item.pk, _(str(item)) if item and str(item) else '') + + @classmethod + def _get_childs_list(cls, dct=None, exclude=None, instances=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + if 'parent' in dct: + dct.pop('parent') + childs = cls.objects.filter(**dct) + if exclude: + childs = childs.exclude(txt_idx__in=exclude) + if hasattr(cls, 'order'): + childs = childs.order_by('order') + res = {} + if instances: + for item in childs.all(): + parent_id = item.parent_id or 0 + if parent_id not in res: + res[parent_id] = [] + res[parent_id].append(item) + else: + for item in childs.values("id", "parent_id", "label").all(): + parent_id = item["parent_id"] or 0 + if item["id"] == item["parent_id"]: + parent_id = 0 + if parent_id not in res: + res[parent_id] = [] + res[parent_id].append((item["id"], item["label"])) + return res + + PREFIX = "│ " + PREFIX_EMPTY = " " + PREFIX_MEDIUM = "├ " + PREFIX_LAST = "└ " + PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] + + @classmethod + def _get_childs(cls, item, child_list, prefix=0, instances=False, + is_last=False, last_of=None, get_full_hierarchy=False): + if not last_of: + last_of = [] + + prefix += 1 + current_child_lst = [] + if item in child_list: + current_child_lst = child_list[item] + + lst = [] + total = len(current_child_lst) + full_hierarchy_initial = get_full_hierarchy + for idx, child in enumerate(current_child_lst): + mylast_of = last_of[:] + p = '' + if instances: + child.rank = prefix + lst.append(child) + else: + if full_hierarchy_initial: + if isinstance(full_hierarchy_initial, str): + p = full_hierarchy_initial + " > " + else: + p = "" + else: + cprefix = prefix + while cprefix: + cprefix -= 1 + if not cprefix: + if (idx + 1) == total: + p += cls.PREFIX_LAST + else: + p += cls.PREFIX_MEDIUM + elif is_last: + if mylast_of: + clast = mylast_of.pop(0) + if clast: + p += cls.PREFIX_EMPTY + else: + p += cls.PREFIX + else: + p += cls.PREFIX_EMPTY + else: + p += cls.PREFIX + lst.append(( + child[0], SafeText(p + str(_(child[1]))) + )) + clast_of = last_of[:] + clast_of.append(idx + 1 == total) + if instances: + child_id = child.id + else: + child_id = child[0] + if get_full_hierarchy: + if p: + if not p.endswith(" > "): + p += " > " + get_full_hierarchy = p + child[1] + else: + get_full_hierarchy = child[1] + for sub_child in cls._get_childs( + child_id, child_list, prefix, instances, + is_last=((idx + 1) == total), last_of=clast_of, + get_full_hierarchy=get_full_hierarchy): + lst.append(sub_child) + return lst + + @classmethod + def _get_parent_types(cls, dct=None, instances=False, exclude=None, + default=None, get_full_hierarchy=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + dct['available'] = True + child_list = cls._get_childs_list(dct, exclude, instances) + + if 0 in child_list: + for item in child_list[0]: + if instances: + item.rank = 0 + item_id = item.pk + yield item + else: + item_id = item[0] + yield item + if get_full_hierarchy: + get_full_hierarchy = item[1] + for child in cls._get_childs( + item_id, child_list, instances=instances, + get_full_hierarchy=get_full_hierarchy): + yield child + + def save(self, *args, **kwargs): + ItemKey = apps.get_model("ishtar_common", "ItemKey") + if not self.id and not self.label: + txt_idx = self.txt_idx + if isinstance(txt_idx, list): + txt_idx = txt_idx[0] + self.txt_idx = txt_idx + self.label = " ".join(" ".join(self.txt_idx.split('-')) + .split('_')).title() + if not self.txt_idx: + self.txt_idx = slugify(self.label)[:100] + + # clean old keys + if self.pk: + old = self.__class__.objects.get(pk=self.pk) + content_type = ContentType.objects.get_for_model(self.__class__) + if slugify(self.label) != slugify(old.label): + ItemKey.objects.filter( + object_id=self.pk, key=slugify(old.label), + content_type=content_type).delete() + if self.txt_idx != old.txt_idx: + ItemKey.objects.filter( + object_id=self.pk, key=old.txt_idx, + content_type=content_type).delete() + + obj = super(GeneralType, self).save(*args, **kwargs) + self.generate_key(force=True) + return obj + + def add_key(self, key, force=False, importer=None, group=None, + user=None): + ItemKey = apps.get_model("ishtar_common", "ItemKey") + content_type = ContentType.objects.get_for_model(self.__class__) + if not importer and not force and ItemKey.objects.filter( + key=key, content_type=content_type).count(): + return + filtr = {'key': key, 'content_type': content_type} + if group: + filtr['group'] = group + elif user: + filtr['user'] = user + else: + filtr['importer'] = importer + if force: + ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() + filtr['object_id'] = self.pk + ItemKey.objects.get_or_create(**filtr) + + def generate_key(self, force=False): + for key in (slugify(self.label), self.txt_idx): + self.add_key(key) + + def get_keys(self, importer): + ItemKey = apps.get_model("ishtar_common", "ItemKey") + keys = [self.txt_idx] + content_type = ContentType.objects.get_for_model(self.__class__) + base_q = Q(content_type=content_type, object_id=self.pk) + subquery = Q(importer__isnull=True, user__isnull=True, + group__isnull=True) + subquery |= Q(user__isnull=True, group__isnull=True, + importer=importer) + if importer.user: + subquery |= Q(user=importer.user, group__isnull=True, + importer=importer) + if importer.associated_group: + subquery |= Q(user__isnull=True, group=importer.associated_group, + importer=importer) + q = ItemKey.objects.filter(base_q & subquery) + for ik in q.exclude(key=self.txt_idx).all(): + keys.append(ik.key) + return keys + + @classmethod + def generate_keys(cls): + # content_type = ContentType.objects.get_for_model(cls) + for item in cls.objects.all(): + item.generate_key() + + +class HierarchicalType(GeneralType): + parent = models.ForeignKey('self', blank=True, null=True, + on_delete=models.SET_NULL, + verbose_name=_("Parent")) + + class Meta: + abstract = True + + def full_label(self): + lbls = [self.label] + item = self + while item.parent: + item = item.parent + lbls.append(item.label) + return " > ".join(reversed(lbls)) + + +class StatisticItem: + STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" + STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: + # OrderedDict([('year', _("Year")), + # ("operation_type__label", _("Operation type"))]) + STATISTIC_SUM_VARIABLE = OrderedDict( + (("pk", (_("Number"), 1)),) + ) # example: "Price", "Volume" - the number is a multiplier + + +class TemplateItem: + @classmethod + def _label_templates_q(cls): + model_name = "{}.{}".format( + cls.__module__, cls.__name__) + q = Q(associated_model__klass=model_name, + for_labels=True, available=True) + alt_model_name = model_name.replace( + "models_finds", "models").replace( + "models_treatments", "models") + if alt_model_name != model_name: + q |= Q(associated_model__klass=model_name, + for_labels=True, available=True) + DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") + return DocumentTemplate.objects.filter(q) + + @classmethod + def has_label_templates(cls): + return cls._label_templates_q().count() + + @classmethod + def label_templates(cls): + return cls._label_templates_q() + + def get_extra_templates(self, request): + cls = self.__class__ + templates = [] + name = str(cls.__name__) + module = str(cls.__module__) + if "archaeological_finds" in module: + if "models_finds" in name or "models_treatments" in name: + names = [ + name, + name.replace("models_finds", "models" + ).replace("models_treatments", "models") + ] + else: + names = [name, name.replace("models", "models_finds"), + name.replace("models", "models_treatments")] + else: + names = [name] + model_names = [ + "{}.{}".format(module, name) for name in names + ] + DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") + q = DocumentTemplate.objects.filter( + associated_model__klass__in=model_names, + for_labels=False, available=True) + for template in q.all(): + urlname = "generate-document" + templates.append( + (template.name, reverse( + urlname, args=[template.slug, self.pk])) + ) + return templates + + +class FullSearch(models.Model): + search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, + help_text=_("Auto filled at save")) + + EXTRA_REQUEST_KEYS = {} + DYNAMIC_REQUESTS = {} + ALT_NAMES = {} + + BASE_SEARCH_VECTORS = [] + PROPERTY_SEARCH_VECTORS = [] + INT_SEARCH_VECTORS = [] + M2M_SEARCH_VECTORS = [] + PARENT_SEARCH_VECTORS = [] + # prevent circular dependency + PARENT_ONLY_SEARCH_VECTORS = [] + + class Meta: + abstract = True + + @classmethod + def general_types(cls): + for k in get_all_field_names(cls): + field = cls._meta.get_field(k) + if not hasattr(field, 'rel') or not field.rel: + continue + rel_model = field.rel.to + if issubclass(rel_model, (GeneralType, HierarchicalType)): + yield k + + @classmethod + def get_alt_names(cls): + alt_names = cls.ALT_NAMES.copy() + for dr_k in cls.DYNAMIC_REQUESTS: + alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) + return alt_names + + @classmethod + def get_query_parameters(cls): + query_parameters = {} + for v in cls.get_alt_names().values(): + for language_code, language_lbl in settings.LANGUAGES: + activate(language_code) + query_parameters[str(v.search_key)] = v + deactivate() + return query_parameters + + def _update_search_field(self, search_vector_conf, search_vectors, data): + for value in search_vector_conf.format(data): + with connection.cursor() as cursor: + cursor.execute("SELECT to_tsvector(%s, %s)", [ + search_vector_conf.language, value]) + row = cursor.fetchone() + search_vectors.append(row[0]) + + def _update_search_number_field(self, search_vectors, val): + search_vectors.append("'{}':1".format(val)) + + def update_search_vector(self, save=True, exclude_parent=False): + """ + Update the search vector + :param save: True if you want to save the object immediately + :return: True if modified + """ + if not hasattr(self, 'search_vector'): + return + if not self.pk: + # logger.warning("Cannot update search vector before save or " + # "after deletion.") + return + if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ + and not self.INT_SEARCH_VECTORS \ + and not self.PROPERTY_SEARCH_VECTORS \ + and not self.PARENT_SEARCH_VECTORS: + logger.warning("No search_vectors defined for {}".format( + self.__class__)) + return + if getattr(self, '_search_updated', None): + return + JsonDataField = apps.get_model("ishtar_common", "JsonDataField") + self._search_updated = True + + old_search = "" + if self.search_vector: + old_search = self.search_vector[:] + search_vectors = [] + base_q = self.__class__.objects.filter(pk=self.pk) + + # many to many have to be queried one by one otherwise only one is fetch + for m2m_search_vector in self.M2M_SEARCH_VECTORS: + key = m2m_search_vector.key.split('__')[0] + rel_key = getattr(self, key) + for item in rel_key.values('pk').all(): + query_dct = {key + "__pk": item['pk']} + q = copy.copy(base_q).filter(**query_dct) + q = q.annotate( + search=SearchVector( + m2m_search_vector.key, + config=m2m_search_vector.language) + ).values('search') + search_vectors.append(q.all()[0]['search']) + + # int/float are not well managed by the SearchVector + for int_search_vector in self.INT_SEARCH_VECTORS: + q = base_q.values(int_search_vector.key) + for val in int_search_vector.format( + q.all()[0][int_search_vector.key]): + self._update_search_number_field(search_vectors, val) + + if not exclude_parent: + # copy parent vector fields + for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: + parent = getattr(self, PARENT_SEARCH_VECTOR) + if hasattr(parent, 'all'): # m2m + for p in parent.all(): + search_vectors.append(p.search_vector) + elif parent: + search_vectors.append(parent.search_vector) + + for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: + parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) + if hasattr(parent, 'all'): # m2m + for p in parent.all(): + search_vectors.append( + p.update_search_vector(save=False, exclude_parent=True) + ) + elif parent: + search_vectors.append( + parent.update_search_vector(save=False, exclude_parent=True) + ) + + if self.BASE_SEARCH_VECTORS: + # query "simple" fields + q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) + res = q.all()[0] + for base_search_vector in self.BASE_SEARCH_VECTORS: + data = res[base_search_vector.key] + data = unidecode(str(data)) + self._update_search_field(base_search_vector, + search_vectors, data) + + if self.PROPERTY_SEARCH_VECTORS: + for property_search_vector in self.PROPERTY_SEARCH_VECTORS: + data = getattr(self, property_search_vector.key) + if callable(data): + data = data() + if not data: + continue + data = str(data) + self._update_search_field(property_search_vector, + search_vectors, data) + + if hasattr(self, 'data') and self.data: + content_type = ContentType.objects.get_for_model(self) + for json_field in JsonDataField.objects.filter( + content_type=content_type, + search_index=True).all(): + data = copy.deepcopy(self.data) + no_data = False + for key in json_field.key.split('__'): + if key not in data: + no_data = True + break + data = data[key] + if no_data or not data: + continue + + if json_field.value_type == 'B': + if data is True: + data = json_field.name + else: + continue + elif json_field.value_type in ('I', 'F'): + self._update_search_number_field(search_vectors, data) + continue + elif json_field.value_type == 'D': + # only index year + self._update_search_number_field(search_vectors, data.year) + continue + for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): + with connection.cursor() as cursor: + cursor.execute("SELECT to_tsvector(%s, %s)", + [lang, data]) + row = cursor.fetchone() + search_vectors.append(row[0]) + new_search_vector = merge_tsvectors(search_vectors) + changed = old_search != new_search_vector + self.search_vector = new_search_vector + if save and changed: + self.__class__.objects.filter(pk=self.pk).update( + search_vector=new_search_vector) + elif not save: + return new_search_vector + return changed + + +class Imported(models.Model): + imports = models.ManyToManyField( + Import, blank=True, + related_name="imported_%(app_label)s_%(class)s") + + class Meta: + abstract = True + + +class JsonData(models.Model, CachedGen): + data = JSONField(default={}, blank=True) + + class Meta: + abstract = True + + def pre_save(self): + if not self.data: + self.data = {} + + @property + def json_sections(self): + sections = [] + try: + content_type = ContentType.objects.get_for_model(self) + except ContentType.DoesNotExists: + return sections + JsonDataField = apps.get_model("ishtar_common", "JsonDataField") + fields = list(JsonDataField.objects.filter( + content_type=content_type, display=True, section__isnull=True + ).all()) # no section fields + + fields += list(JsonDataField.objects.filter( + content_type=content_type, display=True, section__isnull=False + ).order_by('section__order', 'order').all()) + + for field in fields: + value = None + data = self.data.copy() + for key in field.key.split('__'): + if key in data: + value = copy.copy(data[key]) + data = data[key] + else: + value = None + break + if value is None: + continue + if type(value) in (list, tuple): + value = " ; ".join([str(v) for v in value]) + section_name = field.section.name if field.section else None + if not sections or section_name != sections[-1][0]: + # if section name is identical it is the same + sections.append((section_name, [])) + sections[-1][1].append((field.name, value)) + return sections + + @classmethod + def refresh_cache(cls): + __, refreshed = get_cache(cls, ['cache_refreshed']) + if refreshed and time.time() - refreshed < 1: + return + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if not current_keys: + return + for keys in current_keys: + if keys[0] == '__get_dynamic_choices': + cls._get_dynamic_choices(keys[1], force=True) + + @classmethod + def _get_dynamic_choices(cls, key, force=False): + """ + Get choice from existing values + :param key: data key + :param force: if set to True do not use cache + :return: tuple of choices (id, value) + """ + cache_key, value = get_cache(cls, ['__get_dynamic_choices', key]) + if not force and value: + return value + choices = set() + splitted_key = key[len('data__'):].split('__') + q = cls.objects.filter( + data__has_key=key[len('data__'):]).values_list('data', flat=True) + for value in q.all(): + for k in splitted_key: + value = value[k] + choices.add(value) + choices = [('', '')] + [(v, v) for v in sorted(list(choices))] + cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) + return choices + + +class FixAssociated: + ASSOCIATED = {} + + def fix_associated(self): + for key in self.ASSOCIATED: + item = getattr(self, key) + if not item: + continue + dct = self.ASSOCIATED[key] + for dct_key in dct: + subkey, ctype = dct_key + expected_values = dct[dct_key] + if not isinstance(expected_values, (list, tuple)): + expected_values = [expected_values] + if hasattr(ctype, "txt_idx"): + try: + expected_values = [ctype.objects.get(txt_idx=v) + for v in expected_values] + except ctype.DoesNotExist: + # type not yet initialized + return + current_vals = getattr(item, subkey) + is_many = False + if hasattr(current_vals, "all"): + is_many = True + current_vals = current_vals.all() + else: + current_vals = [current_vals] + is_ok = False + for current_val in current_vals: + if current_val in expected_values: + is_ok = True + break + if is_ok: + continue + # the first value is used + new_value = expected_values[0] + if is_many: + getattr(item, subkey).add(new_value) + else: + setattr(item, subkey, new_value) + + +class CascasdeUpdate: + DOWN_MODEL_UPDATE = [] + + def cascade_update(self): + for down_model in self.DOWN_MODEL_UPDATE: + if not settings.USE_BACKGROUND_TASK: + rel = getattr(self, down_model) + if hasattr(rel.model, "need_update"): + rel.update(need_update=True) + continue + for item in getattr(self, down_model).all(): + cached_label_changed(item.__class__, instance=item) + if hasattr(item, "point_2d"): + post_save_geo(item.__class__, instance=item) + + +class SearchAltName(object): + def __init__(self, search_key, search_query, extra_query=None, + distinct_query=False): + self.search_key = search_key + self.search_query = search_query + self.extra_query = extra_query or {} + self.distinct_query = distinct_query + + +class HistoryError(Exception): + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + +class HistoricalRecords(BaseHistoricalRecords): + def _save_historic(self, manager, instance, history_date, history_type, + history_user, history_change_reason, using, attrs): + history_instance = manager.model( + history_date=history_date, + history_type=history_type, + history_user=history_user, + history_change_reason=history_change_reason, + **attrs + ) + + pre_create_historical_record.send( + sender=manager.model, + instance=instance, + history_date=history_date, + history_user=history_user, + history_change_reason=history_change_reason, + history_instance=history_instance, + using=using, + ) + + history_instance.save(using=using) + + post_create_historical_record.send( + sender=manager.model, + instance=instance, + history_instance=history_instance, + history_date=history_date, + history_user=history_user, + history_change_reason=history_change_reason, + using=using, + ) + + def create_historical_record(self, instance, history_type, using=None): + try: + history_modifier = getattr(instance, 'history_modifier', None) + assert history_modifier + except (User.DoesNotExist, AssertionError): + # on batch removing of users, user could have disappeared + return + history_date = getattr(instance, "_history_date", + datetime.datetime.now()) + history_change_reason = getattr(instance, "changeReason", None) + force = getattr(instance, "_force_history", False) + manager = getattr(instance, self.manager_name) + attrs = {} + for field in instance._meta.fields: + attrs[field.attname] = getattr(instance, field.attname) + q_history = instance.history \ + .filter(history_modifier_id=history_modifier.pk) \ + .order_by('-history_date', '-history_id') + # instance.skip_history_when_saving = True + if not q_history.count(): + if force: + delattr(instance, '_force_history') + self._save_historic( + manager, instance, history_date, history_type, history_modifier, + history_change_reason, using, attrs) + return + old_instance = q_history.all()[0] + # multiple saving by the same user in a very short time are generaly + # caused by post_save signals it is not relevant to keep them + min_history_date = datetime.datetime.now() \ + - datetime.timedelta(seconds=5) + q = q_history.filter(history_date__isnull=False, + history_date__gt=min_history_date) \ + .order_by('-history_date', '-history_id') + if not force and q.count(): + return + + if force: + delattr(instance, '_force_history') + + # record a new version only if data have been changed + for field in instance._meta.fields: + if getattr(old_instance, field.attname) != attrs[field.attname]: + self._save_historic(manager, instance, history_date, + history_type, history_modifier, + history_change_reason, using, attrs) + return + + +class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported, + JsonData, FixAssociated, CascasdeUpdate): + """ + Historized item with external ID management. + All historized items are searchable and have a data json field. + Historized items can be "locked" for edition. + """ + IS_BASKET = False + SHOW_URL = None + EXTERNAL_ID_KEY = '' + EXTERNAL_ID_DEPENDENCIES = [] + HISTORICAL_M2M = [] + + history_modifier = models.ForeignKey( + User, related_name='+', on_delete=models.SET_NULL, + verbose_name=_("Last editor"), blank=True, null=True) + history_creator = models.ForeignKey( + User, related_name='+', on_delete=models.SET_NULL, + verbose_name=_("Creator"), blank=True, null=True) + last_modified = models.DateTimeField(auto_now=True) + history_m2m = JSONField(default={}, blank=True) + need_update = models.BooleanField( + verbose_name=_("Need update"), default=False) + locked = models.BooleanField( + verbose_name=_("Item locked for edition"), default=False) + lock_user = models.ForeignKey( + User, related_name='+', on_delete=models.SET_NULL, + verbose_name=_("Locked by"), blank=True, null=True) + + ALT_NAMES = { + 'history_creator': SearchAltName( + pgettext_lazy("key for text search", u"created-by"), + 'history_creator__ishtaruser__person__cached_label__iexact' + ), + 'history_modifier': SearchAltName( + pgettext_lazy("key for text search", u"modified-by"), + 'history_modifier__ishtaruser__person__cached_label__iexact' + ), + 'modified_before': SearchAltName( + pgettext_lazy("key for text search", "modified-before"), + 'last_modified__lte' + ), + 'modified_after': SearchAltName( + pgettext_lazy("key for text search", "modified-after"), + 'last_modified__gte' + ), + } + + class Meta: + abstract = True + + @classmethod + def get_verbose_name(cls): + return cls._meta.verbose_name + + def is_locked(self, user=None): + if not user: + return self.locked + return self.locked and (not self.lock_user or self.lock_user != user) + + def merge(self, item, keep_old=False): + merge_model_objects(self, item, keep_old=keep_old) + + def public_representation(self): + return {} + + def duplicate(self, user=None, data=None): + return duplicate_item(self, user, data) + + def update_external_id(self, save=False): + if not self.EXTERNAL_ID_KEY or ( + self.external_id and + not getattr(self, 'auto_external_id', False)): + return + external_id = get_external_id(self.EXTERNAL_ID_KEY, self) + if external_id == self.external_id: + return + self.auto_external_id = True + self.external_id = external_id + self._cached_label_checked = False + if save: + self.skip_history_when_saving = True + self.save() + return external_id + + def get_last_history_date(self): + q = self.history.values("history_date").order_by('-history_date') + if not q.count(): + return + return q.all()[0]['history_date'] + + def get_previous(self, step=None, date=None, strict=False): + """ + Get a "step" previous state of the item + """ + assert step or date + historized = self.history.all() + item = None + if step: + if len(historized) <= step: + # silently return the last step if too far in the history + item = historized[len(historized) - 1] + else: + item = historized[step] + else: + for step, item in enumerate(historized): + if item.history_date == date: + break + # ended with no match + if item.history_date != date: + return + item._step = step + if len(historized) != (step + 1): + item._previous = historized[step + 1].history_date + else: + item._previous = None + if step > 0: + item._next = historized[step - 1].history_date + else: + item._next = None + item.history_date = historized[step].history_date + model = self.__class__ + for k in get_all_field_names(model): + field = model._meta.get_field(k) + if hasattr(field, 'rel') and field.rel: + if not hasattr(item, k + '_id'): + setattr(item, k, getattr(self, k)) + continue + val = getattr(item, k + '_id') + if not val: + setattr(item, k, None) + continue + try: + val = field.rel.to.objects.get(pk=val) + setattr(item, k, val) + except ObjectDoesNotExist: + if strict: + raise HistoryError("The class %s has no pk %d" % ( + str(field.rel.to), val)) + setattr(item, k, None) + item.pk = self.pk + return item + + @property + def last_edition_date(self): + try: + return self.history.order_by('-history_date').all()[0].history_date + except (AttributeError, IndexError): + return + + @property + def history_creation_date(self): + try: + return self.history.order_by('history_date').all()[0].history_date + except (AttributeError, IndexError): + return + + def rollback(self, date): + """ + Rollback to a previous state + """ + to_del, new_item = [], None + for item in self.history.all(): + if item.history_date == date: + new_item = item + break + to_del.append(item) + if not new_item: + raise HistoryError("The date to rollback to doesn't exist.") + try: + field_keys = [f.name for f in self._meta.fields] + for k in field_keys: + if k != 'id' and hasattr(self, k): + if not hasattr(new_item, k): + k = k + "_id" + setattr(self, k, getattr(new_item, k)) + + try: + self.history_modifier = User.objects.get( + pk=new_item.history_modifier_id) + except User.ObjectDoesNotExist: + pass + self.save() + saved_m2m = new_item.history_m2m.copy() + for hist_key in self.HISTORICAL_M2M: + # after each association m2m is rewrite - force the original + # to be reset + new_item.history_m2m = saved_m2m + values = new_item.m2m_listing(hist_key, create=True) or [] + hist_field = getattr(self, hist_key) + hist_field.clear() + for val in values: + hist_field.add(val) + # force label regeneration + self._cached_label_checked = False + self.save() + except ObjectDoesNotExist: + raise HistoryError("The rollback has failed.") + # clean the obsolete history + for historized_item in to_del: + historized_item.delete() + + def m2m_listing(self, key): + return getattr(self, key).all() + + def values(self): + values = {} + for f in self._meta.fields: + k = f.name + if k != 'id': + values[k] = getattr(self, k) + return values + + def get_absolute_url(self): + try: + return reverse('display-item', args=[self.SLUG, self.pk]) + except NoReverseMatch: + return + + def get_show_url(self): + show_url = self.SHOW_URL + if not show_url: + show_url = 'show-' + self.__class__.__name__.lower() + try: + return reverse(show_url, args=[self.pk, '']) + except NoReverseMatch: + return + + @property + def associated_filename(self): + if [True for attr in ('get_town_label', 'get_department', 'reference', + 'short_class_name') if not hasattr(self, attr)]: + return '' + items = [slugify(self.get_department()), + slugify(self.get_town_label()).upper(), + slugify(self.short_class_name), + slugify(self.reference), + slugify(self.name or '').replace('-', '_').capitalize()] + last_edition_date = self.last_edition_date + if last_edition_date: + items.append(last_edition_date.strftime('%Y%m%d')) + else: + items.append('00000000') + return "-".join([str(item) for item in items]) + + def save(self, *args, **kwargs): + created = not self.pk + if not getattr(self, 'skip_history_when_saving', False): + assert hasattr(self, 'history_modifier') + if created: + self.history_creator = self.history_modifier + # external ID can have related item not available before save + external_id_updated = kwargs.pop('external_id_updated') \ + if 'external_id_updated' in kwargs else False + if not created and not external_id_updated: + self.update_external_id() + super(BaseHistorizedItem, self).save(*args, **kwargs) + if created and self.update_external_id(): + # force resave for external ID creation + self.skip_history_when_saving = True + self._updated_id = True + return self.save(external_id_updated=True) + for dep in self.EXTERNAL_ID_DEPENDENCIES: + for obj in getattr(self, dep).all(): + obj.update_external_id(save=True) + self.fix_associated() + return True + + +class LightHistorizedItem(BaseHistorizedItem): + history_date = models.DateTimeField(default=datetime.datetime.now) + + class Meta: + abstract = True + + def save(self, *args, **kwargs): + super(LightHistorizedItem, self).save(*args, **kwargs) + return self + + +class OwnPerms(object): + """ + Manage special permissions for object's owner + """ + + @classmethod + def get_query_owns(cls, ishtaruser): + """ + Query object to get own items + """ + return None # implement for each object + + def can_view(self, request): + if hasattr(self, "LONG_SLUG"): + perm = "view_" + self.LONG_SLUG + else: + perm = "view_" + self.SLUG + return self.can_do(request, perm) + + def can_do(self, request, action_name): + """ + Check permission availability for the current object. + :param request: request object + :param action_name: action name eg: "change_find" - "own" variation is + checked + :return: boolean + """ + if not getattr(request.user, 'ishtaruser', None): + return False + splited = action_name.split('_') + action_own_name = splited[0] + '_own_' + '_'.join(splited[1:]) + user = request.user + if action_own_name == "view_own_findbasket": + action_own_name = "view_own_find" + return user.ishtaruser.has_right(action_name, request.session) or \ + (user.ishtaruser.has_right(action_own_name, request.session) + and self.is_own(user.ishtaruser)) + + def is_own(self, user, alt_query_own=None): + """ + Check if the current object is owned by the user + """ + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, IshtarUser): + ishtaruser = user + elif hasattr(user, 'ishtaruser'): + ishtaruser = user.ishtaruser + else: + return False + if not alt_query_own: + query = self.get_query_owns(ishtaruser) + else: + query = getattr(self, alt_query_own)(ishtaruser) + if not query: + return False + query &= Q(pk=self.pk) + return self.__class__.objects.filter(query).count() + + @classmethod + def has_item_of(cls, user): + """ + Check if the user own some items + """ + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, IshtarUser): + ishtaruser = user + elif hasattr(user, 'ishtaruser'): + ishtaruser = user.ishtaruser + else: + return False + query = cls.get_query_owns(ishtaruser) + if not query: + return False + return cls.objects.filter(query).count() + + @classmethod + def _return_get_owns(cls, owns, values, get_short_menu_class, + label_key='cached_label'): + if not owns: + return [] + sorted_values = [] + if hasattr(cls, 'BASKET_MODEL'): + owns_len = len(owns) + for idx, item in enumerate(reversed(owns)): + if get_short_menu_class: + item = item[0] + if type(item) == cls.BASKET_MODEL: + basket = owns.pop(owns_len - idx - 1) + sorted_values.append(basket) + sorted_values = list(reversed(sorted_values)) + if not values: + if not get_short_menu_class: + return sorted_values + list( + sorted(owns, key=lambda x: getattr(x, label_key) or "")) + return sorted_values + list( + sorted(owns, key=lambda x: getattr(x[0], label_key) or "")) + if not get_short_menu_class: + return sorted_values + list( + sorted(owns, key=lambda x: x[label_key] or "")) + return sorted_values + list( + sorted(owns, key=lambda x: x[0][label_key] or "")) + + @classmethod + def get_owns(cls, user, replace_query=None, limit=None, values=None, + get_short_menu_class=False, menu_filtr=None): + """ + Get Own items + """ + if not replace_query: + replace_query = {} + if hasattr(user, 'is_authenticated') and not user.is_authenticated(): + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, User): + try: + ishtaruser = IshtarUser.objects.get(user_ptr=user) + except IshtarUser.DoesNotExist: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + elif isinstance(user, IshtarUser): + ishtaruser = user + else: + if values: + return [] + return cls.objects.filter(pk__isnull=True) + items = [] + if hasattr(cls, 'BASKET_MODEL'): + items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) + query = cls.get_query_owns(ishtaruser) + if not query and not replace_query: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + if query: + q = cls.objects.filter(query) + else: # replace_query + q = cls.objects.filter(replace_query) + if values: + q = q.values(*values) + if limit: + items += list(q.order_by('-pk')[:limit]) + else: + items += list(q.order_by(*cls._meta.ordering).all()) + if get_short_menu_class: + if values: + if 'id' not in values: + raise NotImplementedError( + "Call of get_owns with get_short_menu_class option and" + " no 'id' in values is not implemented") + my_items = [] + for i in items: + if hasattr(cls, 'BASKET_MODEL') and \ + type(i) == cls.BASKET_MODEL: + dct = dict([(k, getattr(i, k)) for k in values]) + my_items.append( + (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) + else: + my_items.append((i, cls.get_short_menu_class(i['id']))) + items = my_items + else: + items = [(i, cls.get_short_menu_class(i.pk)) for i in items] + return items + + @classmethod + def _get_query_owns_dicts(cls, ishtaruser): + """ + List of query own dict to construct the query. + Each dict are join with an AND operator, each dict key, values are + joined with OR operator + """ + return [] + + @classmethod + def _construct_query_own(cls, prefix, dct_list): + q = None + for subquery_dict in dct_list: + subquery = None + for k in subquery_dict: + subsubquery = Q(**{prefix + k: subquery_dict[k]}) + if subquery: + subquery |= subsubquery + else: + subquery = subsubquery + if not subquery: + continue + if q: + q &= subquery + else: + q = subquery + return q + + +class NumberManager(models.Manager): + def get_by_natural_key(self, number): + return self.get(number=number) + + +class State(models.Model): + label = models.CharField(_("Label"), max_length=30) + number = models.CharField(_("Number"), unique=True, max_length=3) + objects = NumberManager() + + class Meta: + verbose_name = _("State") + ordering = ['number'] + + def __str__(self): + return self.label + + def natural_key(self): + return (self.number,) + + +class Department(models.Model): + label = models.CharField(_("Label"), max_length=30) + number = models.CharField(_("Number"), unique=True, max_length=3) + state = models.ForeignKey( + 'State', verbose_name=_("State"), blank=True, null=True, + on_delete=models.SET_NULL, + ) + objects = NumberManager() + + class Meta: + verbose_name = _("Department") + verbose_name_plural = _("Departments") + ordering = ['number'] + + def __str__(self): + return self.label + + def natural_key(self): + return (self.number,) + + def history_compress(self): + return self.number + + @classmethod + def history_decompress(cls, full_value, create=False): + if not full_value: + return [] + res = [] + for value in full_value: + try: + res.append(cls.objects.get(number=value)) + except cls.DoesNotExist: + continue + return res + + +class Arrondissement(models.Model): + name = models.CharField("Nom", max_length=30) + department = models.ForeignKey(Department, verbose_name="Département") + + def __str__(self): + return settings.JOINT.join((self.name, str(self.department))) + + +class Canton(models.Model): + name = models.CharField("Nom", max_length=30) + arrondissement = models.ForeignKey(Arrondissement, + verbose_name="Arrondissement") + + def __str__(self): + return settings.JOINT.join( + (self.name, str(self.arrondissement))) + + +class TownManager(models.GeoManager): + def get_by_natural_key(self, numero_insee, year): + return self.get(numero_insee=numero_insee, year=year) + + +class Town(Imported, models.Model): + name = models.CharField(_("Name"), max_length=100) + surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) + center = models.PointField(_("Localisation"), srid=settings.SRID, + blank=True, null=True) + limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) + numero_insee = models.CharField("Code commune (numéro INSEE)", + max_length=120) + departement = models.ForeignKey( + Department, verbose_name=_("Department"), + on_delete=models.SET_NULL, null=True, blank=True) + year = models.IntegerField( + _("Year of creation"), null=True, blank=True, + help_text=_("Filling this field is relevant to distinguish old towns " + "from new towns.")) + children = models.ManyToManyField( + 'Town', verbose_name=_("Town children"), blank=True, + related_name='parents') + cached_label = models.CharField(_("Cached name"), max_length=500, + null=True, blank=True, db_index=True) + objects = TownManager() + + class Meta: + verbose_name = _("Town") + verbose_name_plural = _("Towns") + if settings.COUNTRY == 'fr': + ordering = ['numero_insee'] + unique_together = (('numero_insee', 'year'),) + + def natural_key(self): + return (self.numero_insee, self.year) + + def history_compress(self): + values = {'numero_insee': self.numero_insee, + 'year': self.year or ""} + return values + + def get_values(self, prefix='', no_values=False, no_base_finds=True): + return { + prefix or "label": str(self), + prefix + "name": self.name, + prefix + "numero_insee": self.numero_insee + } + + @classmethod + def history_decompress(cls, full_value, create=False): + if not full_value: + return [] + res = [] + for value in full_value: + try: + res.append( + cls.objects.get(numero_insee=value['numero_insee'], + year=value['year'] or None)) + except cls.DoesNotExist: + continue + return res + + def __str__(self): + return self.cached_label or "" + + @property + def label_with_areas(self): + label = [self.name] + if self.numero_insee: + label.append("({})".format(self.numero_insee)) + for area in self.areas.all(): + label.append(" - ") + label.append(area.full_label) + return " ".join(label) + + def generate_geo(self, force=False): + force = self.generate_limit(force=force) + self.generate_center(force=force) + self.generate_area(force=force) + + def generate_limit(self, force=False): + if not force and self.limit: + return + parents = None + if not self.parents.count(): + return + for parent in self.parents.all(): + if not parent.limit: + return + if not parents: + parents = parent.limit + else: + parents = parents.union(parent.limit) + # if union is a simple polygon make it a multi + if 'MULTI' not in parents.wkt: + parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" + if not parents: + return + self.limit = parents + self.save() + return True + + def generate_center(self, force=False): + if not force and (self.center or not self.limit): + return + self.center = self.limit.centroid + if not self.center: + return False + self.save() + return True + + def generate_area(self, force=False): + if not force and (self.surface or not self.limit): + return + surface = self.limit.transform(settings.SURFACE_SRID, + clone=True).area + if surface > 214748364 or not surface: + return False + self.surface = surface + self.save() + return True + + def update_town_code(self): + if not self.numero_insee or not self.children.count() or not self.year: + return + old_num = self.numero_insee[:] + numero = old_num.split('-')[0] + self.numero_insee = "{}-{}".format(numero, self.year) + if self.numero_insee != old_num: + return True + + def _generate_cached_label(self): + cached_label = self.name + if settings.COUNTRY == "fr" and self.numero_insee: + dpt_len = 2 + if self.numero_insee.startswith('97') or \ + self.numero_insee.startswith('98') or \ + self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', + '6', '7', '8', '9'): + dpt_len = 3 + cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) + if self.year and self.children.count(): + cached_label += " ({})".format(self.year) + return cached_label + + +def post_save_town(sender, **kwargs): + cached_label_changed(sender, **kwargs) + town = kwargs['instance'] + town.generate_geo() + if town.update_town_code(): + town.save() + + +post_save.connect(post_save_town, sender=Town) + + +def town_child_changed(sender, **kwargs): + town = kwargs['instance'] + if town.update_town_code(): + town.save() + + +m2m_changed.connect(town_child_changed, sender=Town.children.through) + + +class Address(BaseHistorizedItem): + FIELDS = ( + "address", "address_complement", "postal_code", "town", + "precise_town", "country", + "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", + "alt_country", + "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", + "raw_phone", "mobile_phone", "email", "alt_address_is_prefered" + ) + address = models.TextField(_("Address"), null=True, blank=True) + address_complement = models.TextField(_("Address complement"), null=True, + blank=True) + postal_code = models.CharField(_("Postal code"), max_length=10, null=True, + blank=True) + town = models.CharField(_("Town (freeform)"), max_length=150, null=True, + blank=True) + precise_town = models.ForeignKey( + Town, verbose_name=_("Town (precise)"), null=True, + blank=True) + country = models.CharField(_("Country"), max_length=30, null=True, + blank=True) + alt_address = models.TextField(_("Other address: address"), null=True, + blank=True) + alt_address_complement = models.TextField( + _("Other address: address complement"), null=True, blank=True) + alt_postal_code = models.CharField(_("Other address: postal code"), + max_length=10, null=True, blank=True) + alt_town = models.CharField(_("Other address: town"), max_length=70, + null=True, blank=True) + alt_country = models.CharField(_("Other address: country"), + max_length=30, null=True, blank=True) + phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True) + phone_desc = models.CharField(_("Phone description"), max_length=300, + null=True, blank=True) + phone2 = models.CharField(_("Phone description 2"), max_length=18, + null=True, blank=True) + phone_desc2 = models.CharField(_("Phone description 2"), max_length=300, + null=True, blank=True) + phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, + blank=True) + phone_desc3 = models.CharField(_("Phone description 3"), max_length=300, + null=True, blank=True) + raw_phone = models.TextField(_("Raw phone"), blank=True, null=True) + mobile_phone = models.CharField(_("Mobile phone"), max_length=18, + null=True, blank=True) + email = models.EmailField( + _("Email"), max_length=300, blank=True, null=True) + alt_address_is_prefered = models.BooleanField( + _("Alternative address is prefered"), default=False) + history = HistoricalRecords(inherit=True) + SUB_ADDRESSES = [] + + class Meta: + abstract = True + + def get_short_html_items(self): + items = [] + if self.address: + items.append( + """<span class="subadress">{}</span>""".format(self.address)) + if self.address_complement: + items.append( + """<span class="subadress-complement">{}</span>""".format( + self.address_complement)) + if self.postal_code: + items.append( + """<span class="postal-code">{}</span>""".format( + self.postal_code)) + if self.precise_town: + items.append( + """<span class="town">{}</span>""".format( + self.precise_town.name)) + elif self.town: + items.append( + """<span class="town">{}</span>""".format( + self.town)) + if self.country: + items.append( + """<span class="country">{}</span>""".format( + self.country)) + return items + + def get_short_html_detail(self): + html = """<div class="address">""" + items = self.get_short_html_items() + if not items: + items = [ + "<span class='no-address'>{}</span>".format( + _("No associated address") + ) + ] + html += "".join(items) + html += """</div>""" + return html + + def get_town_centroid(self): + if self.precise_town: + return self.precise_town.center, self._meta.verbose_name + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if sub_item and sub_item.precise_town: + return sub_item.precise_town.center, sub_item._meta.verbose_name + + def get_town_polygons(self): + if self.precise_town: + return self.precise_town.limit, self._meta.verbose_name + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if sub_item and sub_item.precise_town: + return sub_item.precise_town.limit, sub_item._meta.verbose_name + + def get_attribute(self, attr): + if self.town or self.precise_town: + return getattr(self, attr) + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if not sub_item: + continue + if sub_item.town or sub_item.precise_town: + return getattr(sub_item, attr) + return getattr(self, attr) + + def get_address(self): + return self.get_attribute("address") + + def get_address_complement(self): + return self.get_attribute("address_complement") + + def get_postal_code(self): + return self.get_attribute("postal_code") + + def get_town(self): + return self.get_attribute("town") + + def get_precise_town(self): + return self.get_attribute("precise_town") + + def get_country(self): + return self.get_attribute("country") + + def simple_lbl(self): + return str(self) + + def full_address(self): + lbl = self.simple_lbl() + if lbl: + lbl += "\n" + lbl += self.address_lbl() + return lbl + + def address_lbl(self): + lbl = u'' + prefix = '' + if self.alt_address_is_prefered: + prefix = 'alt_' + if getattr(self, prefix + 'address'): + lbl += getattr(self, prefix + 'address') + if getattr(self, prefix + 'address_complement'): + if lbl: + lbl += "\n" + lbl += getattr(self, prefix + 'address_complement') + postal_code = getattr(self, prefix + 'postal_code') + town = getattr(self, prefix + 'town') + if postal_code or town: + if lbl: + lbl += "\n" + lbl += "{}{}{}".format( + postal_code or '', + " " if postal_code and town else '', + town or '') + if self.phone: + if lbl: + lbl += "\n" + lbl += "{} {}".format(str(_("Tel: ")), self.phone) + if self.mobile_phone: + if lbl: + lbl += "\n" + lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone) + if self.email: + if lbl: + lbl += "\n" + lbl += "{} {}".format(str(_("Email: ")), self.email) + return lbl + + +class Merge(models.Model): + merge_key = models.TextField(_("Merge key"), blank=True, null=True) + merge_candidate = models.ManyToManyField("self", blank=True) + merge_exclusion = models.ManyToManyField("self", blank=True) + archived = models.NullBooleanField(default=False, + blank=True, null=True) + # 1 for one word similarity, 2 for two word similarity, etc. + MERGE_CLEMENCY = None + EMPTY_MERGE_KEY = '--' + MERGE_ATTRIBUTE = "name" + + class Meta: + abstract = True + + def generate_merge_key(self): + if self.archived: + return + merge_attr = getattr(self, self.MERGE_ATTRIBUTE) + self.merge_key = slugify(merge_attr if merge_attr else '') + if not self.merge_key: + self.merge_key = self.EMPTY_MERGE_KEY + self.merge_key = self.merge_key + + def generate_merge_candidate(self): + if self.archived: + return + if not self.merge_key: + self.generate_merge_key() + self.save(merge_key_generated=True) + if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: + return + q = self.__class__.objects \ + .exclude(pk=self.pk) \ + .exclude(merge_exclusion=self) \ + .exclude(merge_candidate=self) \ + .exclude(archived=True) + if not self.MERGE_CLEMENCY: + q = q.filter(merge_key=self.merge_key) + else: + subkeys_front = "-".join( + self.merge_key.split('-')[:self.MERGE_CLEMENCY]) + subkeys_back = "-".join( + self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) + q = q.filter(Q(merge_key__istartswith=subkeys_front) | + Q(merge_key__iendswith=subkeys_back)) + for item in q.all(): + self.merge_candidate.add(item) + + def save(self, *args, **kwargs): + # prevent circular save + merge_key_generated = False + if 'merge_key_generated' in kwargs: + merge_key_generated = kwargs.pop('merge_key_generated') + self.generate_merge_key() + item = super(Merge, self).save(*args, **kwargs) + if not merge_key_generated: + self.merge_candidate.clear() + self.generate_merge_candidate() + return item + + def archive(self): + self.archived = True + self.save() + self.merge_candidate.clear() + self.merge_exclusion.clear() + + def merge(self, item, keep_old=False, exclude_fields=None): + merge_model_objects(self, item, keep_old=keep_old, + exclude_fields=exclude_fields) + self.generate_merge_candidate() + + + +def __get_stats_cache_values(model_name, model_pk): + StatsCache = apps.get_model("ishtar_common", "StatsCache") + q = StatsCache.objects.filter( + model=model_name, model_pk=model_pk + ) + nb = q.count() + if nb >= 1: + sc = q.all()[0] + for extra in q.order_by("-id").all()[1:]: + extra.delete() + else: + sc = StatsCache.objects.create( + model=model_name, model_pk=model_pk + ) + values = sc.values + if not values: + values = {} + return sc, values + + +@task() +def _update_stats(app, model, model_pk, funcname): + model_name = app + "." + model + model = apps.get_model(app, model) + try: + item = model.objects.get(pk=model_pk) + except model.DoesNotExist: + return + value = getattr(item, funcname)() + sc, current_values = __get_stats_cache_values(model_name, model_pk) + current_values[funcname] = value + sc.values = current_values + sc.update_requested = None + sc.updated = datetime.datetime.now() + sc.save() + +def update_stats(statscache, item, funcname): + if not settings.USE_BACKGROUND_TASK: + current_values = statscache.values + if not current_values: + current_values = {} + value = getattr(item, funcname)() + current_values[funcname] = value + statscache.values = current_values + statscache.updated = datetime.datetime.now() + statscache.save() + return current_values + + now = datetime.datetime.now() + app_name = item._meta.app_label + model_name = item._meta.model_name + statscache.update_requested = now.isoformat() + statscache.save() + _update_stats.delay(app_name, model_name, item.pk, funcname) + return statscache.values + + +class DashboardFormItem: + """ + Provide methods to manage statistics + """ + + def last_stats_update(self): + model_name = self._meta.app_label + "." + self._meta.model_name + StatsCache = apps.get_model("ishtar_common", "StatsCache") + q = StatsCache.objects.filter( + model=model_name, model_pk=self.pk).order_by("-updated") + if not q.count(): + return + return q.all()[0].updated + + def _get_or_set_stats(self, funcname, update=False, + expected_type=None): + model_name = self._meta.app_label + "." + self._meta.model_name + StatsCache = apps.get_model("ishtar_common", "StatsCache") + sc, __ = StatsCache.objects.get_or_create( + model=model_name, model_pk=self.pk + ) + if not update: + values = sc.values + if funcname not in values: + if expected_type is not None: + return expected_type() + return 0 + else: + values = update_stats(sc, self, funcname) + if funcname in values: + values = values[funcname] + else: + values = 0 + if expected_type is not None and not isinstance(values, expected_type): + return expected_type() + return values + + @classmethod + def get_periods(cls, slice='month', fltr={}, date_source='creation'): + date_var = date_source + '_date' + q = cls.objects.filter(**{date_var + '__isnull': False}) + if fltr: + q = q.filter(**fltr) + if slice == 'year': + return [res[date_var].year for res in list(q.values(date_var) + .annotate( + Count("id")).order_by())] + elif slice == 'month': + return [(res[date_var].year, res[date_var].month) + for res in list(q.values(date_var) + .annotate(Count("id")).order_by())] + return [] + + @classmethod + def get_by_year(cls, year, fltr={}, date_source='creation'): + date_var = date_source + '_date' + q = cls.objects.filter(**{date_var + '__isnull': False}) + if fltr: + q = q.filter(**fltr) + return q.filter( + **{date_var + '__year': year}).order_by('pk').distinct('pk') + + @classmethod + def get_by_month(cls, year, month, fltr={}, date_source='creation'): + date_var = date_source + '_date' + q = cls.objects.filter(**{date_var + '__isnull': False}) + if fltr: + q = q.filter(**fltr) + q = q.filter( + **{date_var + '__year': year, date_var + '__month': month}) + return q.order_by('pk').distinct('pk') + + @classmethod + def get_total_number(cls, fltr=None): + q = cls.objects + if fltr: + q = q.filter(**fltr) + return q.order_by('pk').distinct('pk').count() + + +class DocumentItem: + ALT_NAMES = { + 'documents__image__isnull': + SearchAltName( + pgettext_lazy("key for text search", "has-image"), + 'documents__image__isnull'), + 'documents__associated_url__isnull': + SearchAltName( + pgettext_lazy("key for text search", "has-url"), + 'documents__associated_url__isnull'), + 'documents__associated_file__isnull': + SearchAltName( + pgettext_lazy("key for text search", "has-attached-file"), + 'documents__associated_file__isnull'), + } + + def public_representation(self): + images = [] + if getattr(self, "main_image", None): + images.append(self.main_image.public_representation()) + images += [ + image.public_representation() + for image in self.images_without_main_image.all() + ] + return {"images": images} + + @property + def images(self): + if not hasattr(self, 'documents'): + Document = apps.get_model("ishtar_common", "Document") + return Document.objects.none() + return self.documents.filter( + image__isnull=False).exclude(image="").order_by("pk") + + @property + def images_without_main_image(self): + if not hasattr(self, 'main_image') or not hasattr(self, 'documents'): + return self.images + if not self.main_image: + return self.documents.filter( + image__isnull=False).exclude( + image="").order_by("pk") + return self.documents.filter( + image__isnull=False).exclude( + image="").exclude(pk=self.main_image.pk).order_by("pk") + + def get_extra_actions(self, request): + """ + For sheet template: return "Add document / image" action + """ + # url, base_text, icon, extra_text, extra css class, is a quick action + try: + actions = super(DocumentItem, self).get_extra_actions(request) + except AttributeError: + actions = [] + + if not hasattr(self, 'SLUG'): + return actions + + can_add_doc = self.can_do(request, 'add_document') + if can_add_doc and ( + not hasattr(self, "is_locked") or + not self.is_locked(request.user)): + actions += [ + ( + reverse("create-document") + "?{}={}".format( + self.SLUG, self.pk), + _("Add document/image"), + "fa fa-plus", + _("doc./image"), + "", + False + ) + ] + return actions + + +def document_attached_changed(sender, **kwargs): + # associate a default main image + instance = kwargs.get("instance", None) + model = kwargs.get("model", None) + pk_set = kwargs.get("pk_set", None) + if not instance or not model: + return + + if hasattr(instance, "documents"): + items = [instance] + else: + if not pk_set: + return + try: + items = [model.objects.get(pk=pk) for pk in pk_set] + except model.DoesNotExist: + return + + for item in items: + q = item.documents.filter( + image__isnull=False).exclude(image='') + if item.main_image: + if q.filter(pk=item.main_image.pk).count(): + return + # the association has disappear not the main image anymore + item.main_image = None + item.skip_history_when_saving = True + item.save() + if not q.count(): + return + # by default get the lowest pk + item.main_image = q.order_by('pk').all()[0] + item.skip_history_when_saving = True + item.save() + + +class QuickAction: + """ + Quick action available from tables + """ + + def __init__(self, url, icon_class='', text='', target=None, rights=None, + module=None): + self.url = url + self.icon_class = icon_class + self.text = text + self.rights = rights + self.target = target + self.module = module + assert self.target in ('one', 'many', None) + + def is_available(self, user, session=None, obj=None): + if self.module and not getattr(get_current_profile(), self.module): + return False + if not self.rights: # no restriction + return True + if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser: + return False + user = user.ishtaruser + + for right in self.rights: + if user.has_perm(right, session=session, obj=obj): + return True + return False + + @property + def rendered_icon(self): + if not self.icon_class: + return "" + return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class) + + @property + def base_url(self): + if self.target is None: + url = reverse(self.url) + else: + # put arbitrary pk for the target + url = reverse(self.url, args=[0]) + url = url[:-2] # all quick action url have to finish with the + # pk of the selected item and a "/" + return url + + +class DynamicRequest: + def __init__(self, label, app_name, model_name, form_key, search_key, + type_query, search_query): + self.label = label + self.form_key = form_key + self.search_key = search_key + self.app_name = app_name + self.model_name = model_name + self.type_query = type_query + self.search_query = search_query + + def get_all_types(self): + model = apps.get_app_config(self.app_name).get_model(self.model_name) + return model.objects.filter(available=True) + + def get_form_fields(self): + fields = {} + for item in self.get_all_types().all(): + fields[self.form_key + "-" + item.txt_idx] = forms.CharField( + label=str(self.label) + " " + str(item), + required=False + ) + return fields + + def get_extra_query(self, slug): + return { + self.type_query: slug + } + + def get_alt_names(self): + alt_names = {} + for item in self.get_all_types().all(): + alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( + self.search_key + "-" + item.txt_idx, self.search_query, + self.get_extra_query(item.txt_idx), distinct_query=True + ) + return alt_names + + +class SpatialReferenceSystem(GeneralType): + order = models.IntegerField(_("Order"), default=10) + auth_name = models.CharField( + _("Authority name"), default=u'EPSG', max_length=256) + srid = models.IntegerField(_("Authority SRID")) + + class Meta: + verbose_name = _("Spatial reference system") + verbose_name_plural = _("Spatial reference systems") + ordering = ('label',) + + +post_save.connect(post_save_cache, sender=SpatialReferenceSystem) +post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) + + +class GeoItem(models.Model): + GEO_SOURCE = ( + ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon")) + ) + + # gis + x = models.FloatField(_(u'X'), blank=True, null=True) + y = models.FloatField(_(u'Y'), blank=True, null=True) + z = models.FloatField(_(u'Z'), blank=True, null=True) + estimated_error_x = models.FloatField(_(u'Estimated error for X'), + blank=True, null=True) + estimated_error_y = models.FloatField(_(u'Estimated error for Y'), + blank=True, null=True) + estimated_error_z = models.FloatField(_(u'Estimated error for Z'), + blank=True, null=True) + spatial_reference_system = models.ForeignKey( + SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), + blank=True, null=True) + point = models.PointField(_("Point"), blank=True, null=True, dim=3) + point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) + point_source = models.CharField( + _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, + null=True) + point_source_item = models.CharField( + _("Point source item"), max_length=100, blank=True, null=True) + multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, + null=True) + multi_polygon_source = models.CharField( + _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, + blank=True, null=True) + multi_polygon_source_item = models.CharField( + _("Multi polygon source item"), max_length=100, blank=True, null=True) + + GEO_LABEL = "" + + class Meta: + abstract = True + + def get_town_centroid(self): + raise NotImplementedError + + def get_town_polygons(self): + raise NotImplementedError + + @property + def display_coordinates(self): + if not self.point_2d: + return "" + profile = get_current_profile() + if not profile.display_srs or not profile.display_srs.srid: + return self.x, self.y + point = self.point_2d.transform(profile.display_srs.srid, clone=True) + return round(point.x, 5), round(point.y, 5) + + @property + def display_spatial_reference_system(self): + profile = get_current_profile() + if not profile.display_srs or not profile.display_srs.srid: + return self.spatial_reference_system + return profile.display_srs + + def get_precise_points(self): + if self.point_source == 'P' and self.point_2d: + return self.point_2d, self.point, self.point_source_item + + def get_precise_polygons(self): + if self.multi_polygon_source == 'P' and self.multi_polygon: + return self.multi_polygon, self.multi_polygon_source_item + + def most_precise_geo(self): + if self.point_source == 'M': + return 'multi_polygon' + current_source = str(self.__class__._meta.verbose_name) + if self.multi_polygon_source_item == current_source \ + and (self.multi_polygon_source == "P" or + self.point_source_item != current_source): + return 'multi_polygon' + if self.point_source_item == current_source \ + and self.point_source == 'P': + return 'point' + if self.multi_polygon_source == 'P': + return 'multi_polygon' + if self.point_source == 'P': + return 'point' + if self.multi_polygon: + return 'multi_polygon' + if self.point_2d: + return 'point' + + def geo_point_source(self): + if not self.point_source: + return "" + src = "{} - {}".format( + dict(self.GEO_SOURCE)[self.point_source], + self.point_source_item + ) + return src + + def geo_polygon_source(self): + if not self.multi_polygon_source: + return "" + src = "{} - {}".format( + dict(self.GEO_SOURCE)[self.multi_polygon_source], + self.multi_polygon_source_item + ) + return src + + def _geojson_serialize(self, geom_attr): + if not hasattr(self, geom_attr): + return "" + cached_label_key = 'cached_label' + if self.GEO_LABEL: + cached_label_key = self.GEO_LABEL + if getattr(self, "CACHED_LABELS", None): + cached_label_key = self.CACHED_LABELS[-1] + geojson = serialize( + 'geojson', + self.__class__.objects.filter(pk=self.pk), + geometry_field=geom_attr, fields=(cached_label_key,)) + geojson_dct = json.loads(geojson) + profile = get_current_profile() + precision = profile.point_precision + + features = geojson_dct.pop('features') + for idx in range(len(features)): + feature = features[idx] + lbl = feature['properties'].pop(cached_label_key) + feature['properties']['name'] = lbl + feature['properties']['id'] = self.pk + if precision is not None: + geom_type = feature["geometry"].get("type", None) + if geom_type == "Point": + feature["geometry"]["coordinates"] = [ + round(coord, precision) + for coord in feature["geometry"]["coordinates"] + ] + geojson_dct['features'] = features + geojson_dct['link_template'] = simple_link_to_window(self).replace( + '999999', '<pk>' + ) + geojson = json.dumps(geojson_dct) + return geojson + + @property + def point_2d_geojson(self): + return self._geojson_serialize('point_2d') + + @property + def multi_polygon_geojson(self): + return self._geojson_serialize('multi_polygon') + + +class ImageContainerModel: + def _get_image_path(self, filename): + return "{}/{}".format(self._get_base_image_path(), filename) + + def _get_base_image_path(self): + n = datetime.datetime.now() + return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) + + +class QRCodeItem(models.Model, ImageContainerModel): + HAS_QR_CODE = True + qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True, + max_length=255) + + class Meta: + abstract = True + + @property + def qrcode_path(self): + if not self.qrcode: + self.generate_qrcode() + if not self.qrcode: # error on qrcode generation + return "" + return self.qrcode.path + + def generate_qrcode(self, request=None, secure=True, tmpdir=None): + url = self.get_absolute_url() + site = Site.objects.get_current() + if request: + scheme = request.scheme + else: + if secure: + scheme = "https" + else: + scheme = "http" + url = scheme + "://" + site.domain + url + TinyUrl = apps.get_model("ishtar_common", "TinyUrl") + tiny_url = TinyUrl() + tiny_url.link = url + tiny_url.save() + short_url = scheme + "://" + site.domain + reverse( + 'tiny-redirect', args=[tiny_url.get_short_id()]) + qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) + tmpdir_created = False + if not tmpdir: + tmpdir = tempfile.mkdtemp("-qrcode") + tmpdir_created = True + filename = tmpdir + os.sep + 'qrcode.png' + qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) + with open(filename, 'rb') as qrfile: + self.qrcode.save("qrcode.png", File(qrfile)) + self.skip_history_when_saving = True + self._no_move = True + self.save() + if tmpdir_created: + shutil.rmtree(tmpdir) + + +class SearchVectorConfig: + def __init__(self, key, language=None, func=None): + self.key = key + if language: + self.language = language + if language == "local": + self.language = settings.ISHTAR_SEARCH_LANGUAGE + else: + self.language = "simple" + self.func = func + + def format(self, value): + if value == 'None': + value = '' + if not self.func: + return [value] + return self.func(value) + + +class ShortMenuItem: + """ + Item available in the short menu + """ + UP_MODEL_QUERY = {} + + @classmethod + def get_short_menu_class(cls, pk): + return '' + + @property + def short_class_name(self): + return "" + + +class MainItem(ShortMenuItem): + """ + Item with quick actions available from tables + Extra actions are available from sheets + """ + QUICK_ACTIONS = [] + + @classmethod + def get_quick_actions(cls, user, session=None, obj=None): + """ + Get a list of (url, title, icon, target) actions for an user + """ + qas = [] + for action in cls.QUICK_ACTIONS: + if not action.is_available(user, session=session, obj=obj): + continue + qas.append([action.base_url, + mark_safe(action.text), + mark_safe(action.rendered_icon), + action.target or ""]) + return qas + + @classmethod + def get_quick_action_by_url(cls, url): + for action in cls.QUICK_ACTIONS: + if action.url == url: + return action + + def regenerate_external_id(self): + if not hasattr(self, "external_id"): + return + self.skip_history_when_saving = True + self._no_move = True + if hasattr(self, "auto_external_id"): + self.external_id = None + self.save() + + def get_extra_actions(self, request): + if not hasattr(self, 'SLUG'): + return [] + + actions = [] + if request.user.is_superuser and hasattr(self, "auto_external_id"): + actions += [ + ( + reverse("regenerate-external-id") + "?{}={}".format( + self.SLUG, self.pk), + _("Regenerate ID"), + "fa fa-key", + _("regen."), + "", + True + ) + ] + + return actions |