#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Generic models and tools for models """ import copy from collections import OrderedDict import datetime import json import logging import os import pyqrcode import shutil import tempfile import time from django import forms from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.gis.geos import Point from django.contrib.postgres.fields import JSONField from django.contrib.postgres.search import SearchVectorField, SearchVector from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist from django.core.files import File from django.core.serializers import serialize from django.core.urlresolvers import reverse, NoReverseMatch from django.core.validators import validate_slug from django.db import connection from django.db.models import Q, Count, Max from django.db.models.signals import post_save, post_delete, m2m_changed from django.template.defaultfilters import slugify from django.utils.safestring import SafeText, mark_safe from django.utils.translation import activate, deactivate from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, get_image_path from simple_history.models import HistoricalRecords as BaseHistoricalRecords from simple_history.signals import ( post_create_historical_record, pre_create_historical_record, ) from unidecode import unidecode from ishtar_common.model_managers import TypeManager from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import Import from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils import ( get_cache, disable_for_loaddata, get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, task, duplicate_item, get_generated_id, get_current_profile, ) logger = logging.getLogger(__name__) """ from ishtar_common.models import GeneralType, get_external_id, \ LightHistorizedItem, OwnPerms, Address, post_save_cache, \ DashboardFormItem, document_attached_changed, SearchAltName, \ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ QuickAction, MainItem, Merge """ class CachedGen(object): @classmethod def refresh_cache(cls): raise NotImplementedError() @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) class Cached(CachedGen): slug_field = "txt_idx" @classmethod def refresh_cache(cls): cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if not current_keys: return for keys in current_keys: if len(keys) == 2 and keys[0] == "__slug": cls.get_cache(keys[1], force=True) elif keys[0] == "__get_types": default = None empty_first = True exclude = [] if len(keys) >= 2: default = keys.pop() if len(keys) > 1: empty_first = bool(keys.pop()) exclude = keys[1:] cls.get_types( exclude=exclude, empty_first=empty_first, default=default, force=True, ) elif keys[0] == "__get_help": cls.get_help(force=True) @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) @classmethod def get_cache(cls, slug, force=False): cache_key, value = get_cache(cls, ["__slug", slug]) if not force and value: return value try: k = {cls.slug_field: slug} obj = cls.objects.get(**k) cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj except cls.DoesNotExist: cache.set(cache_key, None, settings.CACHE_TIMEOUT) return None @disable_for_loaddata def post_save_cache(sender, **kwargs): sender.refresh_cache() class GeneralType(Cached, models.Model): """ Abstract class for "types" """ label = models.TextField(_("Label")) txt_idx = models.TextField( _("Textual ID"), validators=[validate_slug], unique=True, help_text=_( "The slug is the standardized version of the name. It contains " "only lowercase letters, numbers and hyphens. Each slug must " "be unique." ), ) comment = models.TextField(_("Comment"), blank=True, default="") available = models.BooleanField(_("Available"), default=True) HELP_TEXT = "" objects = TypeManager() class Meta: abstract = True def __str__(self): return self.label def natural_key(self): return (self.txt_idx,) def history_compress(self): return self.txt_idx @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ s = "**label** {}, **txt_idx** {}".format(str(_("Label")), str(_("Textual ID"))) if hasattr(cls, "extra_documentation_string"): s += cls.extra_documentation_string() return s @classmethod def admin_url(cls): return str( reverse( "admin:{}_{}_changelist".format( cls._meta.app_label, cls._meta.model_name ) ) ) @classmethod def history_decompress(cls, value, create=False): if not value: return [] res = [] for txt_idx in value: try: res.append(cls.objects.get(txt_idx=txt_idx)) except cls.DoesNotExist: continue return res @property def explicit_label(self): return "{} ({})".format(self.label, self._meta.verbose_name) @classmethod def create_default_for_test(cls): return [cls.objects.create(label="Test %d" % i) for i in range(5)] @property def short_label(self): return self.label @property def name(self): return self.label @classmethod def get_or_create(cls, slug, label=""): """ Get or create a new item. :param slug: textual id :param label: label for initialization if the item doesn't exist (not mandatory) :return: instancied item of the base class """ item = cls.get_cache(slug) if item: return item item, created = cls.objects.get_or_create( txt_idx=slug, defaults={"label": label} ) return item @classmethod def get_or_create_pk(cls, slug): """ Get an id from a slug. Create the associated item if needed. :param slug: textual id :return: id of the item (string) """ return str(cls.get_or_create(slug).pk) @classmethod def get_or_create_pks(cls, slugs): """ Get and merge a list of ids from a slug list. Create the associated items if needed. :param slugs: textual ids :return: string with ids separated by "_" """ items = [] for slug in slugs: items.append(str(cls.get_or_create(slug).pk)) return "_".join(items) @classmethod def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): if not dct: dct = {} if not exclude: exclude = [] keys = ["__get_help"] keys += ["{}".format(ex) for ex in exclude] keys += ["{}-{}".format(str(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return mark_safe(value) help_text = cls.HELP_TEXT c_rank = -1 help_items = "\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, "__iter__"): pk = item[0] item = cls.objects.get(pk=pk) item.rank = c_rank + 1 if hasattr(item, "parent"): c_item = item parents = [] while c_item.parent: parents.append(c_item.parent.label) c_item = c_item.parent parents.reverse() parents.append(item.label) item.label = " / ".join(parents) if not item.comment: continue if c_rank > item.rank: help_items += "\n" elif c_rank < item.rank: help_items += "
\n" c_rank = item.rank help_items += "
%s
%s
" % ( item.label, "
".join(item.comment.split("\n")), ) c_rank += 1 if c_rank: help_items += c_rank * "
" if help_text or help_items != "\n": help_text = help_text + help_items else: help_text = "" cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) return mark_safe(help_text) @classmethod def _get_initial_types(cls, initial, type_pks, instance=False): new_vals = [] if not initial: return [] if type(initial) not in (list, tuple): initial = [initial] for value in initial: try: pk = int(value) except (ValueError, TypeError): continue if pk in type_pks: continue try: extra_type = cls.objects.get(pk=pk) if instance: new_vals.append(extra_type) else: new_vals.append((extra_type.pk, str(extra_type))) except cls.DoesNotExist: continue return new_vals @classmethod def get_types( cls, dct=None, instances=False, exclude=None, empty_first=True, default=None, initial=None, force=False, full_hierarchy=False, ): if not dct: dct = {} if not exclude: exclude = [] types = [] if not instances and empty_first and not default: types = [("", "--")] types += cls._pre_get_types( dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy ) if not initial: return types new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) types += new_vals return types @classmethod def _pre_get_types( cls, dct=None, instances=False, exclude=None, default=None, force=False, get_full_hierarchy=False, ): if not dct: dct = {} if not exclude: exclude = [] # cache cache_key = None if not instances: keys = ["__get_types"] keys += ["{}".format(ex) for ex in exclude] + ["{}".format(default)] keys += ["{}-{}".format(str(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return value base_dct = dct.copy() if hasattr(cls, "parent"): if not cache_key: return cls._get_parent_types( base_dct, instances, exclude=exclude, default=default, get_full_hierarchy=get_full_hierarchy, ) vals = [ v for v in cls._get_parent_types( base_dct, instances, exclude=exclude, default=default, get_full_hierarchy=get_full_hierarchy, ) ] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals if not cache_key: return cls._get_types(base_dct, instances, exclude=exclude, default=default) vals = [ v for v in cls._get_types( base_dct, instances, exclude=exclude, default=default ) ] cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals @classmethod def _get_types(cls, dct=None, instances=False, exclude=None, default=None): if not dct: dct = {} if not exclude: exclude = [] dct["available"] = True if default: try: default = cls.objects.get(txt_idx=default) yield (default.pk, _(str(default))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and default != "None": if hasattr(default, "txt_idx"): exclude.append(default.txt_idx) else: exclude.append(default) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(str(item)) if item and str(item) else "") @classmethod def _get_childs_list(cls, dct=None, exclude=None, instances=False): if not dct: dct = {} if not exclude: exclude = [] if "parent" in dct: dct.pop("parent") childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, "order"): childs = childs.order_by("order") res = {} if instances: for item in childs.all(): parent_id = item.parent_id or 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append(item) else: for item in childs.values("id", "parent_id", "label").all(): parent_id = item["parent_id"] or 0 if item["id"] == item["parent_id"]: parent_id = 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append((item["id"], item["label"])) return res PREFIX = "│ " PREFIX_EMPTY = "  " PREFIX_MEDIUM = "├ " PREFIX_LAST = "└ " PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] @classmethod def _get_childs( cls, item, child_list, prefix=0, instances=False, is_last=False, last_of=None, get_full_hierarchy=False, ): if not last_of: last_of = [] prefix += 1 current_child_lst = [] if item in child_list: current_child_lst = child_list[item] lst = [] total = len(current_child_lst) full_hierarchy_initial = get_full_hierarchy for idx, child in enumerate(current_child_lst): mylast_of = last_of[:] p = "" if instances: child.rank = prefix lst.append(child) else: if full_hierarchy_initial: if isinstance(full_hierarchy_initial, str): p = full_hierarchy_initial + " > " else: p = "" else: cprefix = prefix while cprefix: cprefix -= 1 if not cprefix: if (idx + 1) == total: p += cls.PREFIX_LAST else: p += cls.PREFIX_MEDIUM elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: p += cls.PREFIX_EMPTY else: p += cls.PREFIX else: p += cls.PREFIX_EMPTY else: p += cls.PREFIX lst.append((child[0], SafeText(p + str(_(child[1]))))) clast_of = last_of[:] clast_of.append(idx + 1 == total) if instances: child_id = child.id else: child_id = child[0] if get_full_hierarchy: if p: if not p.endswith(" > "): p += " > " get_full_hierarchy = p + child[1] else: get_full_hierarchy = child[1] for sub_child in cls._get_childs( child_id, child_list, prefix, instances, is_last=((idx + 1) == total), last_of=clast_of, get_full_hierarchy=get_full_hierarchy, ): lst.append(sub_child) return lst @classmethod def _get_parent_types( cls, dct=None, instances=False, exclude=None, default=None, get_full_hierarchy=False, ): if not dct: dct = {} if not exclude: exclude = [] dct["available"] = True child_list = cls._get_childs_list(dct, exclude, instances) if 0 in child_list: for item in child_list[0]: if instances: item.rank = 0 item_id = item.pk yield item else: item_id = item[0] yield item if get_full_hierarchy: get_full_hierarchy = item[1] for child in cls._get_childs( item_id, child_list, instances=instances, get_full_hierarchy=get_full_hierarchy, ): yield child def save(self, *args, **kwargs): ItemKey = apps.get_model("ishtar_common", "ItemKey") if not self.id and not self.label: txt_idx = self.txt_idx if isinstance(txt_idx, list): txt_idx = txt_idx[0] self.txt_idx = txt_idx self.label = " ".join(" ".join(self.txt_idx.split("-")).split("_")).title() if not self.txt_idx: self.txt_idx = slugify(self.label)[:100] # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter( object_id=self.pk, key=slugify(old.label), content_type=content_type ).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter( object_id=self.pk, key=old.txt_idx, content_type=content_type ).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False, importer=None, group=None, user=None): ItemKey = apps.get_model("ishtar_common", "ItemKey") content_type = ContentType.objects.get_for_model(self.__class__) if ( not importer and not force and ItemKey.objects.filter(key=key, content_type=content_type).count() ): return filtr = {"key": key, "content_type": content_type} if group: filtr["group"] = group elif user: filtr["user"] = user else: filtr["importer"] = importer if force: ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() filtr["object_id"] = self.pk ItemKey.objects.get_or_create(**filtr) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self, importer): ItemKey = apps.get_model("ishtar_common", "ItemKey") keys = [self.txt_idx] content_type = ContentType.objects.get_for_model(self.__class__) base_q = Q(content_type=content_type, object_id=self.pk) subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True) subquery |= Q(user__isnull=True, group__isnull=True, importer=importer) if importer.user: subquery |= Q(user=importer.user, group__isnull=True, importer=importer) if importer.associated_group: subquery |= Q( user__isnull=True, group=importer.associated_group, importer=importer ) q = ItemKey.objects.filter(base_q & subquery) for ik in q.exclude(key=self.txt_idx).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): # content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() class HierarchicalType(GeneralType): parent = models.ForeignKey( "self", blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Parent"), ) class Meta: abstract = True def full_label(self): lbls = [self.label] item = self while item.parent: item = item.parent lbls.append(item.label) return " > ".join(reversed(lbls)) @property def first_parent(self): parent = self.parent parents = [] while parent: if parent in parents: # prevent circular return parent parents.append(parent) if not parent.parent: return parent parent = parent.parent class StatisticItem: STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: # OrderedDict([('year', _("Year")), # ("operation_type__label", _("Operation type"))]) STATISTIC_SUM_VARIABLE = OrderedDict( (("pk", (_("Number"), 1)),) ) # example: "Price", "Volume" - the number is a multiplier class TemplateItem: @classmethod def _label_templates_q(cls): model_name = "{}.{}".format(cls.__module__, cls.__name__) q = Q(associated_model__klass=model_name, for_labels=True, available=True) alt_model_name = model_name.replace("models_finds", "models").replace( "models_treatments", "models" ) if alt_model_name != model_name: q |= Q(associated_model__klass=model_name, for_labels=True, available=True) DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") return DocumentTemplate.objects.filter(q) @classmethod def has_label_templates(cls): return cls._label_templates_q().count() @classmethod def label_templates(cls): return cls._label_templates_q() def get_extra_templates(self, request): cls = self.__class__ templates = [] name = str(cls.__name__) module = str(cls.__module__) if "archaeological_finds" in module: if "models_finds" in name or "models_treatments" in name: names = [ name, name.replace("models_finds", "models").replace( "models_treatments", "models" ), ] else: names = [ name, name.replace("models", "models_finds"), name.replace("models", "models_treatments"), ] else: names = [name] model_names = ["{}.{}".format(module, name) for name in names] DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") q = DocumentTemplate.objects.filter( associated_model__klass__in=model_names, for_labels=False, available=True ) for template in q.all(): urlname = "generate-document" templates.append( (template.name, reverse(urlname, args=[template.slug, self.pk])) ) return templates class FullSearch(models.Model): search_vector = SearchVectorField( _("Search vector"), blank=True, null=True, help_text=_("Auto filled at save") ) EXTRA_REQUEST_KEYS = {} DYNAMIC_REQUESTS = {} ALT_NAMES = {} BASE_SEARCH_VECTORS = [] PROPERTY_SEARCH_VECTORS = [] INT_SEARCH_VECTORS = [] M2M_SEARCH_VECTORS = [] PARENT_SEARCH_VECTORS = [] # prevent circular dependency PARENT_ONLY_SEARCH_VECTORS = [] class Meta: abstract = True @classmethod def general_types(cls): for k in get_all_field_names(cls): field = cls._meta.get_field(k) if not hasattr(field, "rel") or not field.rel: continue rel_model = field.rel.to if issubclass(rel_model, (GeneralType, HierarchicalType)): yield k @classmethod def get_alt_names(cls): alt_names = cls.ALT_NAMES.copy() for dr_k in cls.DYNAMIC_REQUESTS: alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) return alt_names @classmethod def get_query_parameters(cls): query_parameters = {} for v in cls.get_alt_names().values(): for language_code, language_lbl in settings.LANGUAGES: activate(language_code) query_parameters[str(v.search_key)] = v deactivate() return query_parameters def _update_search_field(self, search_vector_conf, search_vectors, data): for value in search_vector_conf.format(data): with connection.cursor() as cursor: cursor.execute( "SELECT to_tsvector(%s, %s)", [search_vector_conf.language, value] ) row = cursor.fetchone() search_vectors.append(row[0]) def _update_search_number_field(self, search_vectors, val): search_vectors.append("'{}':1".format(val)) def update_search_vector(self, save=True, exclude_parent=False): """ Update the search vector :param save: True if you want to save the object immediately :return: True if modified """ if not hasattr(self, "search_vector"): return if not self.pk: # logger.warning("Cannot update search vector before save or " # "after deletion.") return if ( not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS and not self.INT_SEARCH_VECTORS and not self.PROPERTY_SEARCH_VECTORS and not self.PARENT_SEARCH_VECTORS ): logger.warning("No search_vectors defined for {}".format(self.__class__)) return if getattr(self, "_search_updated", None): return JsonDataField = apps.get_model("ishtar_common", "JsonDataField") self._search_updated = True old_search = "" if self.search_vector: old_search = self.search_vector[:] search_vectors = [] base_q = self.__class__.objects.filter(pk=self.pk) # many to many have to be queried one by one otherwise only one is fetch for m2m_search_vector in self.M2M_SEARCH_VECTORS: key = m2m_search_vector.key.split("__")[0] rel_key = getattr(self, key) for item in rel_key.values("pk").all(): query_dct = {key + "__pk": item["pk"]} q = copy.copy(base_q).filter(**query_dct) q = q.annotate( search=SearchVector( m2m_search_vector.key, config=m2m_search_vector.language ) ).values("search") search_vectors.append(q.all()[0]["search"]) # int/float are not well managed by the SearchVector for int_search_vector in self.INT_SEARCH_VECTORS: q = base_q.values(int_search_vector.key) for val in int_search_vector.format(q.all()[0][int_search_vector.key]): self._update_search_number_field(search_vectors, val) if not exclude_parent: # copy parent vector fields for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: parent = getattr(self, PARENT_SEARCH_VECTOR) if hasattr(parent, "all"): # m2m for p in parent.all(): search_vectors.append(p.search_vector) elif parent: search_vectors.append(parent.search_vector) for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) if hasattr(parent, "all"): # m2m for p in parent.all(): search_vectors.append( p.update_search_vector(save=False, exclude_parent=True) ) elif parent: search_vectors.append( parent.update_search_vector(save=False, exclude_parent=True) ) if self.BASE_SEARCH_VECTORS: # query "simple" fields q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) res = q.all()[0] for base_search_vector in self.BASE_SEARCH_VECTORS: data = res[base_search_vector.key] data = unidecode(str(data)) self._update_search_field(base_search_vector, search_vectors, data) if self.PROPERTY_SEARCH_VECTORS: for property_search_vector in self.PROPERTY_SEARCH_VECTORS: data = getattr(self, property_search_vector.key) if callable(data): data = data() if not data: continue data = str(data) self._update_search_field(property_search_vector, search_vectors, data) if hasattr(self, "data") and self.data: content_type = ContentType.objects.get_for_model(self) for json_field in JsonDataField.objects.filter( content_type=content_type, search_index=True ).all(): data = copy.deepcopy(self.data) no_data = False for key in json_field.key.split("__"): if key not in data: no_data = True break data = data[key] if no_data or not data: continue if json_field.value_type == "B": if data is True: data = json_field.name else: continue elif json_field.value_type in ("I", "F"): self._update_search_number_field(search_vectors, data) continue elif json_field.value_type == "D": # only index year self._update_search_number_field(search_vectors, data.year) continue for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s, %s)", [lang, data]) row = cursor.fetchone() search_vectors.append(row[0]) new_search_vector = merge_tsvectors(search_vectors) changed = old_search != new_search_vector self.search_vector = new_search_vector if save and changed: self.__class__.objects.filter(pk=self.pk).update( search_vector=new_search_vector ) elif not save: return new_search_vector return changed class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s" ) class Meta: abstract = True class JsonData(models.Model, CachedGen): data = JSONField(default={}, blank=True) class Meta: abstract = True def pre_save(self): if not self.data: self.data = {} @property def json_sections(self): sections = [] try: content_type = ContentType.objects.get_for_model(self) except ContentType.DoesNotExists: return sections JsonDataField = apps.get_model("ishtar_common", "JsonDataField") fields = list( JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=True ).all() ) # no section fields fields += list( JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=False ) .order_by("section__order", "order") .all() ) for field in fields: value = None data = self.data.copy() for key in field.key.split("__"): if key in data: value = copy.copy(data[key]) data = data[key] else: value = None break if value is None: continue if type(value) in (list, tuple): value = " ; ".join([str(v) for v in value]) section_name = field.section.name if field.section else None if not sections or section_name != sections[-1][0]: # if section name is identical it is the same sections.append((section_name, [])) sections[-1][1].append((field.name, value)) return sections @classmethod def refresh_cache(cls): __, refreshed = get_cache(cls, ["cache_refreshed"]) if refreshed and time.time() - refreshed < 1: return cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if not current_keys: return for keys in current_keys: if keys[0] == "__get_dynamic_choices": cls._get_dynamic_choices(keys[1], force=True) @classmethod def _get_dynamic_choices(cls, key, force=False): """ Get choice from existing values :param key: data key :param force: if set to True do not use cache :return: tuple of choices (id, value) """ cache_key, value = get_cache(cls, ["__get_dynamic_choices", key]) if not force and value: return value choices = set() splitted_key = key[len("data__") :].split("__") q = cls.objects.filter(data__has_key=key[len("data__") :]).values_list( "data", flat=True ) for value in q.all(): for k in splitted_key: value = value[k] choices.add(value) choices = [("", "")] + [(v, v) for v in sorted(list(choices))] cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) return choices class FixAssociated: ASSOCIATED = {} def fix_associated(self): for key in self.ASSOCIATED: item = getattr(self, key) if not item: continue dct = self.ASSOCIATED[key] for dct_key in dct: subkey, ctype = dct_key expected_values = dct[dct_key] if not isinstance(expected_values, (list, tuple)): expected_values = [expected_values] if hasattr(ctype, "txt_idx"): try: expected_values = [ ctype.objects.get(txt_idx=v) for v in expected_values ] except ctype.DoesNotExist: # type not yet initialized return current_vals = getattr(item, subkey) is_many = False if hasattr(current_vals, "all"): is_many = True current_vals = current_vals.all() else: current_vals = [current_vals] is_ok = False for current_val in current_vals: if current_val in expected_values: is_ok = True break if is_ok: continue # the first value is used new_value = expected_values[0] if is_many: getattr(item, subkey).add(new_value) else: setattr(item, subkey, new_value) class CascasdeUpdate: DOWN_MODEL_UPDATE = [] def cascade_update(self): for down_model in self.DOWN_MODEL_UPDATE: if not settings.USE_BACKGROUND_TASK: rel = getattr(self, down_model) if hasattr(rel.model, "need_update"): rel.update(need_update=True) continue for item in getattr(self, down_model).all(): cached_label_changed(item.__class__, instance=item) if hasattr(item, "point_2d"): post_save_geo(item.__class__, instance=item) class SearchAltName(object): def __init__( self, search_key, search_query, extra_query=None, distinct_query=False ): self.search_key = search_key self.search_query = search_query self.extra_query = extra_query or {} self.distinct_query = distinct_query class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) class HistoricalRecords(BaseHistoricalRecords): def _save_historic( self, manager, instance, history_date, history_type, history_user, history_change_reason, using, attrs, ): history_instance = manager.model( history_date=history_date, history_type=history_type, history_user=history_user, history_change_reason=history_change_reason, **attrs ) pre_create_historical_record.send( sender=manager.model, instance=instance, history_date=history_date, history_user=history_user, history_change_reason=history_change_reason, history_instance=history_instance, using=using, ) history_instance.save(using=using) post_create_historical_record.send( sender=manager.model, instance=instance, history_instance=history_instance, history_date=history_date, history_user=history_user, history_change_reason=history_change_reason, using=using, ) def create_historical_record(self, instance, history_type, using=None): try: history_modifier = getattr(instance, "history_modifier", None) assert history_modifier except (User.DoesNotExist, AssertionError): # on batch removing of users, user could have disappeared return history_date = getattr(instance, "_history_date", datetime.datetime.now()) history_change_reason = getattr(instance, "changeReason", None) force = getattr(instance, "_force_history", False) manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history.filter( history_modifier_id=history_modifier.pk ).order_by("-history_date", "-history_id") # instance.skip_history_when_saving = True if not q_history.count(): if force: delattr(instance, "_force_history") self._save_historic( manager, instance, history_date, history_type, history_modifier, history_change_reason, using, attrs, ) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = datetime.datetime.now() - datetime.timedelta(seconds=5) q = q_history.filter( history_date__isnull=False, history_date__gt=min_history_date ).order_by("-history_date", "-history_id") if not force and q.count(): return if force: delattr(instance, "_force_history") # record a new version only if data have been changed for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: self._save_historic( manager, instance, history_date, history_type, history_modifier, history_change_reason, using, attrs, ) return class BaseHistorizedItem( StatisticItem, TemplateItem, FullSearch, Imported, JsonData, FixAssociated, CascasdeUpdate, ): """ Historized item with external ID management. All historized items are searchable and have a data json field. Historized items can be "locked" for edition. """ IS_BASKET = False SHOW_URL = None EXTERNAL_ID_KEY = "" EXTERNAL_ID_DEPENDENCIES = [] HISTORICAL_M2M = [] history_modifier = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Last editor"), blank=True, null=True, ) history_creator = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Creator"), blank=True, null=True, ) last_modified = models.DateTimeField(auto_now=True) history_m2m = JSONField(default={}, blank=True) need_update = models.BooleanField(verbose_name=_("Need update"), default=False) locked = models.BooleanField( verbose_name=_("Item locked for edition"), default=False ) lock_user = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Locked by"), blank=True, null=True, ) ALT_NAMES = { "history_creator": SearchAltName( pgettext_lazy("key for text search", "created-by"), "history_creator__ishtaruser__person__cached_label__iexact", ), "history_modifier": SearchAltName( pgettext_lazy("key for text search", "modified-by"), "history_modifier__ishtaruser__person__cached_label__iexact", ), "modified_before": SearchAltName( pgettext_lazy("key for text search", "modified-before"), "last_modified__lte", ), "modified_after": SearchAltName( pgettext_lazy("key for text search", "modified-after"), "last_modified__gte" ), } class Meta: abstract = True @classmethod def get_verbose_name(cls): return cls._meta.verbose_name def is_locked(self, user=None): if not user: return self.locked return self.locked and (not self.lock_user or self.lock_user != user) def merge(self, item, keep_old=False): merge_model_objects(self, item, keep_old=keep_old) def public_representation(self): return {} def duplicate(self, user=None, data=None): return duplicate_item(self, user, data) def update_external_id(self, save=False): if not self.EXTERNAL_ID_KEY or ( self.external_id and not getattr(self, "auto_external_id", False) ): return external_id = get_generated_id(self.EXTERNAL_ID_KEY, self) if external_id == self.external_id: return self.auto_external_id = True self.external_id = external_id self._cached_label_checked = False if save: self.skip_history_when_saving = True self.save() return external_id def get_last_history_date(self): q = self.history.values("history_date").order_by("-history_date") if not q.count(): return return q.all()[0]["history_date"] def get_previous(self, step=None, date=None, strict=False): """ Get a "step" previous state of the item """ assert step or date historized = self.history.all() item = None if step: if len(historized) <= step: # silently return the last step if too far in the history item = historized[len(historized) - 1] else: item = historized[step] else: for step, item in enumerate(historized): if item.history_date == date: break # ended with no match if item.history_date != date: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in get_all_field_names(model): field = model._meta.get_field(k) if hasattr(field, "rel") and field.rel: if not hasattr(item, k + "_id"): setattr(item, k, getattr(self, k)) continue val = getattr(item, k + "_id") if not val: setattr(item, k, None) continue try: val = field.rel.to.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError( "The class %s has no pk %d" % (str(field.rel.to), val) ) setattr(item, k, None) item.pk = self.pk return item @property def last_edition_date(self): try: return self.history.order_by("-history_date").all()[0].history_date except (AttributeError, IndexError): return @property def history_creation_date(self): try: return self.history.order_by("history_date").all()[0].history_date except (AttributeError, IndexError): return def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): if item.history_date == date: new_item = item break to_del.append(item) if not new_item: raise HistoryError("The date to rollback to doesn't exist.") try: field_keys = [f.name for f in self._meta.fields] for k in field_keys: if k != "id" and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) try: self.history_modifier = User.objects.get( pk=new_item.history_modifier_id ) except User.ObjectDoesNotExist: pass self.save() saved_m2m = new_item.history_m2m.copy() for hist_key in self.HISTORICAL_M2M: # after each association m2m is rewrite - force the original # to be reset new_item.history_m2m = saved_m2m values = new_item.m2m_listing(hist_key, create=True) or [] hist_field = getattr(self, hist_key) hist_field.clear() for val in values: hist_field.add(val) # force label regeneration self._cached_label_checked = False self.save() except ObjectDoesNotExist: raise HistoryError("The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def m2m_listing(self, key): return getattr(self, key).all() def values(self): values = {} for f in self._meta.fields: k = f.name if k != "id": values[k] = getattr(self, k) return values def get_absolute_url(self): try: return reverse("display-item", args=[self.SLUG, self.pk]) except NoReverseMatch: return def get_show_url(self): show_url = self.SHOW_URL if not show_url: show_url = "show-" + self.__class__.__name__.lower() try: return reverse(show_url, args=[self.pk, ""]) except NoReverseMatch: return @property def associated_filename(self): if [ True for attr in ( "get_town_label", "get_department", "reference", "short_class_name", ) if not hasattr(self, attr) ]: return "" items = [ slugify(self.get_department()), slugify(self.get_town_label()).upper(), slugify(self.short_class_name), slugify(self.reference), slugify(self.name or "").replace("-", "_").capitalize(), ] last_edition_date = self.last_edition_date if last_edition_date: items.append(last_edition_date.strftime("%Y%m%d")) else: items.append("00000000") return "-".join([str(item) for item in items]) def save(self, *args, **kwargs): created = not self.pk if not getattr(self, "skip_history_when_saving", False): assert hasattr(self, "history_modifier") if created: self.history_creator = self.history_modifier # external ID can have related item not available before save external_id_updated = ( kwargs.pop("external_id_updated") if "external_id_updated" in kwargs else False ) if not created and not external_id_updated: self.update_external_id() super(BaseHistorizedItem, self).save(*args, **kwargs) if created and self.update_external_id(): # force resave for external ID creation self.skip_history_when_saving = True self._updated_id = True return self.save(external_id_updated=True) for dep in self.EXTERNAL_ID_DEPENDENCIES: for obj in getattr(self, dep).all(): obj.update_external_id(save=True) self.fix_associated() return True class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=datetime.datetime.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return self class OwnPerms(object): """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, ishtaruser): """ Query object to get own items """ return None # implement for each object def can_view(self, request): if hasattr(self, "LONG_SLUG"): perm = "view_" + self.LONG_SLUG else: perm = "view_" + self.SLUG return self.can_do(request, perm) def can_do(self, request, action_name): """ Check permission availability for the current object. :param request: request object :param action_name: action name eg: "change_find" - "own" variation is checked :return: boolean """ if not getattr(request.user, "ishtaruser", None): return False splited = action_name.split("_") action_own_name = splited[0] + "_own_" + "_".join(splited[1:]) user = request.user if action_own_name == "view_own_findbasket": action_own_name = "view_own_find" return user.ishtaruser.has_right(action_name, request.session) or ( user.ishtaruser.has_right(action_own_name, request.session) and self.is_own(user.ishtaruser) ) def is_own(self, user, alt_query_own=None): """ Check if the current object is owned by the user """ IshtarUser = apps.get_model("ishtar_common", "IshtarUser") if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, "ishtaruser"): ishtaruser = user.ishtaruser else: return False if not alt_query_own: query = self.get_query_owns(ishtaruser) else: query = getattr(self, alt_query_own)(ishtaruser) if not query: return False query &= Q(pk=self.pk) return self.__class__.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ IshtarUser = apps.get_model("ishtar_common", "IshtarUser") if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, "ishtaruser"): ishtaruser = user.ishtaruser else: return False query = cls.get_query_owns(ishtaruser) if not query: return False return cls.objects.filter(query).count() @classmethod def _return_get_owns( cls, owns, values, get_short_menu_class, label_key="cached_label" ): if not owns: return [] sorted_values = [] if hasattr(cls, "BASKET_MODEL"): owns_len = len(owns) for idx, item in enumerate(reversed(owns)): if get_short_menu_class: item = item[0] if type(item) == cls.BASKET_MODEL: basket = owns.pop(owns_len - idx - 1) sorted_values.append(basket) sorted_values = list(reversed(sorted_values)) if not values: if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: getattr(x, label_key) or "") ) return sorted_values + list( sorted(owns, key=lambda x: getattr(x[0], label_key) or "") ) if not get_short_menu_class: return sorted_values + list(sorted(owns, key=lambda x: x[label_key] or "")) return sorted_values + list(sorted(owns, key=lambda x: x[0][label_key] or "")) @classmethod def get_owns( cls, user, replace_query=None, limit=None, values=None, get_short_menu_class=False, menu_filtr=None, ): """ Get Own items """ if not replace_query: replace_query = {} if hasattr(user, "is_authenticated") and not user.is_authenticated(): returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned IshtarUser = apps.get_model("ishtar_common", "IshtarUser") if isinstance(user, User): try: ishtaruser = IshtarUser.objects.get(user_ptr=user) except IshtarUser.DoesNotExist: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned elif isinstance(user, IshtarUser): ishtaruser = user else: if values: return [] return cls.objects.filter(pk__isnull=True) items = [] if hasattr(cls, "BASKET_MODEL"): items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) query = cls.get_query_owns(ishtaruser) if not query and not replace_query: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if query: q = cls.objects.filter(query) else: # replace_query q = cls.objects.filter(replace_query) if values: q = q.values(*values) if limit: items += list(q.order_by("-pk")[:limit]) else: items += list(q.order_by(*cls._meta.ordering).all()) if get_short_menu_class: if values: if "id" not in values: raise NotImplementedError( "Call of get_owns with get_short_menu_class option and" " no 'id' in values is not implemented" ) my_items = [] for i in items: if hasattr(cls, "BASKET_MODEL") and type(i) == cls.BASKET_MODEL: dct = dict([(k, getattr(i, k)) for k in values]) my_items.append( (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)) ) else: my_items.append((i, cls.get_short_menu_class(i["id"]))) items = my_items else: items = [(i, cls.get_short_menu_class(i.pk)) for i in items] return items @classmethod def _get_query_owns_dicts(cls, ishtaruser): """ List of query own dict to construct the query. Each dict are join with an AND operator, each dict key, values are joined with OR operator """ return [] @classmethod def _construct_query_own(cls, prefix, dct_list): q = None for subquery_dict in dct_list: subquery = None for k in subquery_dict: subsubquery = Q(**{prefix + k: subquery_dict[k]}) if subquery: subquery |= subsubquery else: subquery = subsubquery if not subquery: continue if q: q &= subquery else: q = subquery return q class NumberManager(models.Manager): def get_by_natural_key(self, number): return self.get(number=number) class State(models.Model): label = models.CharField(_("Label"), max_length=30) number = models.CharField(_("Number"), unique=True, max_length=3) objects = NumberManager() class Meta: verbose_name = _("State") ordering = ["number"] def __str__(self): return self.label def natural_key(self): return (self.number,) class Department(models.Model): label = models.CharField(_("Label"), max_length=30) number = models.CharField(_("Number"), unique=True, max_length=3) state = models.ForeignKey( "State", verbose_name=_("State"), blank=True, null=True, on_delete=models.SET_NULL, ) objects = NumberManager() class Meta: verbose_name = _("Department") verbose_name_plural = _("Departments") ordering = ["number"] def __str__(self): return self.label def natural_key(self): return (self.number,) def history_compress(self): return self.number @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append(cls.objects.get(number=value)) except cls.DoesNotExist: continue return res class Arrondissement(models.Model): name = models.CharField("Nom", max_length=30) department = models.ForeignKey(Department, verbose_name="Département") def __str__(self): return settings.JOINT.join((self.name, str(self.department))) class Canton(models.Model): name = models.CharField("Nom", max_length=30) arrondissement = models.ForeignKey(Arrondissement, verbose_name="Arrondissement") def __str__(self): return settings.JOINT.join((self.name, str(self.arrondissement))) class TownManager(models.GeoManager): def get_by_natural_key(self, numero_insee, year): return self.get(numero_insee=numero_insee, year=year) class Town(Imported, models.Model): name = models.CharField(_("Name"), max_length=100) surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) center = models.PointField( _("Localisation"), srid=settings.SRID, blank=True, null=True ) limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120) departement = models.ForeignKey( Department, verbose_name=_("Department"), on_delete=models.SET_NULL, null=True, blank=True, ) year = models.IntegerField( _("Year of creation"), null=True, blank=True, help_text=_( "Filling this field is relevant to distinguish old towns " "from new towns." ), ) children = models.ManyToManyField( "Town", verbose_name=_("Town children"), blank=True, related_name="parents" ) cached_label = models.CharField( _("Cached name"), max_length=500, null=True, blank=True, db_index=True ) objects = TownManager() class Meta: verbose_name = _("Town") verbose_name_plural = _("Towns") if settings.COUNTRY == "fr": ordering = ["numero_insee"] unique_together = (("numero_insee", "year"),) def natural_key(self): return (self.numero_insee, self.year) def history_compress(self): return {"numero_insee": self.numero_insee, "year": self.year or ""} @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ return "**name** {}, **numero_insee** {}, **cached_label** {}".format( _("Name"), "Code commune (numéro INSEE)", _("Cached name") ) def get_values(self, prefix="", **kwargs): return { prefix or "label": str(self), prefix + "name": self.name, prefix + "numero_insee": self.numero_insee, } @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append( cls.objects.get( numero_insee=value["numero_insee"], year=value["year"] or None ) ) except cls.DoesNotExist: continue return res def __str__(self): return self.cached_label or "" @property def label_with_areas(self): label = [self.name] if self.numero_insee: label.append("({})".format(self.numero_insee)) for area in self.areas.all(): label.append(" - ") label.append(area.full_label) return " ".join(label) def generate_geo(self, force=False): force = self.generate_limit(force=force) self.generate_center(force=force) self.generate_area(force=force) def generate_limit(self, force=False): if not force and self.limit: return parents = None if not self.parents.count(): return for parent in self.parents.all(): if not parent.limit: return if not parents: parents = parent.limit else: parents = parents.union(parent.limit) # if union is a simple polygon make it a multi if "MULTI" not in parents.wkt: parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")" if not parents: return self.limit = parents self.save() return True def generate_center(self, force=False): if not force and (self.center or not self.limit): return self.center = self.limit.centroid if not self.center: return False self.save() return True def generate_area(self, force=False): if not force and (self.surface or not self.limit): return surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area if surface > 214748364 or not surface: return False self.surface = surface self.save() return True def update_town_code(self): if not self.numero_insee or not self.children.count() or not self.year: return old_num = self.numero_insee[:] numero = old_num.split("-")[0] self.numero_insee = "{}-{}".format(numero, self.year) if self.numero_insee != old_num: return True def _generate_cached_label(self): cached_label = self.name if settings.COUNTRY == "fr" and self.numero_insee: dpt_len = 2 if ( self.numero_insee.startswith("97") or self.numero_insee.startswith("98") or self.numero_insee[0] not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") ): dpt_len = 3 cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) if self.year and self.children.count(): cached_label += " ({})".format(self.year) return cached_label def post_save_town(sender, **kwargs): cached_label_changed(sender, **kwargs) town = kwargs["instance"] town.generate_geo() if town.update_town_code(): town.save() post_save.connect(post_save_town, sender=Town) def town_child_changed(sender, **kwargs): town = kwargs["instance"] if town.update_town_code(): town.save() m2m_changed.connect(town_child_changed, sender=Town.children.through) class Address(BaseHistorizedItem): FIELDS = ( "address", "address_complement", "postal_code", "town", "precise_town", "country", "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", "alt_country", "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", "raw_phone", "mobile_phone", "email", "alt_address_is_prefered", ) address = models.TextField(_("Address"), blank=True, default="") address_complement = models.TextField( _("Address complement"), blank=True, default="" ) postal_code = models.CharField( _("Postal code"), max_length=10, null=True, blank=True ) town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True) precise_town = models.ForeignKey( Town, verbose_name=_("Town (precise)"), null=True, blank=True ) country = models.CharField(_("Country"), max_length=30, null=True, blank=True) alt_address = models.TextField(_("Other address: address"), blank=True, default="") alt_address_complement = models.TextField( _("Other address: address complement"), blank=True, default="" ) alt_postal_code = models.CharField( _("Other address: postal code"), max_length=10, null=True, blank=True ) alt_town = models.CharField( _("Other address: town"), max_length=70, null=True, blank=True ) alt_country = models.CharField( _("Other address: country"), max_length=30, null=True, blank=True ) phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True) phone_desc = models.CharField( _("Phone description"), max_length=300, null=True, blank=True ) phone2 = models.CharField( _("Phone description 2"), max_length=18, null=True, blank=True ) phone_desc2 = models.CharField( _("Phone description 2"), max_length=300, null=True, blank=True ) phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, blank=True) phone_desc3 = models.CharField( _("Phone description 3"), max_length=300, null=True, blank=True ) raw_phone = models.TextField(_("Raw phone"), blank=True, default="") mobile_phone = models.CharField( _("Mobile phone"), max_length=18, null=True, blank=True ) email = models.EmailField(_("Email"), max_length=300, blank=True, null=True) alt_address_is_prefered = models.BooleanField( _("Alternative address is prefered"), default=False ) history = HistoricalRecords(inherit=True) SUB_ADDRESSES = [] class Meta: abstract = True def get_short_html_items(self): items = [] if self.address: items.append("""{}""".format(self.address)) if self.address_complement: items.append( """{}""".format( self.address_complement ) ) if self.postal_code: items.append( """{}""".format(self.postal_code) ) if self.precise_town: items.append( """{}""".format(self.precise_town.name) ) elif self.town: items.append("""{}""".format(self.town)) if self.country: items.append("""{}""".format(self.country)) return items def get_short_html_detail(self): html = """
""" items = self.get_short_html_items() if not items: items = [ "{}".format(_("No associated address")) ] html += "".join(items) html += """
""" return html def get_town_centroid(self): if self.precise_town: return self.precise_town.center, self._meta.verbose_name for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if sub_item and sub_item.precise_town: return sub_item.precise_town.center, sub_item._meta.verbose_name def get_town_polygons(self): if self.precise_town: return self.precise_town.limit, self._meta.verbose_name for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if sub_item and sub_item.precise_town: return sub_item.precise_town.limit, sub_item._meta.verbose_name def get_attribute(self, attr): if self.town or self.precise_town: return getattr(self, attr) for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if not sub_item: continue if sub_item.town or sub_item.precise_town: return getattr(sub_item, attr) return getattr(self, attr) def get_address(self): return self.get_attribute("address") def get_address_complement(self): return self.get_attribute("address_complement") def get_postal_code(self): return self.get_attribute("postal_code") def get_town(self): return self.get_attribute("town") def get_precise_town(self): return self.get_attribute("precise_town") def get_country(self): return self.get_attribute("country") def simple_lbl(self): return str(self) def full_address(self): lbl = self.simple_lbl() if lbl: lbl += "\n" lbl += self.address_lbl() return lbl def address_lbl(self): lbl = "" prefix = "" if self.alt_address_is_prefered: prefix = "alt_" if getattr(self, prefix + "address"): lbl += getattr(self, prefix + "address") if getattr(self, prefix + "address_complement"): if lbl: lbl += "\n" lbl += getattr(self, prefix + "address_complement") postal_code = getattr(self, prefix + "postal_code") town = getattr(self, prefix + "town") if postal_code or town: if lbl: lbl += "\n" lbl += "{}{}{}".format( postal_code or "", " " if postal_code and town else "", town or "" ) if self.phone: if lbl: lbl += "\n" lbl += "{} {}".format(str(_("Tel: ")), self.phone) if self.mobile_phone: if lbl: lbl += "\n" lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone) if self.email: if lbl: lbl += "\n" lbl += "{} {}".format(str(_("Email: ")), self.email) return lbl class Merge(models.Model): merge_key = models.TextField(_("Merge key"), blank=True, null=True) merge_candidate = models.ManyToManyField("self", blank=True) merge_exclusion = models.ManyToManyField("self", blank=True) archived = models.NullBooleanField(default=False, blank=True, null=True) # 1 for one word similarity, 2 for two word similarity, etc. MERGE_CLEMENCY = None EMPTY_MERGE_KEY = "--" MERGE_ATTRIBUTE = "name" class Meta: abstract = True def generate_merge_key(self): if self.archived: return merge_attr = getattr(self, self.MERGE_ATTRIBUTE) self.merge_key = slugify(merge_attr if merge_attr else "") if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY self.merge_key = self.merge_key def generate_merge_candidate(self): if self.archived: return if not self.merge_key: self.generate_merge_key() self.save(merge_key_generated=True) if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: return q = ( self.__class__.objects.exclude(pk=self.pk) .exclude(merge_exclusion=self) .exclude(merge_candidate=self) .exclude(archived=True) ) if not self.MERGE_CLEMENCY: q = q.filter(merge_key=self.merge_key) else: subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY]) subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :]) q = q.filter( Q(merge_key__istartswith=subkeys_front) | Q(merge_key__iendswith=subkeys_back) ) for item in q.all(): self.merge_candidate.add(item) def save(self, *args, **kwargs): # prevent circular save merge_key_generated = False if "merge_key_generated" in kwargs: merge_key_generated = kwargs.pop("merge_key_generated") self.generate_merge_key() item = super(Merge, self).save(*args, **kwargs) if not merge_key_generated: self.merge_candidate.clear() self.generate_merge_candidate() return item def archive(self): self.archived = True self.save() self.merge_candidate.clear() self.merge_exclusion.clear() def merge(self, item, keep_old=False, exclude_fields=None): merge_model_objects( self, item, keep_old=keep_old, exclude_fields=exclude_fields ) self.generate_merge_candidate() def __get_stats_cache_values(model_name, model_pk): StatsCache = apps.get_model("ishtar_common", "StatsCache") q = StatsCache.objects.filter(model=model_name, model_pk=model_pk) nb = q.count() if nb >= 1: sc = q.all()[0] for extra in q.order_by("-id").all()[1:]: extra.delete() else: sc = StatsCache.objects.create(model=model_name, model_pk=model_pk) values = sc.values if not values: values = {} return sc, values @task() def _update_stats(app, model, model_pk, funcname): model_name = app + "." + model model = apps.get_model(app, model) try: item = model.objects.get(pk=model_pk) except model.DoesNotExist: return value = getattr(item, funcname)() sc, current_values = __get_stats_cache_values(model_name, model_pk) current_values[funcname] = value sc.values = current_values sc.update_requested = None sc.updated = datetime.datetime.now() sc.save() def update_stats(statscache, item, funcname): if not settings.USE_BACKGROUND_TASK: current_values = statscache.values if not current_values: current_values = {} value = getattr(item, funcname)() current_values[funcname] = value statscache.values = current_values statscache.updated = datetime.datetime.now() statscache.save() return current_values now = datetime.datetime.now() app_name = item._meta.app_label model_name = item._meta.model_name statscache.update_requested = now.isoformat() statscache.save() _update_stats.delay(app_name, model_name, item.pk, funcname) return statscache.values class DashboardFormItem: """ Provide methods to manage statistics """ def last_stats_update(self): model_name = self._meta.app_label + "." + self._meta.model_name StatsCache = apps.get_model("ishtar_common", "StatsCache") q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by( "-updated" ) if not q.count(): return return q.all()[0].updated def _get_or_set_stats(self, funcname, update=False, expected_type=None): model_name = self._meta.app_label + "." + self._meta.model_name StatsCache = apps.get_model("ishtar_common", "StatsCache") sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk) if not update: values = sc.values if funcname not in values: if expected_type is not None: return expected_type() return 0 else: values = update_stats(sc, self, funcname) if funcname in values: values = values[funcname] else: values = 0 if expected_type is not None and not isinstance(values, expected_type): return expected_type() return values @classmethod def get_periods(cls, slice="month", fltr={}, date_source="creation"): date_var = date_source + "_date" q = cls.objects.filter(**{date_var + "__isnull": False}) if fltr: q = q.filter(**fltr) if slice == "year": return [ res[date_var].year for res in list(q.values(date_var).annotate(Count("id")).order_by()) ] elif slice == "month": return [ (res[date_var].year, res[date_var].month) for res in list(q.values(date_var).annotate(Count("id")).order_by()) ] return [] @classmethod def get_by_year(cls, year, fltr={}, date_source="creation"): date_var = date_source + "_date" q = cls.objects.filter(**{date_var + "__isnull": False}) if fltr: q = q.filter(**fltr) return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk") @classmethod def get_by_month(cls, year, month, fltr={}, date_source="creation"): date_var = date_source + "_date" q = cls.objects.filter(**{date_var + "__isnull": False}) if fltr: q = q.filter(**fltr) q = q.filter(**{date_var + "__year": year, date_var + "__month": month}) return q.order_by("pk").distinct("pk") @classmethod def get_total_number(cls, fltr=None): q = cls.objects if fltr: q = q.filter(**fltr) return q.order_by("pk").distinct("pk").count() class DocumentItem: ALT_NAMES = { "documents__image__isnull": SearchAltName( pgettext_lazy("key for text search", "has-image"), "documents__image__isnull", ), "documents__associated_url__isnull": SearchAltName( pgettext_lazy("key for text search", "has-url"), "documents__associated_url__isnull", ), "documents__associated_file__isnull": SearchAltName( pgettext_lazy("key for text search", "has-attached-file"), "documents__associated_file__isnull", ), } def public_representation(self): images = [] if getattr(self, "main_image", None): images.append(self.main_image.public_representation()) images += [ image.public_representation() for image in self.images_without_main_image.all() ] return {"images": images} @property def images(self): if not hasattr(self, "documents"): Document = apps.get_model("ishtar_common", "Document") return Document.objects.none() return ( self.documents.filter(image__isnull=False).exclude(image="").order_by("pk") ) @property def images_without_main_image(self): if not hasattr(self, "main_image") or not hasattr(self, "documents"): return self.images if not self.main_image: return ( self.documents.filter(image__isnull=False) .exclude(image="") .order_by("pk") ) return ( self.documents.filter(image__isnull=False) .exclude(image="") .exclude(pk=self.main_image.pk) .order_by("pk") ) @property def pdf_attached(self): for document in self.documents.filter( Q(associated_file__isnull=False) | Q(source__associated_file__isnull=False) ).all(): return document.pdf_attached def get_extra_actions(self, request): """ For sheet template: return "Add document / image" action """ # url, base_text, icon, extra_text, extra css class, is a quick action try: actions = super(DocumentItem, self).get_extra_actions(request) except AttributeError: actions = [] if not hasattr(self, "SLUG"): return actions can_add_doc = self.can_do(request, "add_document") if can_add_doc and ( not hasattr(self, "is_locked") or not self.is_locked(request.user) ): actions += [ ( reverse("create-document") + "?{}={}".format(self.SLUG, self.pk), _("Add document/image"), "fa fa-plus", _("doc./image"), "", False, ) ] return actions def clean_duplicate_association(document, related_item, action): profile = get_current_profile() if not profile.clean_redundant_document_association or action != "post_add": return class_name = related_item.__class__.__name__ if class_name not in ("Find", "ContextRecord", "Operation"): return if class_name == "Find": for cr in document.context_records.filter( base_finds__find__pk=related_item.pk ).all(): document.context_records.remove(cr) for ope in document.operations.filter( context_record__base_finds__find__pk=related_item.pk ).all(): document.operations.remove(ope) return if class_name == "ContextRecord": for ope in document.operations.filter(context_record__pk=related_item.pk).all(): document.operations.remove(ope) if document.finds.filter(base_finds__context_record=related_item.pk).count(): document.context_records.remove(related_item) return if class_name == "Operation": if document.context_records.filter(operation=related_item.pk).count(): document.operations.remove(related_item) return if document.finds.filter( base_finds__context_record__operation=related_item.pk ).count(): document.operations.remove(related_item) return def document_attached_changed(sender, **kwargs): # associate a default main image instance = kwargs.get("instance", None) model = kwargs.get("model", None) pk_set = kwargs.get("pk_set", None) if not instance or not model: return if hasattr(instance, "documents"): items = [instance] else: if not pk_set: return try: items = [model.objects.get(pk=pk) for pk in pk_set] except model.DoesNotExist: return for item in items: clean_duplicate_association(instance, item, kwargs.get("action", None)) for doc in item.documents.all(): doc.regenerate_all_ids() q = item.documents.filter(image__isnull=False).exclude(image="") if item.main_image: if q.filter(pk=item.main_image.pk).count(): return # the association has disappear not the main image anymore item.main_image = None item.skip_history_when_saving = True item.save() if not q.count(): return # by default get the lowest pk item.main_image = q.order_by("pk").all()[0] item.skip_history_when_saving = True item.save() class QuickAction: """ Quick action available from tables """ def __init__( self, url, icon_class="", text="", target=None, rights=None, module=None ): self.url = url self.icon_class = icon_class self.text = text self.rights = rights self.target = target self.module = module assert self.target in ("one", "many", None) def is_available(self, user, session=None, obj=None): if self.module and not getattr(get_current_profile(), self.module): return False if not self.rights: # no restriction return True if not user or not hasattr(user, "ishtaruser") or not user.ishtaruser: return False user = user.ishtaruser for right in self.rights: if user.has_perm(right, session=session, obj=obj): return True return False @property def rendered_icon(self): if not self.icon_class: return "" return "".format(self.icon_class) @property def base_url(self): if self.target is None: url = reverse(self.url) else: # put arbitrary pk for the target url = reverse(self.url, args=[0]) url = url[:-2] # all quick action url have to finish with the # pk of the selected item and a "/" return url class DynamicRequest: def __init__( self, label, app_name, model_name, form_key, search_key, type_query, search_query, ): self.label = label self.form_key = form_key self.search_key = search_key self.app_name = app_name self.model_name = model_name self.type_query = type_query self.search_query = search_query def get_all_types(self): model = apps.get_app_config(self.app_name).get_model(self.model_name) return model.objects.filter(available=True) def get_form_fields(self): fields = {} for item in self.get_all_types().all(): fields[self.form_key + "-" + item.txt_idx] = forms.CharField( label=str(self.label) + " " + str(item), required=False ) return fields def get_extra_query(self, slug): return {self.type_query: slug} def get_alt_names(self): alt_names = {} for item in self.get_all_types().all(): alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( self.search_key + "-" + item.txt_idx, self.search_query, self.get_extra_query(item.txt_idx), distinct_query=True, ) return alt_names class SpatialReferenceSystem(GeneralType): order = models.IntegerField(_("Order"), default=10) auth_name = models.CharField(_("Authority name"), default="EPSG", max_length=256) srid = models.IntegerField(_("Authority SRID")) class Meta: verbose_name = _("Spatial reference system") verbose_name_plural = _("Spatial reference systems") ordering = ("label",) @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ doc = super(SpatialReferenceSystem, cls).get_documentation_string() doc += ", **srid** {}, **auth_name** {}".format( _("Authority SRID"), _("Authority name") ) return doc post_save.connect(post_save_cache, sender=SpatialReferenceSystem) post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) class GeoItem(models.Model): GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon"))) # gis x = models.FloatField(_("X"), blank=True, null=True) y = models.FloatField(_("Y"), blank=True, null=True) z = models.FloatField(_("Z"), blank=True, null=True) estimated_error_x = models.FloatField( _("Estimated error for X"), blank=True, null=True ) estimated_error_y = models.FloatField( _("Estimated error for Y"), blank=True, null=True ) estimated_error_z = models.FloatField( _("Estimated error for Z"), blank=True, null=True ) spatial_reference_system = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), blank=True, null=True, ) point = models.PointField(_("Point"), blank=True, null=True, dim=3) point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) point_source = models.CharField( _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True ) point_source_item = models.CharField( _("Point source item"), max_length=100, blank=True, null=True ) multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True) multi_polygon_source = models.CharField( _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True, ) multi_polygon_source_item = models.CharField( _("Multi polygon source item"), max_length=100, blank=True, null=True ) GEO_LABEL = "" class Meta: abstract = True def get_town_centroid(self): raise NotImplementedError def get_town_polygons(self): raise NotImplementedError @property def X(self): """x coordinates using the default SRS""" coord = self.display_coordinates if not coord: return return coord[0] @property def Y(self): """y coordinates using the default SRS""" coord = self.display_coordinates if not coord: return return coord[1] @property def display_coordinates(self, rounded=True): if not self.point_2d: return "" profile = get_current_profile() if ( not profile.display_srs or not profile.display_srs.srid or ( profile.display_srs == self.spatial_reference_system and self.x and self.y ) ): x, y = self.x, self.y else: point = self.point_2d.transform(profile.display_srs.srid, clone=True) x, y = point.x, point.y if rounded: return round(x, 5), round(y, 5) return x, y @property def display_spatial_reference_system(self): profile = get_current_profile() if not profile.display_srs or not profile.display_srs.srid: return self.spatial_reference_system return profile.display_srs def get_precise_points(self): if self.point_source == "P" and self.point_2d: return self.point_2d, self.point, self.point_source_item def get_precise_polygons(self): if self.multi_polygon_source == "P" and self.multi_polygon: return self.multi_polygon, self.multi_polygon_source_item def get_geo_items(self, get_polygons=False, rounded=True): dict = { "type": "Feature", "geometry": {}, } if self.multi_polygon: if get_polygons: list_coords = [] for polygon in self.multi_polygon: list_coords.append([]) for linear_ring in range(len(polygon)): list_coords[-1].append([]) for coords in polygon[linear_ring]: point_2d = Point(coords[0], coords[1], srid=self.multi_polygon.srid) list_coords[-1][linear_ring].append(self.convert_coordinates(point_2d, rounded)) dict["geometry"]["type"] = "MultiPolygon" dict["geometry"]["coordinates"] = list_coords else: dict["geometry"]["type"] = "Point" dict["geometry"]["coordinates"] = self.convert_coordinates(self.multi_polygon.centroid, rounded) else: dict["geometry"]["type"] = "Point" x,y = self.display_coordinates dict["geometry"]["coordinates"] = [x,y] return dict def convert_coordinates(self, point_2d, rounded): profile = get_current_profile() if ( not profile.display_srs or not profile.display_srs.srid or ( profile.display_srs == self.spatial_reference_system and point_2d.x and point_2d.y) ): x, y = point_2d.x, point_2d.y else: point = point_2d.transform(profile.display_srs.srid, clone=True) x, y = point.x, point.y if rounded: return [round(x, 5), round(y, 5)] return [x, y] def most_precise_geo(self): if self.point_source == "M": return "multi_polygon" current_source = str(self.__class__._meta.verbose_name) if self.multi_polygon_source_item == current_source and ( self.multi_polygon_source == "P" or (self.point_source_item != current_source and self.point_source != "P") ): return "multi_polygon" if self.point_source_item == current_source and self.point_source == "P": return "point" if self.multi_polygon_source == "P": return "multi_polygon" if self.point_source == "P": return "point" if self.multi_polygon: return "multi_polygon" if self.point_2d: return "point" def geo_point_source(self): if not self.point_source: return "" return "{} - {}".format( dict(self.GEO_SOURCE)[self.point_source], self.point_source_item ) def geo_polygon_source(self): if not self.multi_polygon_source: return "" return "{} - {}".format( dict(self.GEO_SOURCE)[self.multi_polygon_source], self.multi_polygon_source_item, ) def _geojson_serialize(self, geom_attr): if not hasattr(self, geom_attr): return "" cached_label_key = "cached_label" if self.GEO_LABEL: cached_label_key = self.GEO_LABEL if getattr(self, "CACHED_LABELS", None): cached_label_key = self.CACHED_LABELS[-1] geojson = serialize( "geojson", self.__class__.objects.filter(pk=self.pk), geometry_field=geom_attr, fields=(cached_label_key,), ) geojson_dct = json.loads(geojson) profile = get_current_profile() precision = profile.point_precision features = geojson_dct.pop("features") for idx in range(len(features)): feature = features[idx] lbl = feature["properties"].pop(cached_label_key) feature["properties"]["name"] = lbl feature["properties"]["id"] = self.pk if precision is not None: geom_type = feature["geometry"].get("type", None) if geom_type == "Point": feature["geometry"]["coordinates"] = [ round(coord, precision) for coord in feature["geometry"]["coordinates"] ] geojson_dct["features"] = features geojson_dct["link_template"] = simple_link_to_window(self).replace( "999999", "" ) geojson = json.dumps(geojson_dct) return geojson @property def point_2d_geojson(self): return self._geojson_serialize("point_2d") @property def multi_polygon_geojson(self): return self._geojson_serialize("multi_polygon") class ImageContainerModel: def _get_image_path(self, filename): return "{}/{}".format(self._get_base_image_path(), filename) def _get_base_image_path(self): n = datetime.datetime.now() return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) class CompleteIdentifierItem(models.Model, ImageContainerModel): HAS_QR_CODE = True complete_identifier = models.TextField( _("Complete identifier"), blank=True, default="" ) custom_index = models.IntegerField("Custom index", blank=True, null=True) qrcode = models.ImageField( upload_to=get_image_path, blank=True, null=True, max_length=255 ) class Meta: abstract = True @property def qrcode_path(self): if not self.qrcode: self.generate_qrcode() if not self.qrcode: # error on qrcode generation return "" return self.qrcode.path def generate_qrcode(self, request=None, secure=True, tmpdir=None): url = self.get_absolute_url() site = Site.objects.get_current() if request: scheme = request.scheme else: if secure: scheme = "https" else: scheme = "http" url = scheme + "://" + site.domain + url TinyUrl = apps.get_model("ishtar_common", "TinyUrl") tiny_url = TinyUrl() tiny_url.link = url tiny_url.save() short_url = ( scheme + "://" + site.domain + reverse("tiny-redirect", args=[tiny_url.get_short_id()]) ) qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) tmpdir_created = False if not tmpdir: tmpdir = tempfile.mkdtemp("-qrcode") tmpdir_created = True filename = tmpdir + os.sep + "qrcode.png" qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) with open(filename, "rb") as qrfile: self.qrcode.save("qrcode.png", File(qrfile)) self.skip_history_when_saving = True self._no_move = True self.save() if tmpdir_created: shutil.rmtree(tmpdir) def generate_complete_identifier(self): SLUG = getattr(self, "SLUG", None) if not SLUG: return "" complete_identifier = get_generated_id(SLUG + "_complete_identifier", self) if complete_identifier: return complete_identifier cached_label_key = "cached_label" if getattr(self, "GEO_LABEL", None): cached_label_key = getattr(self, "GEO_LABEL", None) if hasattr(self, "CACHED_COMPLETE_ID"): cached_label_key = self.CACHED_COMPLETE_ID if not cached_label_key: return complete_identifier = getattr(self, cached_label_key) return complete_identifier def generate_custom_index(self, force=False): if not self.pk: return if self.custom_index and not force: return self.custom_index SLUG = getattr(self, "SLUG", None) if not SLUG: return k = SLUG + "_custom_index" profile = get_current_profile() if not hasattr(profile, k): return key = getattr(profile, k) if not key or not key.strip(): return keys = key.strip().split(";") if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]): # custom index generation return getattr(self, "get_index_" + key)() model = self.__class__ try: self_keys = set(list(model.objects.filter(pk=self.pk).values_list(*keys))) except Exception: # bad settings - not managed here print("Bad settings for custom_index {}".format(";".join(keys))) return if len(self_keys) != 1: # key is not distinct return self_key = self_keys.pop() return self._get_index(keys, self_key) def _get_index(self, keys: list, self_keys: list): model = self.__class__ q = model.objects if self.pk: q = model.objects.exclude(pk=self.pk) for idx, key in enumerate(keys): q = q.filter(**{key: self_keys[idx]}) try: r = q.aggregate(max_index=Max("custom_index")) except Exception: # bad settings return if not r["max_index"]: return 1 return r["max_index"] + 1 def save(self, *args, **kwargs): super(CompleteIdentifierItem, self).save(*args, **kwargs) self.regenerate_all_ids() def regenerate_all_ids(self): if getattr(self, "_prevent_loop", False): return modified = False custom_index = self.generate_custom_index() if custom_index != self.custom_index: modified = True self.custom_index = custom_index complete_id = self.generate_complete_identifier() if complete_id: modified = True self.complete_identifier = complete_id if modified: self._prevent_loop = True self.skip_history_when_saving = True self.save() class SearchVectorConfig: def __init__(self, key, language=None, func=None): self.key = key if language: self.language = language if language == "local": self.language = settings.ISHTAR_SEARCH_LANGUAGE else: self.language = "simple" self.func = func def format(self, value): if value == "None": value = "" if not self.func: return [value] return self.func(value) class ShortMenuItem: """ Item available in the short menu """ UP_MODEL_QUERY = {} @classmethod def get_short_menu_class(cls, pk): return "" @property def short_class_name(self): return "" class MainItem(ShortMenuItem): """ Item with quick actions available from tables Extra actions are available from sheets """ QUICK_ACTIONS = [] @classmethod def get_quick_actions(cls, user, session=None, obj=None): """ Get a list of (url, title, icon, target) actions for an user """ qas = [] for action in cls.QUICK_ACTIONS: if not action.is_available(user, session=session, obj=obj): continue qas.append( [ action.base_url, mark_safe(action.text), mark_safe(action.rendered_icon), action.target or "", ] ) return qas @classmethod def get_quick_action_by_url(cls, url): for action in cls.QUICK_ACTIONS: if action.url == url: return action def regenerate_external_id(self): if not hasattr(self, "external_id"): return self.skip_history_when_saving = True self._no_move = True self.external_id = "" self.auto_external_id = True self.save() def get_extra_actions(self, request): if not hasattr(self, "SLUG"): return [] actions = [] if request.user.is_superuser and hasattr(self, "auto_external_id"): actions += [ ( reverse("regenerate-external-id") + "?{}={}".format(self.SLUG, self.pk), _("Regenerate ID"), "fa fa-key", _("regen."), "btn-info", True, 200, ) ] return actions