#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Generic models and tools for models
"""
import copy
from collections import OrderedDict
import datetime
import fiona
import json
import logging
import os
import pyqrcode
import re
import shutil
import tempfile
import time
from django import forms
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User, Group
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.db import migrations as db_migrations
from django.contrib.gis.db import models
from django.contrib.gis.geos import GEOSGeometry, Point
from django.contrib.gis.gdal.error import GDALException
from django.contrib.postgres.fields import JSONField
from django.contrib.postgres.search import SearchVectorField, SearchVector
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.core.files import File
from django.core.serializers import serialize
from django.urls import reverse, NoReverseMatch
from django.core.validators import validate_slug
from django.db import connection, transaction, OperationalError, IntegrityError
from django.db.models import Q, Count, Max, fields
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.template import loader
from django.template.defaultfilters import slugify
from django.utils.safestring import SafeText, mark_safe
from django.utils.translation import activate, deactivate
from ishtar_common.utils import (
    ugettext_lazy as _,
    pgettext_lazy,
    get_image_path,
    get_columns_from_class,
    human_date,
    reverse_list_coordinates,
)
from simple_history.models import HistoricalRecords as BaseHistoricalRecords
from simple_history.signals import (
    post_create_historical_record,
    pre_create_historical_record,
)
from unidecode import unidecode
from ishtar_common.model_managers import TypeManager
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import Import
from ishtar_common.templatetags.link_to_window import simple_link_to_window
from ishtar_common.utils import (
    get_cache,
    disable_for_loaddata,
    get_all_field_names,
    merge_tsvectors,
    cached_label_changed,
    post_save_geo,
    post_save_geodata,
    task,
    duplicate_item,
    get_generated_id,
    get_current_profile,
)
logger = logging.getLogger(__name__)
"""
from ishtar_common.models import GeneralType, get_external_id, \
    LightHistorizedItem, OwnPerms, Address, post_save_cache, \
    DashboardFormItem, document_attached_changed, SearchAltName, \
    DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
    QuickAction, MainItem, Merge
"""
class CachedGen(object):
    """
    Base class for items keeping a registry of the cache keys to refresh.

    Subclasses have to implement `refresh_cache`.
    """

    @classmethod
    def refresh_cache(cls):
        """
        Recompute the cached values of the class.

        :raises NotImplementedError: always - must be overloaded
        """
        raise NotImplementedError()

    @classmethod
    def _add_cache_key_to_refresh(cls, keys):
        """
        Register `keys` in the "_current_keys" list stored in the cache.

        :param keys: cache key components to register
        """
        cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
        # any non-list value (cache miss included) restarts the registry
        if not isinstance(current_keys, list):
            current_keys = []
        if keys not in current_keys:
            current_keys.append(keys)
            cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
class Cached(CachedGen):
    """
    Cache helpers for classes whose items are identified by a slug field.

    `_add_cache_key_to_refresh` is inherited unchanged from `CachedGen`.
    """

    slug_field = "txt_idx"

    @classmethod
    def refresh_cache(cls):
        """
        Recompute every cached value registered in the "_current_keys" list.

        Registered keys starting with "__slug", "__get_types" or "__get_help"
        are dispatched to `get_cache`, `get_types` and `get_help` with
        `force=True`; other keys are ignored.
        """
        cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
        if not current_keys:
            return
        for registered in current_keys:
            if not registered:
                # nothing to dispatch on - also avoids an IndexError below
                continue
            # work on a copy: pop() must not alter the registered key
            keys = list(registered)
            if len(keys) == 2 and keys[0] == "__slug":
                cls.get_cache(keys[1], force=True)
            elif keys[0] == "__get_types":
                default = None
                empty_first = True
                exclude = []
                if len(keys) >= 2:
                    default = keys.pop()
                    if len(keys) > 1:
                        empty_first = bool(keys.pop())
                    exclude = keys[1:]
                cls.get_types(
                    exclude=exclude,
                    empty_first=empty_first,
                    default=default,
                    force=True,
                )
            elif keys[0] == "__get_help":
                cls.get_help(force=True)

    @classmethod
    def get_cache(cls, slug, force=False):
        """
        Get the item matching `slug` from the cache or from the database.

        :param slug: value of the `slug_field` of the searched item
        :param force: if True the cached value is ignored and refreshed
        :return: the matching item or None if it doesn't exist
        """
        cache_key, value = get_cache(cls, ["__slug", slug])
        if not force and value:
            return value
        try:
            obj = cls.objects.get(**{cls.slug_field: slug})
        except cls.DoesNotExist:
            obj = None
        # a miss is cached too (as None)
        cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
        return obj
@disable_for_loaddata
def post_save_cache(sender, **kwargs):
    """
    Refresh the cache of the `sender` class.

    Presumably connected as a signal receiver - confirm at the connection
    site.

    :param sender: class providing a `refresh_cache` class method
    :param kwargs: extra arguments (unused)
    """
    sender.refresh_cache()
class GeneralType(Cached, models.Model):
    """
    Abstract class for "types"

    A type has a label, a unique textual ID (slug), an optional comment and
    an availability flag.
    """
    # displayed name of the type
    label = models.TextField(_("Label"))
    # unique slug, validated with `validate_slug`
    txt_idx = models.TextField(
        _("Textual ID"),
        validators=[validate_slug],
        unique=True,
        help_text=_(
            "The slug is the standardized version of the name. It contains "
            "only lowercase letters, numbers and hyphens. Each slug must "
            "be unique."
        ),
    )
    # free text - used by `get_help` to build the help of the type
    comment = models.TextField(_("Comment"), blank=True, default="")
    # only available items are listed by `_get_types` / `_get_parent_types`
    available = models.BooleanField(_("Available"), default=True)
    # leading text of the help generated by `get_help`
    HELP_TEXT = ""
    objects = TypeManager()
    class Meta:
        abstract = True
    def __str__(self):
        """Return the label of the type."""
        return self.label
    def natural_key(self):
        """Return a 1-tuple holding the textual ID."""
        return (self.txt_idx,)
    def history_compress(self):
        """Return the textual ID as the compact representation of the item."""
        return self.txt_idx
    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation

        :return: description of the "label" and "txt_idx" fields followed,
        if the class defines it, by `extra_documentation_string()`
        """
        label_name = str(_("Label"))
        txt_idx_name = str(_("Textual ID"))
        doc = "**label** {}, **txt_idx** {}".format(label_name, txt_idx_name)
        if not hasattr(cls, "extra_documentation_string"):
            return doc
        return doc + cls.extra_documentation_string()
    @classmethod
    def admin_url(cls):
        """
        Return the URL of the admin changelist of the model.

        :return: URL (string)
        """
        meta = cls._meta
        url_name = "admin:{}_{}_changelist".format(meta.app_label, meta.model_name)
        return str(reverse(url_name))
    @classmethod
    def history_decompress(cls, value, create=False):
        """
        Get the items matching a list of textual IDs.

        :param value: iterable of textual IDs
        :param create: unused
        :return: list of the matching items - unknown IDs are skipped
        """
        if not value:
            return []
        items = []
        for slug in value:
            try:
                items.append(cls.objects.get(txt_idx=slug))
            except cls.DoesNotExist:
                pass
        return items
    @property
    def explicit_label(self):
        """Label followed by the verbose name of the model between brackets."""
        return f"{self.label} ({self._meta.verbose_name})"
    @classmethod
    def create_default_for_test(cls):
        """
        Create five items labelled "Test 0" to "Test 4".

        :return: list of the created items
        """
        created = []
        for number in range(5):
            created.append(cls.objects.create(label="Test %d" % number))
        return created
    @property
    def short_label(self):
        """Short label of the type: same value as `label`."""
        return self.label
    @property
    def name(self):
        """Name of the type: same value as `label`."""
        return self.label
    @classmethod
    def get_or_create(cls, slug, label=""):
        """
        Get an item from its textual ID, create it when it is missing.

        The cache (`get_cache`) is checked before the database.

        :param slug: textual id
        :param label: label used for initialization if the item doesn't
        exist (not mandatory)
        :return: instantiated item of the base class
        """
        cached = cls.get_cache(slug)
        if cached:
            return cached
        return cls.objects.get_or_create(txt_idx=slug, defaults={"label": label})[0]
    @classmethod
    def get_or_create_pk(cls, slug):
        """
        Get the id matching a slug. The item is created if needed.

        :param slug: textual id
        :return: id of the item (string)
        """
        item = cls.get_or_create(slug)
        return str(item.pk)
    @classmethod
    def get_or_create_pks(cls, slugs):
        """
        Get the ids matching a list of slugs and merge them in a string.
        The associated items are created if needed.

        :param slugs: textual ids
        :return: string with ids separated by "_"
        """
        return "_".join(str(cls.get_or_create(slug).pk) for slug in slugs)
    @classmethod
    def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
        """
        Build the help text of the type: `HELP_TEXT` followed by the label and
        the comment of each commented item. The result is cached.

        :param dct: extra filters forwarded to `get_types`
        :param exclude: items to exclude, forwarded to `get_types`
        :param force: if True the cached value is ignored and rebuilt
        :param full_hierarchy: unused
        :return: help text marked as safe
        """
        # NOTE(review): the markup literals below ("<dl>", "<dt>", "<dd>",
        # "<br/>") are reconstructed - in the reviewed copy these strings were
        # split over several lines (syntax error) with their tags apparently
        # stripped. Confirm against the reference version of the file.
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        keys = ["__get_help"]
        keys += ["{}".format(ex) for ex in exclude]
        keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
        cache_key, value = get_cache(cls, keys)
        if value and not force:
            return mark_safe(value)
        help_text = cls.HELP_TEXT
        c_rank = -1
        help_items = "\n"
        for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
            if hasattr(item, "__iter__"):
                # (pk, label) tuple: fetch the instance
                pk = item[0]
                item = cls.objects.get(pk=pk)
                item.rank = c_rank + 1
                if hasattr(item, "parent"):
                    # label is replaced by the full path "root / ... / item"
                    c_item = item
                    parents = []
                    while c_item.parent:
                        parents.append(c_item.parent.label)
                        c_item = c_item.parent
                    parents.reverse()
                    parents.append(item.label)
                    item.label = " / ".join(parents)
            if not item.comment:
                continue
            # close or open a list level when the rank changes
            if c_rank > item.rank:
                help_items += "</dl>\n"
            elif c_rank < item.rank:
                help_items += "<dl>\n"
            c_rank = item.rank
            help_items += "<dt>%s</dt><dd>%s</dd>" % (
                item.label,
                "<br/>".join(item.comment.split("\n")),
            )
        # close every level still open
        c_rank += 1
        if c_rank:
            help_items += c_rank * "</dl>"
        if help_text or help_items != "\n":
            help_text = help_text + help_items
        else:
            help_text = ""
        cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
        return mark_safe(help_text)
    @classmethod
    def _get_initial_types(cls, initial, type_pks, instance=False):
        """
        Get the items matching the `initial` ids not already listed.

        :param initial: an id or a list/tuple of ids - values that cannot be
        converted to an integer are ignored
        :param type_pks: ids already listed (they are skipped)
        :param instance: if True return instances, otherwise (pk, label)
        tuples
        :return: list of the extra items - unknown ids are skipped
        """
        new_vals = []
        if not initial:
            return []
        if not isinstance(initial, (list, tuple)):
            initial = [initial]
        for value in initial:
            try:
                pk = int(value)
            except (ValueError, TypeError):
                continue
            if pk in type_pks:
                continue
            try:
                extra_type = cls.objects.get(pk=pk)
                if instance:
                    new_vals.append(extra_type)
                else:
                    new_vals.append((extra_type.pk, str(extra_type)))
            except cls.DoesNotExist:
                continue
        return new_vals
    @classmethod
    def get_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        empty_first=True,
        default=None,
        initial=None,
        force=False,
        full_hierarchy=False,
    ):
        """
        List the types.

        :param dct: extra filters
        :param instances: if True instances are listed, otherwise
        (pk, label) tuples
        :param exclude: textual IDs to exclude
        :param empty_first: if True (and no default, no instances) the list
        starts with an empty choice ("", "--")
        :param default: textual ID of the default item
        :param initial: id(s) appended to the list if not already in it
        :param force: if True the cache is ignored
        :param full_hierarchy: forwarded to `_pre_get_types` as
        `get_full_hierarchy`
        :return: list of types
        """
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        types = []
        if not instances and empty_first and not default:
            types = [("", "--")]
        types += cls._pre_get_types(
            dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy
        )
        if not initial:
            return types
        # items are instances or (pk, label) tuples depending on the mode
        known_pks = [
            item.pk if hasattr(item, "pk") else item[0] for item in types
        ]
        types += cls._get_initial_types(initial, known_pks, instance=instances)
        return types
    @classmethod
    def _pre_get_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        default=None,
        force=False,
        get_full_hierarchy=False,
    ):
        """
        Cache layer over `_get_types` / `_get_parent_types`.

        Only lists of tuples are cached: with `instances=True` the underlying
        generator is returned as is.

        :param dct: extra filters
        :param instances: if True instances are returned (no cache)
        :param exclude: textual IDs to exclude
        :param default: textual ID of the default item
        :param force: if True the cached value is ignored and rebuilt
        :param get_full_hierarchy: forwarded to `_get_parent_types` for
        classes with a `parent` attribute
        :return: list (cached mode) or generator of types
        """
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        # cache
        cache_key = None
        if not instances:
            keys = ["__get_types"]
            keys += ["{}".format(ex) for ex in exclude] + ["{}".format(default)]
            keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
            if get_full_hierarchy:
                # labels differ in this mode: it must not share the cache
                # entry of the default (tree prefixed) labels
                keys.append("__full_hierarchy")
            cache_key, value = get_cache(cls, keys)
            if value and not force:
                return value
        base_dct = dct.copy()
        if hasattr(cls, "parent"):
            found = cls._get_parent_types(
                base_dct,
                instances,
                exclude=exclude,
                default=default,
                get_full_hierarchy=get_full_hierarchy,
            )
        else:
            found = cls._get_types(
                base_dct, instances, exclude=exclude, default=default
            )
        if not cache_key:
            return found
        vals = list(found)
        cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
        return vals
    @classmethod
    def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
        """
        Generator over the available types.

        The default item (if it exists) is yielded first as a (pk, label)
        tuple and is excluded from the remaining items.

        :param dct: extra filters ("available" is forced to True)
        :param instances: if True yield instances (with `rank` set to 0),
        otherwise (pk, label) tuples
        :param exclude: textual IDs to exclude
        :param default: textual ID of the default item
        """
        # copies: the arguments of the caller must not be altered
        dct = dict(dct) if dct else {}
        exclude = list(exclude) if exclude else []
        dct["available"] = True
        if default:
            try:
                default = cls.objects.get(txt_idx=default)
            except cls.DoesNotExist:
                pass
            else:
                yield (default.pk, str(default))
        items = cls.objects.filter(**dct)
        if default and default != "None":
            # `default` is an instance if found, the raw value otherwise
            if hasattr(default, "txt_idx"):
                exclude.append(default.txt_idx)
            else:
                exclude.append(default)
        if exclude:
            items = items.exclude(txt_idx__in=exclude)
        for item in items.order_by(*cls._meta.ordering).all():
            if instances:
                item.rank = 0
                yield item
            else:
                yield item.pk, str(item) if item and str(item) else ""
    @classmethod
    def _get_childs_list(cls, dct=None, exclude=None, instances=False):
        """
        Group the items by parent id.

        :param dct: filters - a "parent" key is removed from it
        :param exclude: textual IDs to exclude
        :param instances: if True items are instances, otherwise
        (id, label) tuples
        :return: dict with the parent id as key (0 for items without
        parent) and the list of children as value
        """
        dct = dct or {}
        exclude = exclude or []
        dct.pop("parent", None)
        queryset = cls.objects.filter(**dct)
        if exclude:
            queryset = queryset.exclude(txt_idx__in=exclude)
        if hasattr(cls, "order"):
            queryset = queryset.order_by("order")
        grouped = {}
        if instances:
            for instance in queryset.all():
                grouped.setdefault(instance.parent_id or 0, []).append(instance)
            return grouped
        for row in queryset.values("id", "parent_id", "label").all():
            own_id, parent_id = row["id"], row["parent_id"]
            # an item which is its own parent is listed with the root items
            group_key = 0 if own_id == parent_id else (parent_id or 0)
            grouped.setdefault(group_key, []).append((own_id, row["label"]))
        return grouped
    PREFIX = "│ "
    PREFIX_EMPTY = "  "
    PREFIX_MEDIUM = "├ "
    PREFIX_LAST = "└ "
    PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
    @classmethod
    def _get_childs(
        cls,
        item,
        child_list,
        prefix=0,
        instances=False,
        is_last=False,
        last_of=None,
        get_full_hierarchy=False,
    ):
        """
        Recursively flatten the descendants of an item.

        :param item: id of the parent item
        :param child_list: dict with a parent id as key and the list of its
        children (instances or (id, label) tuples) as value
        :param prefix: depth of the parent - children are one level deeper
        :param instances: True if `child_list` contains instances
        :param is_last: True if the parent is the last of its siblings
        :param last_of: list of booleans, one by ancestor level: True when
        the ancestor was the last of its siblings
        :param get_full_hierarchy: if falsy labels get a tree drawing prefix;
        if it is a string it is used as the path prepended to the labels
        :return: list of instances (with a `rank` attribute) or list of
        (id, SafeText label) tuples, children followed by their own children
        """
        if not last_of:
            last_of = []
        prefix += 1
        current_child_lst = []
        if item in child_list:
            current_child_lst = child_list[item]
        lst = []
        total = len(current_child_lst)
        # path of the parent: `get_full_hierarchy` is overwritten in the loop
        full_hierarchy_initial = get_full_hierarchy
        for idx, child in enumerate(current_child_lst):
            mylast_of = last_of[:]
            p = ""
            if instances:
                child.rank = prefix
                lst.append(child)
            else:
                if full_hierarchy_initial:
                    # "parent path > " prefix
                    if isinstance(full_hierarchy_initial, str):
                        p = full_hierarchy_initial + " > "
                    else:
                        p = ""
                else:
                    # tree prefix: one symbol by level, the last written one
                    # is the branch of the current child
                    cprefix = prefix
                    while cprefix:
                        cprefix -= 1
                        if not cprefix:
                            if (idx + 1) == total:
                                p += cls.PREFIX_LAST
                            else:
                                p += cls.PREFIX_MEDIUM
                        elif is_last:
                            if mylast_of:
                                clast = mylast_of.pop(0)
                                if clast:
                                    p += cls.PREFIX_EMPTY
                                else:
                                    p += cls.PREFIX
                            else:
                                p += cls.PREFIX_EMPTY
                        else:
                            p += cls.PREFIX
                lst.append((child[0], SafeText(p + child[1])))
            clast_of = last_of[:]
            clast_of.append(idx + 1 == total)
            if instances:
                child_id = child.id
            else:
                child_id = child[0]
                if get_full_hierarchy:
                    # path given to the children of the current child
                    if p:
                        if not p.endswith(" > "):
                            p += " > "
                        get_full_hierarchy = p + child[1]
                    else:
                        get_full_hierarchy = child[1]
            for sub_child in cls._get_childs(
                child_id,
                child_list,
                prefix,
                instances,
                is_last=((idx + 1) == total),
                last_of=clast_of,
                get_full_hierarchy=get_full_hierarchy,
            ):
                lst.append(sub_child)
        return lst
    @classmethod
    def _get_parent_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        default=None,
        get_full_hierarchy=False,
    ):
        """
        Generator over the available types of a hierarchical class: each
        root item is followed by its descendants (`_get_childs`).

        :param dct: extra filters ("available" is forced to True)
        :param instances: if True yield instances, otherwise (id, label)
        tuples
        :param exclude: textual IDs to exclude
        :param default: unused
        :param get_full_hierarchy: if set, the label of the root item is
        passed to `_get_childs` as the path of its descendants
        """
        dct = dct or {}
        exclude = exclude or []
        dct["available"] = True
        children_by_parent = cls._get_childs_list(dct, exclude, instances)
        # key 0 holds the items without parent
        for root in children_by_parent.get(0, ()):
            if instances:
                root.rank = 0
                root_id = root.pk
            else:
                root_id = root[0]
            yield root
            if not instances and get_full_hierarchy:
                get_full_hierarchy = root[1]
            yield from cls._get_childs(
                root_id,
                children_by_parent,
                instances=instances,
                get_full_hierarchy=get_full_hierarchy,
            )
    def save(self, *args, **kwargs):
        """
        Save the type.

        Before saving: a missing label is generated from the textual ID, a
        missing textual ID is generated from the label, and the item keys
        matching the previous label / textual ID are deleted when these
        values change. After saving the keys are regenerated.

        :return: result of the parent `save`
        """
        ItemKey = apps.get_model("ishtar_common", "ItemKey")
        if not self.id and not self.label:
            txt_idx = self.txt_idx
            if isinstance(txt_idx, list):
                txt_idx = txt_idx[0]
                self.txt_idx = txt_idx
            # "my-textual_id" -> "My Textual Id"
            self.label = self.txt_idx.replace("-", " ").replace("_", " ").title()
        if not self.txt_idx:
            self.txt_idx = slugify(self.label)[:100]
        # clean old keys
        if self.pk:
            # no stored version when a new item is saved with an explicit pk
            old = self.__class__.objects.filter(pk=self.pk).first()
            if old is not None:
                content_type = ContentType.objects.get_for_model(self.__class__)
                if slugify(self.label) != slugify(old.label):
                    ItemKey.objects.filter(
                        object_id=self.pk,
                        key=slugify(old.label),
                        content_type=content_type,
                    ).delete()
                if self.txt_idx != old.txt_idx:
                    ItemKey.objects.filter(
                        object_id=self.pk, key=old.txt_idx, content_type=content_type
                    ).delete()
        obj = super(GeneralType, self).save(*args, **kwargs)
        self.generate_key(force=True)
        return obj
    def add_key(self, key, force=False, importer=None, group=None, user=None):
        """
        Associate a key to this item.

        :param key: key to associate
        :param force: if True, matching keys associated to other items are
        deleted first
        :param importer: importer the key is restricted to (used when no
        group and no user are given)
        :param group: group the key is restricted to (takes precedence over
        user and importer)
        :param user: user the key is restricted to
        """
        ItemKey = apps.get_model("ishtar_common", "ItemKey")
        content_type = ContentType.objects.get_for_model(self.__class__)
        # without importer and without force an existing key is never touched
        if (
            not importer
            and not force
            and ItemKey.objects.filter(key=key, content_type=content_type).exists()
        ):
            return
        filtr = {"key": key, "content_type": content_type}
        if group:
            filtr["group"] = group
        elif user:
            filtr["user"] = user
        else:
            filtr["importer"] = importer
        if force:
            ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
        filtr["object_id"] = self.pk
        ItemKey.objects.get_or_create(**filtr)
    def generate_key(self, force=False):
        """
        Add the slugified label and the textual ID as keys of the item.

        :param force: NOTE(review): accepted but not forwarded to `add_key` -
        confirm this is intended
        """
        for key in (slugify(self.label), self.txt_idx):
            self.add_key(key)
    def get_keys(self, importer):
        """
        List the keys of this item usable by an importer.

        :param importer: importer - its `user` and `associated_group` are
        used to select the restricted keys
        :return: list of keys, the textual ID first
        """
        ItemKey = apps.get_model("ishtar_common", "ItemKey")
        content_type = ContentType.objects.get_for_model(self.__class__)
        # unrestricted keys and keys only restricted to the importer
        scope = Q(importer__isnull=True, user__isnull=True, group__isnull=True) | Q(
            user__isnull=True, group__isnull=True, importer=importer
        )
        if importer.user:
            scope |= Q(user=importer.user, group__isnull=True, importer=importer)
        if importer.associated_group:
            scope |= Q(
                user__isnull=True, group=importer.associated_group, importer=importer
            )
        own_keys = Q(content_type=content_type, object_id=self.pk)
        item_keys = ItemKey.objects.filter(own_keys & scope).exclude(key=self.txt_idx)
        return [self.txt_idx] + [item_key.key for item_key in item_keys.all()]
    @classmethod
    def generate_keys(cls):
        """Call `generate_key` on every item of the class."""
        # content_type = ContentType.objects.get_for_model(cls)
        for item in cls.objects.all():
            item.generate_key()
class HierarchicalType(GeneralType):
    """
    Abstract class for types organized in a tree (optional parent).
    """

    parent = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        verbose_name=_("Parent"),
    )

    class Meta:
        abstract = True

    def full_label(self):
        """
        Return the labels from the top ancestor to this item joined
        with " > ".
        """
        lbls = [self.label]
        visited = [self]
        item = self
        while item.parent:
            item = item.parent
            if item in visited:  # prevent circular
                break
            visited.append(item)
            lbls.append(item.label)
        return " > ".join(reversed(lbls))

    @property
    def first_parent(self):
        """
        Top ancestor of the item - None if the item has no parent.

        On a circular hierarchy the first ancestor met twice is returned.
        """
        parent = self.parent
        parents = []
        while parent:
            if parent in parents:  # prevent circular
                return parent
            parents.append(parent)
            if not parent.parent:
                return parent
            parent = parent.parent
class StatisticItem:
    """
    Declaration of the statistic settings of an item: class attributes
    meant to be overloaded.
    """
    # keys of the available modalities
    STATISTIC_MODALITIES = []  # example: "year", "operation_type__label"
    # modality key -> label
    STATISTIC_MODALITIES_OPTIONS = OrderedDict()  # example:
    # OrderedDict([('year', _("Year")),
    #              ("operation_type__label",  _("Operation type"))])
    # variable key -> (label, multiplier)
    STATISTIC_SUM_VARIABLE = OrderedDict(
        (("pk", (_("Number"), 1)),)
    )  # example: "Price", "Volume" - the number is a multiplier
class TemplateItem:
    """
    Helpers to get the document templates associated to a model.
    """

    @classmethod
    def _label_templates_q(cls):
        """
        Query the available label templates associated to the class.

        The class is matched by its dotted name and by the alternative name
        where the "models_finds" / "models_treatments" module is replaced
        by "models".

        :return: DocumentTemplate queryset
        """
        model_name = "{}.{}".format(cls.__module__, cls.__name__)
        q = Q(associated_model__klass=model_name, for_labels=True, available=True)
        alt_model_name = model_name.replace("models_finds", "models").replace(
            "models_treatments", "models"
        )
        if alt_model_name != model_name:
            q |= Q(
                associated_model__klass=alt_model_name,
                for_labels=True,
                available=True,
            )
        DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
        return DocumentTemplate.objects.filter(q)

    @classmethod
    def has_label_templates(cls):
        """Return the number of label templates (0 if none)."""
        return cls._label_templates_q().count()

    @classmethod
    def label_templates(cls):
        """Return the queryset of label templates."""
        return cls._label_templates_q()

    def get_extra_templates(self, request):
        """
        List the available document templates (labels excluded) associated
        to the model of this item.

        :param request: unused
        :return: list of (template name, generation URL) tuples
        """
        cls = self.__class__
        name = str(cls.__name__)
        module = str(cls.__module__)
        # the module name variants are computed on the module: only the
        # module part of the dotted name can contain "models_finds" or
        # "models_treatments" (same rule as `_label_templates_q`)
        modules = [module]
        if "archaeological_finds" in module:
            if "models_finds" in module or "models_treatments" in module:
                modules.append(
                    module.replace("models_finds", "models").replace(
                        "models_treatments", "models"
                    )
                )
            else:
                modules += [
                    module.replace("models", "models_finds"),
                    module.replace("models", "models_treatments"),
                ]
        model_names = ["{}.{}".format(mod, name) for mod in modules]
        DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
        q = DocumentTemplate.objects.filter(
            associated_model__klass__in=model_names, for_labels=False, available=True
        )
        urlname = "generate-document"
        return [
            (template.name, reverse(urlname, args=[template.slug, self.pk]))
            for template in q.all()
        ]
class SheetFilter(models.Model):
    """
    Abstract filter on a key of a sheet template.
    """

    key = models.CharField(_("Key"), max_length=200)

    class Meta:
        abstract = True

    def get_template(self):
        """
        Return the path of the main template.

        :raises NotImplementedError: always - must be overloaded
        """
        raise NotImplementedError()

    def get_keys(self):
        """
        List the attribute names used as "item.<name>" in the main template
        and in the templates it directly includes.

        :return: sorted list of unique attribute names
        """
        attrs = re.compile(r"item\.([_a-zA-Z]+)")
        includes = re.compile(r"""\{\% *include *["'](/?(?:[^/]+/?)+)["'] *\%\}""")
        main_template = self.get_template()
        templates = [main_template]
        with open(main_template, "r") as fle:
            content = fle.read()
        keys = attrs.findall(content)
        for line in content.split("\n"):
            for tpl_name in includes.findall(line):
                if tpl_name in templates:
                    # already parsed
                    continue
                templates.append(tpl_name)
                tpl = loader.get_template(tpl_name)
                with open(tpl.template.origin.name, "r") as fle:
                    sub_content = fle.read()
                keys += attrs.findall(sub_content)
        return sorted(set(keys))
class FullSearch(models.Model):
    """
    Abstract model with a search vector field filled from the search
    vector configurations declared as class attributes.
    """
    search_vector = SearchVectorField(
        _("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")
    )
    EXTRA_REQUEST_KEYS = {}
    # values must provide a `get_alt_names` method (see `get_alt_names`)
    DYNAMIC_REQUESTS = {}
    # values must provide a `search_key` attribute
    # (see `get_query_parameters`)
    ALT_NAMES = {}
    BOOL_FIELDS = []
    REVERSED_BOOL_FIELDS = []
    CALLABLE_BOOL_FIELDS = []
    # search vector configurations read by `update_search_vector`
    BASE_SEARCH_VECTORS = []
    PROPERTY_SEARCH_VECTORS = []
    INT_SEARCH_VECTORS = []
    M2M_SEARCH_VECTORS = []
    # names of attributes whose search vector is copied
    PARENT_SEARCH_VECTORS = []
    # prevent circular dependency
    PARENT_ONLY_SEARCH_VECTORS = []
    class Meta:
        abstract = True
    @classmethod
    def general_types(cls):
        """
        Generator over the names of the fields related to a type model
        (subclass of `GeneralType` or `HierarchicalType`).
        """
        for field_name in get_all_field_names(cls):
            remote = getattr(cls._meta.get_field(field_name), "remote_field", None)
            if not remote:
                # not a relation
                continue
            if issubclass(remote.model, (GeneralType, HierarchicalType)):
                yield field_name
    @classmethod
    def get_alt_names(cls):
        """
        Merge `ALT_NAMES` with the alternative names of each dynamic request.

        :return: new dict - `ALT_NAMES` is not modified
        """
        merged = cls.ALT_NAMES.copy()
        for dynamic_request in cls.DYNAMIC_REQUESTS.values():
            merged.update(dynamic_request.get_alt_names())
        return merged
    @classmethod
    def get_query_parameters(cls):
        """
        Map the search key of each alternative name, translated in every
        language of `settings.LANGUAGES`, to the alternative name.

        The language active when the method is called is restored afterwards.

        :return: dict with the translated search key as key
        """
        # local import: `override` restores the previously active language
        # on exit, even if an exception is raised
        from django.utils.translation import override

        query_parameters = {}
        for v in cls.get_alt_names().values():
            for language_code, __ in settings.LANGUAGES:
                with override(language_code):
                    query_parameters[str(v.search_key)] = v
        return query_parameters
    @classmethod
    def _update_raw_search_field(cls, value):
        """
        Convert a raw value to a list of "'term':1" search vector items.

        The value is first split on quotes (each part is added lowercased),
        then split on spaces, hyphens and slashes: terms of at least two
        characters are added lowercased, without quotes.

        :param value: string (a falsy value is handled as "")
        :return: list of strings
        """
        value = value or ""
        result = [f"'{part.lower()}':1" for part in value.split("'")]
        # split ID terms
        for term in re.split(r"[ \-/]", value):
            if len(term) < 2:
                continue
            cleaned = term.replace("'", "").lower()
            result.append(f"'{cleaned}':1")
        return result
    def _update_search_field(self, search_vector_conf, search_vectors, data):
        """
        Append to `search_vectors` the vectors of each value formatted from
        `data`.

        :param search_vector_conf: configuration providing `format(data)`
        and a `language` ("raw" or a text search configuration name)
        :param search_vectors: list updated in place
        :param data: value to index
        """
        for formatted in search_vector_conf.format(data):
            if search_vector_conf.language == "raw":
                search_vectors.extend(self._update_raw_search_field(formatted))
            else:
                # let the database compute the vector
                with connection.cursor() as cursor:
                    cursor.execute(
                        "SELECT to_tsvector(%s, %s)",
                        [search_vector_conf.language, formatted],
                    )
                    search_vectors.append(cursor.fetchone()[0])
    def _update_search_number_field(self, search_vectors, val):
        """
        Append the integer value of `val` as a "'number':1" item.

        :param search_vectors: list updated in place
        :param val: value to convert - ignored if not convertible to int
        """
        try:
            number = int(val)
        except (ValueError, TypeError):
            return
        search_vectors.append("'{}':1".format(number))
    def update_search_vector(self, save=True, exclude_parent=False):
        """
        Update the search vector
        :param save: True if you want to save the object immediately
        :return: True if modified
        """
        if not hasattr(self, "search_vector"):
            return
        if not self.pk:
            # logger.warning("Cannot update search vector before save or "
            #                "after deletion.")
            return
        if (
            not self.BASE_SEARCH_VECTORS
            and not self.M2M_SEARCH_VECTORS
            and not self.INT_SEARCH_VECTORS
            and not self.PROPERTY_SEARCH_VECTORS
            and not self.PARENT_SEARCH_VECTORS
        ):
            logger.warning("No search_vectors defined for {}".format(self.__class__))
            return
        if getattr(self, "_search_updated", None):
            if not save:
                return self.search_vector
            return
        JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
        self._search_updated = True
        old_search = ""
        if self.search_vector:
            old_search = self.search_vector[:]
        search_vectors = []
        base_q = self.__class__.objects.filter(pk=self.pk)
        # many to many have to be queried one by one otherwise only one is fetch
        for m2m_search_vector in self.M2M_SEARCH_VECTORS:
            key = m2m_search_vector.key.split("__")[0]
            rel_key = getattr(self, key)
            for item in rel_key.values("pk").all():
                query_dct = {key + "__pk": item["pk"]}
                q = copy.copy(base_q).filter(**query_dct)
                if m2m_search_vector.language == "raw":
                    q = q.values_list(m2m_search_vector.key, flat=True)
                    search_vectors += self._update_raw_search_field(q.all()[0])
                    continue
                query_dct = {key + "__pk": item["pk"]}
                q = copy.copy(base_q).filter(**query_dct)
                q = q.annotate(
                    search=SearchVector(
                        m2m_search_vector.key, config=m2m_search_vector.language
                    )
                ).values("search")
                search_vectors.append(q.all()[0]["search"])
        # int/float are not well managed by the SearchVector
        for int_search_vector in self.INT_SEARCH_VECTORS:
            q = base_q.values(int_search_vector.key)
            for val in int_search_vector.format(q.all()[0][int_search_vector.key]):
                self._update_search_number_field(search_vectors, val)
        if not exclude_parent:
            # copy parent vector fields
            for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
                parent = getattr(self, PARENT_SEARCH_VECTOR)
                if hasattr(parent, "all"):  # m2m
                    for p in parent.all():
                        search_vectors.append(p.search_vector)
                elif parent:
                    search_vectors.append(parent.search_vector)
            for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
                parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
                if hasattr(parent, "all"):  # m2m
                    for p in parent.all():
                        search_vectors.append(
                            p.update_search_vector(save=False, exclude_parent=True)
                        )
                elif parent:
                    search_vectors.append(
                        parent.update_search_vector(save=False, exclude_parent=True)
                    )
        if self.BASE_SEARCH_VECTORS:
            # query "simple" fields
            q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
            res = q.all()[0]
            for base_search_vector in self.BASE_SEARCH_VECTORS:
                data = res[base_search_vector.key]
                data = unidecode(str(data))
                self._update_search_field(base_search_vector, search_vectors, data)
        if self.PROPERTY_SEARCH_VECTORS:
            for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
                data = getattr(self, property_search_vector.key)
                if callable(data):
                    data = data()
                if not data:
                    continue
                data = str(data)
                self._update_search_field(property_search_vector, search_vectors, data)
        if hasattr(self, "data") and self.data:
            content_type = ContentType.objects.get_for_model(self)
            for json_field in JsonDataField.objects.filter(
                content_type=content_type, search_index=True
            ).all():
                data = copy.deepcopy(self.data)
                no_data = False
                for key in json_field.key.split("__"):
                    if key not in data:
                        no_data = True
                        break
                    data = data[key]
                if no_data or not data:
                    continue
                if json_field.value_type == "B":
                    if data is True:
                        data = json_field.name
                    else:
                        continue
                elif json_field.value_type in ("I", "F"):
                    self._update_search_number_field(search_vectors, data)
                    continue
                elif json_field.value_type == "D":
                    # only index year
                    self._update_search_number_field(search_vectors, data.year)
                    continue
                datas = [data]
                if json_field.value_type == "MC":
                    datas = data
                for d in datas:
                    for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
                        with connection.cursor() as cursor:
                            cursor.execute("SELECT to_tsvector(%s, %s)", [lang, d])
                            row = cursor.fetchone()
                            search_vectors.append(row[0])
        new_search_vector = merge_tsvectors(search_vectors)
        changed = old_search != new_search_vector
        self.search_vector = new_search_vector
        if save and changed:
            self.__class__.objects.filter(pk=self.pk).update(
                search_vector=new_search_vector
            )
        elif not save:
            return new_search_vector
        return changed
class Imported(models.Model):
    """
    Abstract model mixin linking an object to the imports it is associated
    with.
    """
    # many-to-many link to Import; the reverse accessor name is built for
    # each concrete model from its app label and its class name
    imports = models.ManyToManyField(
        Import, blank=True, related_name="imported_%(app_label)s_%(class)s"
    )
    class Meta:
        abstract = True
class JsonData(models.Model, CachedGen):
    """
    Abstract model mixin adding a free-form JSON "data" field to a model.
    """
    # free-form JSON content - an empty dict by default
    data = JSONField(default=dict, blank=True)
    class Meta:
        abstract = True
    def pre_save(self):
        """
        Normalize the "data" attribute: any empty value is replaced by an
        empty dict.
        """
        # None, "" or any other falsy value is reset to {}
        if not self.data:
            self.data = {}
    @property
    def json_sections(self):
        """
        Displayable values of the JSON "data" field grouped by section.

        Only JsonDataField definitions attached to the content type of the
        current model and flagged "display" are used. Fields with no section
        come first, then the others ordered by section order and field order.
        Nested values are reached with "__" separated keys. Fields with no
        value are skipped and list values are joined with " ; ".

        :return: list of (section name or None, [(field name, value), ...])
        """
        sections = []
        try:
            content_type = ContentType.objects.get_for_model(self)
        except ContentType.DoesNotExist:
            return sections
        JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
        # "fields" is already a module-level import: use another local name
        json_fields = list(
            JsonDataField.objects.filter(
                content_type=content_type, display=True, section__isnull=True
            ).all()
        )  # no section fields
        json_fields += list(
            JsonDataField.objects.filter(
                content_type=content_type, display=True, section__isnull=False
            )
            .order_by("section__order", "order")
            .all()
        )
        for field in json_fields:
            value = self.data
            for key in field.key.split("__"):
                # only dicts can be traversed: anything else means "no value"
                if not isinstance(value, dict) or key not in value:
                    value = None
                    break
                value = value[key]
            if value is None:
                continue
            if isinstance(value, (list, tuple)):
                value = " ; ".join([str(v) for v in value])
            else:
                # do not hand out the stored object itself
                value = copy.copy(value)
            section_name = field.section.name if field.section else None
            if not sections or section_name != sections[-1][0]:
                # if section name is identical it is the same
                sections.append((section_name, []))
            sections[-1][1].append((field.name, value))
        return sections
    @classmethod
    def refresh_cache(cls):
        """
        Recompute every cached list of dynamic choices of this class.

        Nothing is done when the cache has been refreshed less than one
        second ago or when no cache key is registered.
        """
        _refresh_ckey, last_refresh = get_cache(cls, ["cache_refreshed"])
        if last_refresh and time.time() - last_refresh < 1:
            return
        _keys_ckey, registered_keys = get_cache(cls, ["_current_keys"])
        if not registered_keys:
            return
        for registered in registered_keys:
            if not isinstance(registered, (list, tuple)):
                continue
            if registered[0] != "__get_dynamic_choices":
                continue
            # force the computation to overwrite the cached choices
            cls._get_dynamic_choices(registered[1], force=True)
    @classmethod
    def _get_dynamic_choices(cls, key, force=False):
        """
        Get choices from existing values.

        :param key: data key prefixed by "data__" - nested keys are separated
        by "__"
        :param force: if set to True do not use cache
        :return: list of choices (id, value) - an empty choice comes first
        when no stored value is a list
        """
        cache_key, cached = get_cache(cls, ["__get_dynamic_choices", key])
        if not force and cached:
            return cached
        choices = set()
        splitted_key = key[len("data__") :].split("__")
        # "has_key" only tests top-level keys: filter on the first component
        # and follow the nested ones in Python
        q = cls.objects.filter(data__has_key=splitted_key[0]).values_list(
            "data", flat=True
        )
        multi = False
        for value in q.all():
            found = True
            for k in splitted_key:
                if not isinstance(value, dict) or k not in value:
                    found = False
                    break
                value = value[k]
            if not found:
                # the nested path is not available for this item
                continue
            if isinstance(value, list):
                multi = True
                choices.update(v for v in value if v)
            else:
                choices.add(value)
        # an empty choice is only relevant for single valued fields
        c = [] if multi else [("", "")]
        c += [(v, v) for v in sorted(choices)]
        cache.set(cache_key, c, settings.CACHE_SMALLTIMEOUT)
        return c
class FixAssociated:
    """
    Mixin checking that related items hold expected values.

    ASSOCIATED maps an attribute name of the current object to a dict
    {(sub-attribute name, type class): expected value or list of values}.
    """
    ASSOCIATED = {}
    def fix_associated(self):
        """
        Set the first expected value on each related item having none of the
        expected values.

        When the type class has a "txt_idx" attribute, expected values are
        looked up by "txt_idx"; the whole process is stopped if one of them
        does not exist.
        """
        for attr_name, constraints in self.ASSOCIATED.items():
            related = getattr(self, attr_name)
            if not related:
                continue
            for (sub_attr, type_cls), expected in constraints.items():
                if not isinstance(expected, (list, tuple)):
                    expected = [expected]
                if hasattr(type_cls, "txt_idx"):
                    try:
                        expected = [
                            type_cls.objects.get(txt_idx=idx) for idx in expected
                        ]
                    except type_cls.DoesNotExist:
                        # type not yet initialized
                        return
                current = getattr(related, sub_attr)
                # an "all" attribute identifies a multi-valued relation
                many = hasattr(current, "all")
                current = current.all() if many else [current]
                if any(val in expected for val in current):
                    continue
                # the first value is used
                default_value = expected[0]
                if many:
                    getattr(related, sub_attr).add(default_value)
                else:
                    setattr(related, sub_attr, default_value)
class CascasdeUpdate:
    """
    Mixin propagating an update to related items.

    DOWN_MODEL_UPDATE lists the attribute names of the relations to update.
    """
    DOWN_MODEL_UPDATE = []
    def cascade_update(self):
        """
        Update the items of each relation listed in DOWN_MODEL_UPDATE.

        When background tasks are disabled and the related model has a
        "need_update" attribute, related items are only flagged. Otherwise
        the cached label of each item (and its geo data when the item has a
        "point_2d" attribute) is regenerated.
        """
        for relation_name in self.DOWN_MODEL_UPDATE:
            if not settings.USE_BACKGROUND_TASK:
                related = getattr(self, relation_name)
                if hasattr(related.model, "need_update"):
                    related.update(need_update=True)
                    continue
            for related_item in getattr(self, relation_name).all():
                cached_label_changed(related_item.__class__, instance=related_item)
                if not hasattr(related_item, "point_2d"):
                    continue
                post_save_geo(related_item.__class__, instance=related_item)
class SearchAltName(object):
    """
    Alternative name of a search criterion.

    Holds the search key, the associated query key, an optional extra query
    (an empty dict when not provided) and a "distinct" flag.
    """
    def __init__(
        self, search_key, search_query, extra_query=None, distinct_query=False
    ):
        if not extra_query:
            # a new dict for each instance
            extra_query = {}
        self.search_key, self.search_query = search_key, search_query
        self.extra_query = extra_query
        self.distinct_query = distinct_query
class HistoryError(Exception):
    """
    Error raised by history operations; the message is kept in "value".
    """
    def __init__(self, value):
        self.value = value
    def __str__(self):
        # the representation (with quotes for strings) is displayed
        return "{!r}".format(self.value)
class HistoricalRecords(BaseHistoricalRecords):
    def get_extra_fields(self, model, fields):
        """
        Add to the extra fields of the historical model an empty "documents"
        attribute and accessors reading values on the current (non
        historical) item.

        Accessors are added for each name listed in SERIALIZE_PROPERTIES,
        SERIALIZE_CALL and HISTORICAL_M2M of the model which is not a field
        of the model. Each accessor returns None when the current item no
        longer exists.
        """
        def current_item(historical):
            # current item matching the historical record - None if missing
            q = model.objects.filter(pk=getattr(historical, model._meta.pk.attname))
            if q.count():
                return q.all()[0]
        def get_history_m2m(attr):
            def _get_history_m2m(self):
                item = current_item(self)
                if item is not None and attr in item.history_m2m:
                    return item.history_m2m[attr]
            return _get_history_m2m
        def get_serialize_call(attr):
            def _get_serialize_call(self):
                item = current_item(self)
                if item is not None:
                    return getattr(item, attr)()
            return _get_serialize_call
        def get_serialize_properties(attr):
            def _get_serialize_properties(self):
                item = current_item(self)
                if item is not None:
                    return getattr(item, attr)
            return _get_serialize_properties
        extra_fields = super().get_extra_fields(model, fields)
        # initialize default empty fields
        extra_fields["documents"] = ""
        field_names = [f.name for f in model._meta.fields]
        # same order as before: a later option overloads a former one
        accessors = (
            ("SERIALIZE_PROPERTIES", get_serialize_properties),
            ("SERIALIZE_CALL", get_serialize_call),
            ("HISTORICAL_M2M", get_history_m2m),
        )
        for option, accessor_factory in accessors:
            for k in getattr(model, option, []):
                if k not in field_names:
                    extra_fields[k] = accessor_factory(k)
        return extra_fields
    def _save_historic(
        self,
        manager,
        instance,
        history_date,
        history_type,
        history_user,
        history_change_reason,
        using,
        attrs,
    ):
        """
        Create and save a historical record.

        The record is built with the model of the history manager from the
        history metadata and the field values ("attrs"). The
        pre_create_historical_record and post_create_historical_record
        signals are sent before and after the save.
        """
        history_model = manager.model
        history_instance = history_model(
            history_date=history_date,
            history_type=history_type,
            history_user=history_user,
            history_change_reason=history_change_reason,
            **attrs,
        )
        # both signals are sent with the same arguments
        signal_kwargs = {
            "sender": history_model,
            "instance": instance,
            "history_date": history_date,
            "history_user": history_user,
            "history_change_reason": history_change_reason,
            "history_instance": history_instance,
            "using": using,
        }
        pre_create_historical_record.send(**signal_kwargs)
        history_instance.save(using=using)
        post_create_historical_record.send(**signal_kwargs)
    def create_historical_record(self, instance, history_type, using=None):
        """
        Record a new historical version of the instance when relevant.

        Nothing is recorded when:
        - the instance has no history modifier;
        - the same modifier has a record less than 5 seconds old (unless
          "_force_history" is set on the instance);
        - no field value differs from the last record of this modifier.

        :param instance: item to historize
        :param history_type: type of history record passed to the
        historical model
        :param using: database alias
        """
        try:
            history_modifier = getattr(instance, "history_modifier", None)
        except User.DoesNotExist:
            # on batch removing of users, user could have disappeared
            return
        # explicit test: an "assert" would be removed with "python -O"
        if not history_modifier:
            return
        history_date = getattr(instance, "_history_date", datetime.datetime.now())
        history_change_reason = getattr(instance, "changeReason", None)
        force = getattr(instance, "_force_history", False)
        manager = getattr(instance, self.manager_name)
        attrs = {
            field.attname: getattr(instance, field.attname)
            for field in instance._meta.fields
        }
        q_history = instance.history.filter(
            history_modifier_id=history_modifier.pk
        ).order_by("-history_date", "-history_id")
        if not q_history.count():
            # first version for this modifier: always recorded
            if force:
                delattr(instance, "_force_history")
            self._save_historic(
                manager,
                instance,
                history_date,
                history_type,
                history_modifier,
                history_change_reason,
                using,
                attrs,
            )
            return
        old_instance = q_history.all()[0]
        # multiple saving by the same user in a very short time are generaly
        # caused by post_save signals it is not relevant to keep them
        min_history_date = datetime.datetime.now() - datetime.timedelta(seconds=5)
        q = q_history.filter(
            history_date__isnull=False, history_date__gt=min_history_date
        ).order_by("-history_date", "-history_id")
        if not force and q.count():
            return
        if force:
            # the flag is only valid for one record
            delattr(instance, "_force_history")
        # record a new version only if data have been changed
        changed = any(
            getattr(old_instance, field.attname) != attrs[field.attname]
            for field in instance._meta.fields
        )
        if changed:
            self._save_historic(
                manager,
                instance,
                history_date,
                history_type,
                history_modifier,
                history_change_reason,
                using,
                attrs,
            )
class BaseHistorizedItem(
    StatisticItem,
    TemplateItem,
    FullSearch,
    Imported,
    JsonData,
    FixAssociated,
    CascasdeUpdate,
):
    """
    Historized item with external ID management.
    All historized items are searchable and have a data json field.
    Historized items can be "locked" for edition.
    """
    IS_BASKET = False
    # name of the URL used by get_show_url - when empty the name is built
    # from the lower-cased class name
    SHOW_URL = None
    # key given to the external ID generator - no external ID is generated
    # when empty
    EXTERNAL_ID_KEY = ""
    # relations whose items have their external ID updated on save
    EXTERNAL_ID_DEPENDENCIES = []
    # many-to-many attributes kept in the history
    HISTORICAL_M2M = []
    # last user having edited the item
    history_modifier = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Last editor"),
        blank=True,
        null=True,
    )
    # user having created the item - set on first save from history_modifier
    history_creator = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Creator"),
        blank=True,
        null=True,
    )
    last_modified = models.DateTimeField(blank=True, default=datetime.datetime.now)
    # values of historized many-to-many attributes
    history_m2m = JSONField(default=dict, blank=True)
    need_update = models.BooleanField(verbose_name=_("Need update"), default=False)
    locked = models.BooleanField(
        verbose_name=_("Item locked for edition"), default=False
    )
    # user having locked the item
    lock_user = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Locked by"),
        blank=True,
        null=True,
    )
    # search criteria common to all historized items
    DATED_FIELDS = [
        "last_modified__gte",
        "last_modified__lte",
    ]
    BOOL_FIELDS = ["locked"]
    # alternative (translatable) names of the search criteria
    ALT_NAMES = {
        "history_creator": SearchAltName(
            pgettext_lazy("key for text search", "created-by"),
            "history_creator__ishtaruser__person__cached_label__iexact",
        ),
        "history_modifier": SearchAltName(
            pgettext_lazy("key for text search", "modified-by"),
            "history_modifier__ishtaruser__person__cached_label__iexact",
        ),
        "modified_before": SearchAltName(
            pgettext_lazy("key for text search", "modified-before"),
            "last_modified__lte",
        ),
        "modified_after": SearchAltName(
            pgettext_lazy("key for text search", "modified-after"), "last_modified__gte"
        ),
        "locked": SearchAltName(
            pgettext_lazy("key for text search", "locked"), "locked"
        )
    }
    class Meta:
        abstract = True
    @classmethod
    def get_verbose_name(cls):
        """
        Return the verbose name set in the Meta options of the model.
        """
        return cls._meta.verbose_name
    def is_locked(self, user=None):
        """
        Check if the item is locked.

        :param user: if provided, the item is not considered as locked for
        the user owning the lock
        :return: True if the item is locked (for this user)
        """
        if not user:
            return self.locked
        if not self.locked:
            return self.locked
        locker = self.lock_user
        # a lock with no owner applies to everybody
        return not locker or locker != user
    def merge(self, item, keep_old=False):
        """
        Merge another item with the current one - delegated to
        merge_model_objects.

        :param item: item to merge with the current one
        :param keep_old: passed as is to merge_model_objects
        """
        merge_model_objects(self, item, keep_old=keep_old)
    def public_representation(self):
        """
        Public representation of the item: an empty dict by default.
        """
        return {}
    def duplicate(self, user=None, data=None):
        """
        Duplicate the current item - delegated to duplicate_item.

        :param user: passed as is to duplicate_item
        :param data: passed as is to duplicate_item
        :return: the value returned by duplicate_item
        """
        return duplicate_item(self, user, data)
    def update_external_id(self, save=False):
        """
        Regenerate the external ID of the item.

        Nothing is done when the class has no EXTERNAL_ID_KEY, when an
        external ID is set and is not flagged as automatic or when the
        generated ID is unchanged.

        :param save: if True the item is saved (with no history record)
        :return: the new external ID if changed, None otherwise
        """
        if not self.EXTERNAL_ID_KEY:
            return
        manually_set = self.external_id and not getattr(
            self, "auto_external_id", False
        )
        if manually_set:
            return
        new_external_id = get_generated_id(self.EXTERNAL_ID_KEY, self)
        if new_external_id == self.external_id:
            return
        self.auto_external_id = True
        self.external_id = new_external_id
        # the cached label has to be checked again
        self._cached_label_checked = False
        if save:
            self.skip_history_when_saving = True
            self.save()
        return new_external_id
    def get_last_history_date(self):
        """
        Get the most recent history date of the item.

        :return: the date or None if there is no history
        """
        dates = self.history.values("history_date").order_by("-history_date")
        if dates.count():
            return dates.all()[0]["history_date"]
        return
    def get_previous(self, step=None, date=None, strict=False):
        """
        Get a "step" previous state of the item

        The history record is selected by its index in the history ("step")
        or by its exact history date ("date"). The record gets "_step",
        "_previous" and "_next" attributes (index and history dates of the
        adjacent records) and the pk of the current item.

        :param step: index of the record in the history - the last available
        record is used when the index is too high
        :param date: history date of the record
        :param strict: if True a missing related object raises an error
        instead of being set to None
        :return: the history record, None when there is no history or when
        no record matches the date
        :raises HistoryError: in strict mode when a related object is missing
        """
        assert step or date
        historized = self.history.all()
        history_len = len(historized)
        if not history_len:
            # no history at all: nothing to return
            return
        item = None
        if step:
            # silently return the last step if too far in the history - the
            # index is updated too in order to get consistent neighbours
            step = min(step, history_len - 1)
            item = historized[step]
        else:
            for step, item in enumerate(historized):
                if item.history_date == date:
                    break
            # ended with no match
            if item.history_date != date:
                return
        item._step = step
        if history_len != (step + 1):
            item._previous = historized[step + 1].history_date
        else:
            item._previous = None
        if step > 0:
            item._next = historized[step - 1].history_date
        else:
            item._next = None
        model = self.__class__
        for k in get_all_field_names(model):
            field = model._meta.get_field(k)
            # NOTE(review): fields have no "rel" attribute with recent Django
            # versions (replaced by "remote_field") - this block is probably
            # never reached with them - to be confirmed before any change
            if hasattr(field, "rel") and field.rel:
                if not hasattr(item, k + "_id"):
                    # not stored in the history: use the current value
                    setattr(item, k, getattr(self, k))
                    continue
                val = getattr(item, k + "_id")
                if not val:
                    setattr(item, k, None)
                    continue
                try:
                    val = field.remote_field.model.objects.get(pk=val)
                    setattr(item, k, val)
                except ObjectDoesNotExist:
                    if strict:
                        # "%s": the pk is not always an integer
                        raise HistoryError(
                            "The class %s has no pk %s"
                            % (str(field.remote_field.model), val)
                        )
                    setattr(item, k, None)
        item.pk = self.pk
        return item
    @property
    def last_edition_date(self):
        """
        Date of the most recent history record - None if not available.
        """
        try:
            latest = self.history.order_by("-history_date").all()[0]
            return latest.history_date
        except (AttributeError, IndexError):
            return None
    @property
    def history_creation_date(self):
        """
        Date of the oldest history record - None if not available.
        """
        try:
            oldest = self.history.order_by("history_date").all()[0]
            return oldest.history_date
        except (AttributeError, IndexError):
            return None
    def rollback(self, date):
        """
        Rollback to a previous state

        Field values and historized many-to-many values of the history
        record matching "date" are set on the current item, then the item is
        saved. History records iterated before the matching one (presumably
        the more recent ones - depends on the history ordering) are deleted.

        :param date: history date of the state to restore
        :raises HistoryError: if no record matches the date or if an object
        is missing during the rollback
        """
        to_del, new_item = [], None
        for item in self.history.all():
            if item.history_date == date:
                new_item = item
                break
            to_del.append(item)
        if not new_item:
            raise HistoryError("The date to rollback to doesn't exist.")
        try:
            field_keys = [f.name for f in self._meta.fields]
            for k in field_keys:
                if k != "id" and hasattr(self, k):
                    if not hasattr(new_item, k):
                        # only the raw id is available on the history record
                        k = k + "_id"
                    setattr(self, k, getattr(new_item, k))
            try:
                self.history_modifier = User.objects.get(
                    pk=new_item.history_modifier_id
                )
            except User.DoesNotExist:
                # the editor of this version no longer exists
                pass
            self.save()
            saved_m2m = new_item.history_m2m.copy()
            for hist_key in self.HISTORICAL_M2M:
                # after each association m2m is rewrite - force the original
                # to be reset
                new_item.history_m2m = saved_m2m
                values = new_item.m2m_listing(hist_key, create=True) or []
                hist_field = getattr(self, hist_key)
                hist_field.clear()
                for val in values:
                    hist_field.add(val)
            # force label regeneration
            self._cached_label_checked = False
            self.save()
        except ObjectDoesNotExist as err:
            raise HistoryError("The rollback has failed.") from err
        # clean the obsolete history
        for historized_item in to_del:
            historized_item.delete()
    def m2m_listing(self, key):
        """
        List all items of a multi-valued relation.

        :param key: attribute name of the relation
        :return: result of "all()" called on this attribute
        """
        return getattr(self, key).all()
    def values(self):
        """
        Get the values of all the fields of the item except "id".

        :return: dict with field names as keys
        """
        return {
            field.name: getattr(self, field.name)
            for field in self._meta.fields
            if field.name != "id"
        }
    def get_absolute_url(self):
        """
        URL of the "display-item" view for this item - None if the URL
        cannot be reversed.
        """
        try:
            url = reverse("display-item", args=[self.SLUG, self.pk])
        except NoReverseMatch:
            url = None
        return url
    def get_show_url(self):
        """
        URL of the "show" view of the item - None if the URL cannot be
        reversed.

        The URL name is SHOW_URL or, when not set, "show-" followed by the
        lower-cased class name.
        """
        url_name = self.SHOW_URL or "show-" + self.__class__.__name__.lower()
        try:
            url = reverse(url_name, args=[self.pk, ""])
        except NoReverseMatch:
            url = None
        return url
    @property
    def associated_filename(self):
        """
        File name associated to the item.

        Built from slugified department, town label, short class name,
        reference and name followed by the last edition date (YYYYMMDD or
        "00000000"), joined with "-". An empty string is returned when a
        needed attribute is missing.
        """
        needed_attrs = (
            "get_town_label",
            "get_department",
            "reference",
            "short_class_name",
        )
        missing = [attr for attr in needed_attrs if not hasattr(self, attr)]
        if missing:
            return ""
        parts = [
            slugify(self.get_department()),
            slugify(self.get_town_label()).upper(),
            slugify(self.short_class_name),
            slugify(self.reference),
            slugify(self.name or "").replace("-", "_").capitalize(),
        ]
        edition_date = self.last_edition_date
        # default value when the item has no history
        parts.append(edition_date.strftime("%Y%m%d") if edition_date else "00000000")
        return "-".join(str(part) for part in parts)
    def save(self, *args, **kwargs):
        """
        Save the item and manage last modification date, creator, external
        IDs and associated items.

        :param external_id_updated: (keyword argument, not passed to the
        parent save) True if the external ID has already been updated
        :return: True - on creation, the value of the second save when the
        external ID has been generated
        """
        created = not self.pk
        keep_last_modified = getattr(self, "_no_last_modified_update", False)
        if not keep_last_modified or not self.last_modified:
            self.last_modified = datetime.datetime.now()
        if not getattr(self, "skip_history_when_saving", False):
            assert hasattr(self, "history_modifier")
            if created:
                self.history_creator = self.history_modifier
        # external ID can have related item not available before save
        external_id_updated = kwargs.pop("external_id_updated", False)
        if not (created or external_id_updated):
            self.update_external_id()
        super(BaseHistorizedItem, self).save(*args, **kwargs)
        if created and self.update_external_id():
            # force resave for external ID creation
            self.skip_history_when_saving = True
            self._updated_id = True
            return self.save(external_id_updated=True)
        for dependency in self.EXTERNAL_ID_DEPENDENCIES:
            for dependent in getattr(self, dependency).all():
                dependent.update_external_id(save=True)
        self.fix_associated()
        return True
class LightHistorizedItem(BaseHistorizedItem):
    """
    Historized item with its own "history_date" field.
    """
    history_date = models.DateTimeField(default=datetime.datetime.now)
    class Meta:
        abstract = True
    def save(self, *args, **kwargs):
        """
        Save the item.

        :return: the item itself
        """
        super().save(*args, **kwargs)
        return self
class OwnPerms(object):
    """
    Manage special permissions for object's owner.

    Classes using this mixin implement get_query_owns in order to define
    which items are owned by a user.
    """
    @classmethod
    def get_query_owns(cls, ishtaruser):
        """
        Query object to get own items

        :param ishtaruser: user owning the items
        :return: None by default - a None (or empty) query means that nothing
        is owned
        """
        return None  # implement for each object
    def can_view(self, request):
        if hasattr(self, "LONG_SLUG"):
            perm = "view_" + self.LONG_SLUG
        else:
            perm = "view_" + self.SLUG
        return self.can_do(request, perm)
    def can_edit(self, request):
        if not getattr(request.user, "ishtaruser", None):
            return False
        ishtaruser = request.user.ishtaruser
        slug = self.LONG_SLUG if hasattr(self, "LONG_SLUG") else self.SLUG
        if ishtaruser.has_perm("change_" + slug, session=request.session):
            return True
        if not ishtaruser.has_perm("change_own_" + slug, session=request.session):
            return False
        return self.is_own(ishtaruser)
    def can_do(self, request, action_name):
        """
        Check permission availability for the current object.
        :param request: request object
        :param action_name: action name eg: "change_find" - "own" variation is
        checked
        :return: boolean
        """
        if not getattr(request.user, "ishtaruser", None):
            return False
        splited = action_name.split("_")
        action_own_name = splited[0] + "_own_" + "_".join(splited[1:])
        user = request.user
        if action_name == "view_findbasket":
            action_own_name = "view_own_find"
            action_name = "view_find"
        return user.ishtaruser.has_right(action_name, request.session) or (
            user.ishtaruser.has_right(action_own_name, request.session)
            and self.is_own(user.ishtaruser)
        )
    def is_own(self, user, alt_query_own=None):
        """
        Check if the current object is owned by the user

        :param user: an IshtarUser or an object with an "ishtaruser"
        attribute
        :param alt_query_own: name of an alternative method used (instead of
        get_query_owns) to get the "own" query
        :return: False if the owner or the query cannot be determined,
        otherwise the number of items matching the query and the current pk
        """
        IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
        if not isinstance(user, IshtarUser):
            if not hasattr(user, "ishtaruser"):
                return False
            user = user.ishtaruser
        if alt_query_own:
            own_query_getter = getattr(self, alt_query_own)
        else:
            own_query_getter = self.get_query_owns
        own_query = own_query_getter(user)
        if not own_query:
            return False
        # restrict the query to the current object
        own_query = own_query & Q(pk=self.pk)
        return self.__class__.objects.filter(own_query).count()
    @classmethod
    def has_item_of(cls, user):
        """
        Check if the user own some items

        :param user: an IshtarUser or an object with an "ishtaruser"
        attribute
        :return: False if the owner or the query cannot be determined,
        otherwise the number of owned items
        """
        IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
        if not isinstance(user, IshtarUser):
            if not hasattr(user, "ishtaruser"):
                return False
            user = user.ishtaruser
        own_query = cls.get_query_owns(user)
        if own_query:
            return cls.objects.filter(own_query).count()
        return False
    @classmethod
    def _return_get_owns(
        cls, owns, values, get_short_menu_class, label_key="cached_label"
    ):
        if not owns:
            return []
        sorted_values = []
        if hasattr(cls, "BASKET_MODEL"):
            owns_len = len(owns)
            for idx, item in enumerate(reversed(owns)):
                if get_short_menu_class:
                    item = item[0]
                if type(item) == cls.BASKET_MODEL:
                    basket = owns.pop(owns_len - idx - 1)
                    sorted_values.append(basket)
            sorted_values = list(reversed(sorted_values))
        if not values:
            if not get_short_menu_class:
                return sorted_values + list(
                    sorted(owns, key=lambda x: getattr(x, label_key) or "")
                )
            return sorted_values + list(
                sorted(owns, key=lambda x: getattr(x[0], label_key) or "")
            )
        if not get_short_menu_class:
            return sorted_values + list(sorted(owns, key=lambda x: x[label_key] or ""))
        return sorted_values + list(sorted(owns, key=lambda x: x[0][label_key] or ""))
    @classmethod
    def get_owns(
        cls,
        user,
        replace_query=None,
        limit=None,
        values=None,
        get_short_menu_class=False,
        menu_filtr=None,
    ):
        """
        Get Own items

        :param user: a User or an IshtarUser
        :param replace_query: query used when get_query_owns returns no query
        :param limit: if set, only the "limit" items with the highest pk are
        returned, otherwise the default ordering of the model is used
        :param values: if set, items are returned as dicts limited to these
        keys
        :param get_short_menu_class: if True each item is returned as a tuple
        (item, short menu class)
        :param menu_filtr: not used by this implementation
        :return: list of items (baskets first when the class has a
        BASKET_MODEL) - when nothing can be owned: an empty queryset or an
        empty list if "values" is set
        """
        if not replace_query:
            replace_query = {}
        # unauthenticated users own nothing
        if hasattr(user, "is_authenticated") and not user.is_authenticated:
            returned = cls.objects.filter(pk__isnull=True)
            if values:
                returned = []
            return returned
        IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
        # get the IshtarUser associated to the user
        if isinstance(user, User):
            try:
                ishtaruser = IshtarUser.objects.get(user_ptr=user)
            except IshtarUser.DoesNotExist:
                returned = cls.objects.filter(pk__isnull=True)
                if values:
                    returned = []
                return returned
        elif isinstance(user, IshtarUser):
            ishtaruser = user
        else:
            if values:
                return []
            return cls.objects.filter(pk__isnull=True)
        items = []
        # baskets of the user come first
        if hasattr(cls, "BASKET_MODEL"):
            items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
        query = cls.get_query_owns(ishtaruser)
        if not query and not replace_query:
            returned = cls.objects.filter(pk__isnull=True)
            if values:
                returned = []
            return returned
        if query:
            q = cls.objects.filter(query)
        else:  # replace_query
            q = cls.objects.filter(replace_query)
        if values:
            q = q.values(*values)
        if limit:
            items += list(q.order_by("-pk")[:limit])
        else:
            items += list(q.order_by(*cls._meta.ordering).all())
        if get_short_menu_class:
            if values:
                # the id is needed to get the short menu class
                if "id" not in values:
                    raise NotImplementedError(
                        "Call of get_owns with get_short_menu_class option and"
                        " no 'id' in values is not implemented"
                    )
                my_items = []
                for i in items:
                    if hasattr(cls, "BASKET_MODEL") and type(i) == cls.BASKET_MODEL:
                        # baskets are objects: convert them to dicts
                        dct = dict([(k, getattr(i, k)) for k in values])
                        my_items.append(
                            (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))
                        )
                    else:
                        my_items.append((i, cls.get_short_menu_class(i["id"])))
                items = my_items
            else:
                items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
        return items
    @classmethod
    def _get_query_owns_dicts(cls, ishtaruser):
        """
        List of query own dict to construct the query.
        Each dict is joined with an AND operator, each dict key, values are
        joined with OR operator

        :param ishtaruser: user owning the items
        :return: an empty list by default
        """
        return []
    @classmethod
    def _construct_query_own(cls, prefix, dct_list):
        q = None
        for subquery_dict in dct_list:
            subquery = None
            for k in subquery_dict:
                subsubquery = Q(**{prefix + k: subquery_dict[k]})
                if subquery:
                    subquery |= subsubquery
                else:
                    subquery = subsubquery
            if not subquery:
                continue
            if q:
                q &= subquery
            else:
                q = subquery
        return q
class NumberManager(models.Manager):
    """Manager resolving natural keys through the ``number`` field."""

    def get_by_natural_key(self, number):
        """Return the object whose ``number`` equals the given natural key."""
        found = self.get(number=number)
        return found
class State(models.Model):
    """
    Labelled item identified by a unique number.

    The number is used as natural key; a Department can reference a State.
    """
    label = models.CharField(_("Label"), max_length=30)
    # unique: used as natural key (see natural_key and NumberManager)
    number = models.CharField(_("Number"), unique=True, max_length=3)
    objects = NumberManager()

    class Meta:
        verbose_name = _("State")
        ordering = ["number"]

    def __str__(self):
        return self.label

    def natural_key(self):
        """Return the natural key: a one item tuple holding the number."""
        return (self.number,)
class Department(models.Model):
    """
    Labelled item identified by a unique number (its natural key) and
    optionally attached to a State.
    """
    label = models.CharField(_("Label"), max_length=30)
    number = models.CharField(_("Number"), unique=True, max_length=3)
    state = models.ForeignKey(
        "State",
        verbose_name=_("State"),
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
    )
    objects = NumberManager()

    class Meta:
        verbose_name = _("Department")
        verbose_name_plural = _("Departments")
        ordering = ["number"]

    def __str__(self):
        return self.label

    def natural_key(self):
        """Return the natural key: a one item tuple holding the number."""
        return (self.number,)

    def history_compress(self):
        """Return the compact value stored in history: the number."""
        return self.number

    @classmethod
    def history_decompress(cls, full_value, create=False):
        """
        Get back departments from a list of compressed values.

        :param full_value: iterable of department numbers (may be empty)
        :param create: not used by this implementation
        :return: list of matching departments, unknown numbers are ignored
        """
        departments = []
        for number in full_value or ():
            try:
                department = cls.objects.get(number=number)
            except cls.DoesNotExist:
                # unknown number: silently ignored
                continue
            departments.append(department)
        return departments
class Arrondissement(models.Model):
    """Named item attached to a department."""
    name = models.CharField("Nom", max_length=30)
    department = models.ForeignKey(
        Department, verbose_name="Département", on_delete=models.CASCADE
    )

    def __str__(self):
        """Name and department joined with ``settings.JOINT``."""
        parts = [self.name, str(self.department)]
        return settings.JOINT.join(parts)
class Canton(models.Model):
    """Named item attached to an arrondissement."""
    name = models.CharField("Nom", max_length=30)
    arrondissement = models.ForeignKey(
        Arrondissement, verbose_name="Arrondissement", on_delete=models.CASCADE
    )

    def __str__(self):
        """Name and arrondissement joined with ``settings.JOINT``."""
        parts = [self.name, str(self.arrondissement)]
        return settings.JOINT.join(parts)
class SpatialReferenceSystem(GeneralType):
    """Spatial reference system described by an authority name and a SRID."""
    order = models.IntegerField(_("Order"), default=10)
    auth_name = models.CharField(_("Authority name"), default="EPSG", max_length=256)
    srid = models.IntegerField(_("Authority SRID"))

    class Meta:
        verbose_name = _("Geographic - Spatial reference system")
        verbose_name_plural = _("Geographic - Spatial reference systems")
        ordering = (
            "order",
            "label",
        )

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation: complete the parent
        documentation with the srid and auth_name fields.
        """
        base_doc = super().get_documentation_string()
        extra_doc = ", **srid** {}, **auth_name** {}".format(
            _("Authority SRID"), _("Authority name")
        )
        return base_doc + extra_doc


# post_save_cache is called on each save and each deletion
post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
class GeoOriginType(HierarchicalType):
    """
    Origin of a geographic data (referenced by GeoVectorData.origin).

    Examples: topographical surveys, georeferencing, ...
    """
    # first ordering criterion, the label being the second one
    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Origin type")
        verbose_name_plural = _("Geographic - Origin types")
        ordering = (
            "order",
            "label",
        )
class GeoDataType(HierarchicalType):
    """
    Type of a geographic data (referenced by GeoVectorData.data_type).

    Examples: outline, z-sup, ...
    """
    # first ordering criterion, the label being the second one
    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Data type")
        verbose_name_plural = _("Geographic - Data types")
        ordering = (
            "order",
            "label",
        )
class GeoProviderType(HierarchicalType):
    """
    Provider of a geographic data (referenced by GeoVectorData.provider).

    Examples: GeoNames, IGN, ...
    """
    # first ordering criterion, the label being the second one
    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Provider type")
        verbose_name_plural = _("Geographic - Provider types")
        ordering = (
            "order",
            "label",
        )
class GeoBufferType(GeneralType):
    """Type of buffer (referenced by GeoVectorData.buffer_type)."""
    # first ordering criterion, the label being the second one
    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Buffer type")
        verbose_name_plural = _("Geographic - Buffer types")
        ordering = (
            "order",
            "label",
        )
# Skeleton of a GeoJSON FeatureCollection holding a single Point feature.
# The coordinates and the properties are filled by GeoVectorData.geojson.
GEOJSON_POINT_TPL = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": []
            },
            "properties": {
            }
        }
    ]
}
# Labels of the values returned by GeoVectorData.geometry_type
GEOMETRY_TYPE_LBL = {
    "POINT": _("Point"),
    "MULTILINE": _("Line(s)"),
    "MULTIPOINTS": _("Point(s)"),
    "MULTIPOLYGON": _("Polygon(s)"),
}
# GeoJSON geometry type -> GeoVectorData attribute storing it
GEOTYPE_TO_GEOVECTOR = {
    # key: (geom attr, need list convert)
    # "need list convert": the coordinates of a single geometry are wrapped in
    # a list before being stored in the corresponding multi-geometry attribute
    "Point": ("point_2d", False),
    "LineString": ("multi_line", True),
    "Polygon": ("multi_polygon", True),
    "MultiPoint": ("multi_points", False),
    "MultiLineString": ("multi_line", False),
    "MultiPolygon": ("multi_polygon", False),
}
class GeoVectorData(Imported, OwnPerms):
    """
    Vector geographic data: user input coordinates, points, lines or polygons.

    Each geographic data is attached to a source item through a generic
    foreign key (source_content_type / source_id).
    """
    SLUG = "geovectordata"
    # reverse accessors of the items attached to this geographic data (they
    # follow the related_name pattern of GeographicItem.geodata); __str__ uses
    # them to find a label when no name is set
    RELATED_MODELS = [
        "related_items_ishtar_common_town",
        "related_items_archaeological_operations_operation",
        "related_items_archaeological_operations_archaeologicalsite",
        "related_items_archaeological_context_records_contextrecord",
        "related_items_archaeological_finds_basefind",
        "related_items_archaeological_warehouse_warehouse",
        "related_items_archaeological_warehouse_container",
    ]
    buffer = models.FloatField(
        _("Buffer"), blank=True, null=True
    )
    # NOTE(review): CASCADE here while the other type fields use PROTECT -
    # confirm this is intended
    buffer_type = models.ForeignKey(GeoBufferType, blank=True, null=True,
                                    on_delete=models.CASCADE)
    need_update = models.BooleanField(_("Need update"), default=False)
    # "-" is the default value: __str__ treats it as "no name"
    name = models.TextField(_("Name"), default="-")
    # generic foreign key to the source item
    source_content_type = models.ForeignKey(
        ContentType, related_name="content_type_geovectordata", on_delete=models.CASCADE
    )
    source_id = models.PositiveIntegerField()
    source = GenericForeignKey("source_content_type", "source_id")
    import_key = models.TextField(_("Import key"), blank=True, null=True,
                                  help_text=_("Use this for update imports"))
    origin = models.ForeignKey(
        GeoOriginType,
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        verbose_name=_("Origin"),
        help_text=_("For instance: topographical survey, georeferencing, ..."),
    )
    data_type = models.ForeignKey(
        GeoDataType,
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        verbose_name=_("Data type"),
        help_text=_("For instance: outline, z-sup, ..."),
    )
    provider = models.ForeignKey(
        GeoProviderType,
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        verbose_name=_("Provider"),
        help_text=_("Data provider"),
    )
    comment = models.TextField(_("Comment"), default="", blank=True)
    # coordinates typed by the user
    x = models.FloatField(_("X"), blank=True, null=True, help_text=_("User input"))
    y = models.FloatField(_("Y"), blank=True, null=True, help_text=_("User input"))
    z = models.FloatField(_("Z"), blank=True, null=True, help_text=_("User input"))
    # x == cached_x if user input else get it from other sources
    # cached is converted to the display SRID
    cached_x = models.FloatField(_("X (cached)"), blank=True, null=True)
    cached_y = models.FloatField(_("Y (cached)"), blank=True, null=True)
    cached_z = models.FloatField(_("Z (cached)"), blank=True, null=True)
    estimated_error_x = models.FloatField(
        _("Estimated error for X"), blank=True, null=True
    )
    estimated_error_y = models.FloatField(
        _("Estimated error for Y"), blank=True, null=True
    )
    estimated_error_z = models.FloatField(
        _("Estimated error for Z"), blank=True, null=True
    )
    spatial_reference_system = models.ForeignKey(
        SpatialReferenceSystem,
        verbose_name=_("Spatial Reference System"),
        blank=True,
        null=True,
        on_delete=models.PROTECT,
    )
    # geometry fields: geometry_type and geojson test them one after the other
    # and use the first one which is set
    point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
    point_3d = models.PointField(_("Point (3D)"), blank=True, null=True, dim=3)
    multi_points = models.MultiPointField(_("Multi points"), blank=True, null=True)
    multi_line = models.MultiLineStringField(_("Multi lines"), blank=True, null=True)
    multi_polygon = models.MultiPolygonField(_("Multi polygons"), blank=True, null=True)

    class Meta:
        verbose_name = _("Geographic - Vector data")
        verbose_name_plural = _("Geographic - Vector data")
        # an import key is unique for a given source item
        unique_together = ("source_content_type", "source_id", "import_key")
        permissions = (
            ("view_own_geovectordata", "Can view own Geographic - Vector data"),
            ("add_own_geovectordata", "Can add own Geographic - Vector data"),
            ("change_own_geovectordata", "Can change own Geographic - Vector data"),
            ("delete_own_geovectordata", "Can delete own Geographic - Vector data"),
        )
    def __str__(self):
        name = self.name
        if not name or name == "-":
            for related_model in self.RELATED_MODELS:
                q = getattr(self, related_model)
                cached_label_key = "cached_label"
                if getattr(q.model, "GEO_LABEL", None):
                    cached_label_key = q.model.GEO_LABEL
                if q.count():  # arbitrary return the first item
                    name = str(q.values_list(cached_label_key, flat=True).all()[0])
                    break
        if self.data_type:
            name += f" ({str(self.data_type).lower()})"
        return name
    def is_own(self, ishtaruser, alt_query_own=None):
        """
        Check if the source item of this geographic data is owned by a user.

        :param ishtaruser: user whose ownership is tested
        :param alt_query_own: not used by this implementation
        :return: True if the source item matches the "own" query of its model,
        False otherwise (including when the model has no "own" query)
        """
        ct = self.source_content_type
        model = apps.get_model(ct.app_label, ct.model)
        if not hasattr(model, "_get_query_owns_dicts"):
            return False
        sub_q = model.get_query_owns(ishtaruser)
        if not sub_q:
            return False
        # let the database test the membership instead of loading the id of
        # every owned item in memory
        return model.objects.filter(sub_q).filter(id=self.source_id).exists()
    # NOTE(review): presumably the field names skipped by serialization
    # helpers; it is not read by full_serialize - confirm against callers
    SERIALIZE_EXCLUDE = []
    def full_serialize(self, search_model, recursion=False):
        """
        Serialize the main attributes to a dict of strings.

        :param search_model: not used by this implementation
        :param recursion: not used by this implementation
        :return: dict with the string version of each attribute (empty string
        for falsy values) and the GeoJSON under the "geojson" key
        """
        field_names = (
            "id",
            "buffer",
            "buffer_type",
            "name",
            "origin",
            "data_type",
            "provider",
            "comment",
            "spatial_reference_system",
            "source",
            "estimated_error_x",
            "estimated_error_y",
            "estimated_error_z",
        )
        serialized = {}
        for field_name in field_names:
            value = getattr(self, field_name)
            if value:
                serialized[field_name] = str(value)
            else:
                serialized[field_name] = ""
        serialized["geojson"] = self.geojson
        return serialized
    @classmethod
    def get_query_owns(cls, ishtaruser):
        """
        Build the query matching the geographic data owned by a user.

        A geographic data is owned when its source item is owned: for each
        source model, the ids of the owned items are fetched and matched
        against the generic foreign key.

        :param ishtaruser: user whose owned geographic data are searched
        :return: Q object, or None if no source model provides a criterion
        """
        source_models = (
            ("archaeological_operations", "Operation"),
            ("archaeological_operations", "ArchaeologicalSite"),
            ("archaeological_context_records", "ContextRecord"),
            ("archaeological_finds", "BaseFind"),
            ("archaeological_warehouse", "Warehouse"),
            ("archaeological_warehouse", "Container"),
        )
        q = None
        for app_label, model_name in source_models:
            model = apps.get_model(app_label, model_name)
            sub_q = cls._construct_query_own(
                "", model._get_query_owns_dicts(ishtaruser)
            )
            if not sub_q:
                # no ownership criterion for this model: no item can be owned
                continue
            # _construct_query_own returns a Q object (no values_list on it):
            # apply it on the model manager to get the owned ids
            owned_ids = model.objects.filter(sub_q).values_list("id", flat=True)
            q2 = Q(
                source_id__in=list(owned_ids),
                source_content_type__app_label=app_label,
                source_content_type__model=model_name.lower(),
            )
            if not q:
                q = q2
            else:
                q |= q2
        return q
    @property
    def source_label(self):
        """String version of the source item."""
        source = self.source
        return str(source)
    def display_coordinates(self, rounded=5, dim=2, srid=None, cache=True):
        """
        Get the coordinates in the SRID used for display.

        When no srid is given, the SRID of the spatial reference system of
        this item is used, then the display SRS of the current profile and
        finally 4326.

        :return: result of get_coordinates for the selected SRID
        """
        if not srid:
            own_srs = self.spatial_reference_system
            if own_srs and own_srs.srid:
                srid = own_srs.srid
            else:
                display_srs = get_current_profile().display_srs
                if display_srs and display_srs.srid:
                    srid = display_srs.srid
        return self.get_coordinates(
            rounded=rounded, srid=srid or 4326, dim=dim, cache=cache
        )
    def get_coordinates(self, rounded=5, srid: int = None, dim=2, cache=False):
        """
        Get the coordinates of this geographic data.

        Sources are tried in this order: cached values (only if cache is set
        and srid is 4326), user input (x, y, z), then the geometry fields (the
        point itself or the centroid of a multi-geometry).

        :param rounded: number of decimals, a falsy value disables rounding
        :param srid: target SRID; when not set the coordinates are returned in
        their own SRID without reprojection
        :param dim: number of dimensions, 2 or 3
        :param cache: use cached coordinates when srid is 4326
        :return: list of dim coordinates; without rounding missing values are
        None, with rounding they are converted to 0
        :raises ValueError: if dim is not 2 or 3
        """
        if dim not in (2, 3):
            raise ValueError(_("Only 2 or 3 dimensions"))
        if cache and srid == 4326:
            coordinates = [self.cached_x, self.cached_y]
            if dim == 3:
                coordinates.append(self.cached_z)
        elif self.x or self.y:  # user input
            if not srid or not self.spatial_reference_system or \
                    srid == self.spatial_reference_system.srid:
                coordinates = [self.x, self.y]
                if dim == 3:
                    coordinates.append(self.z)
                if not self.spatial_reference_system:
                    # no SRS set: the instance gets the 4326 one (not saved)
                    q = SpatialReferenceSystem.objects.filter(srid=4326)
                    if q.count():
                        self.spatial_reference_system = q.all()[0]
            else:
                args = {
                    "x": self.x,
                    "y": self.y,
                    "srid": self.spatial_reference_system.srid,
                }
                if dim == 3:
                    args["z"] = self.z
                point = Point(**args).transform(srid, clone=True)
                coordinates = [point.x, point.y]
                if srid == 4326:
                    coordinates = list(reversed(coordinates))
                if dim == 3:
                    coordinates.append(point.z)
        else:
            if self.point_2d and dim == 2:
                geom = self.point_2d
            elif self.point_3d and dim == 3:
                geom = self.point_3d
            elif self.multi_points:
                geom = self.multi_points.centroid
            elif self.multi_line:
                geom = self.multi_line.centroid
            elif self.multi_polygon:
                geom = self.multi_polygon.centroid
            else:
                if dim == 2:
                    return [None, None]
                return [None, None, self.z or None]
            point = geom
            # reproject only when a different SRID is requested: with no srid
            # the geometry cannot be transformed (transform(None) fails)
            if srid and srid != geom.srid:
                point = geom.transform(srid, clone=True)
            coordinates = [point.x, point.y]
            if dim == 3:
                coordinates.append(point.z)
        if not rounded:
            return coordinates
        return [round(coord or 0, rounded) for coord in coordinates]
    def get_coordinates_from_polygon(self, rounded=5, srid: int = None):
        """
        Get the converted coordinates of the centroid of the multi-polygon.

        :return: result of convert_coordinates, None if no polygon is set
        """
        polygons = self.multi_polygon
        if not polygons:
            return None
        centroid = polygons.centroid
        return self.convert_coordinates(centroid, rounded=rounded, srid=srid)
    def get_x(self, srid: int = None) -> float:
        """
        Get the X coordinate.

        :param srid: target SRID, passed to get_coordinates
        :return: first coordinate, None if no coordinate is available
        """
        # srid must be a keyword: the first positional parameter of
        # get_coordinates is "rounded"
        coord = self.get_coordinates(srid=srid)
        if coord:
            return coord[0]
        return None
    def get_y(self, srid: int = None) -> float:
        """
        Get the Y coordinate.

        :param srid: target SRID, passed to get_coordinates
        :return: second coordinate, None if no coordinate is available
        """
        # srid must be a keyword: the first positional parameter of
        # get_coordinates is "rounded"
        coord = self.get_coordinates(srid=srid)
        if coord:
            return coord[1]
        return None
    def get_z(self, srid: int = None) -> float:
        """
        Get the Z coordinate.

        :param srid: target SRID, passed to get_coordinates
        :return: third coordinate, None if no coordinate is available
        """
        # srid must be a keyword: the first positional parameter of
        # get_coordinates is "rounded"
        coord = self.get_coordinates(srid=srid, dim=3)
        if coord:
            return coord[2]
        return None
    @property
    def display_spatial_reference_system(self):
        """
        Spatial reference system of this item, the ``srs`` of the current
        profile when none is set.
        """
        own_srs = self.spatial_reference_system
        return own_srs if own_srs else get_current_profile().srs
    @classmethod
    def _get_geo_item_list(cls, q, current_geodata, url, precision, rounded):
        collection_id = []
        items_id = []
        q = q.values("main_geodata", "id")
        for item in q.distinct().all():
            geodata_id = item["main_geodata"]
            if geodata_id not in current_geodata:
                collection_id.append(geodata_id)
                items_id.append(item["id"])
                current_geodata.append(geodata_id)
        collection = []
        for idx in range(len(collection_id)):
            geo = json.loads(GeoVectorData.objects.get(pk=collection_id[idx]).geojson)
            geo_type = geo.get("type", None)
            url_geo = url.format(items_id[idx])
            if geo_type == "FeatureCollection":
                for feat in geo["features"]:
                    if "properties" in feat:
                        feat["properties"]["url"] = url_geo
                collection += geo["features"]
            elif geo_type:
                if "properties" in geo:
                    geo["properties"]["url"] = url_geo
                collection.append(geo)
        if not precision and rounded:
            precision = 6
        r = re.compile(r"(\d+)\.(\d{6})(\d*)")
        new_collection = []
        for feat in collection:
            geom_type = feat["geometry"].get("type", None)
            if geom_type == "Point":
                if precision is not None:
                    feat["geometry"]["coordinates"] = [
                        round(feat["geometry"]["coordinates"][0], precision),
                        round(feat["geometry"]["coordinates"][1], precision),
                    ]
                if not (-90 <= feat["geometry"]["coordinates"][1] <= 90) or not (
                        -180 <= feat["geometry"]["coordinates"][0] <= 180):
                    # probably a bad projection
                    continue
            new_collection.append(feat)
        return new_collection
    def get_geo_items(self, rounded=5):
        """
        Get a GeoJSON like feature with coordinates converted for display.

        The first geometry set is used in this order: multi-polygon,
        multi-points, multi-line. Otherwise a point is built from
        display_coordinates.

        :param rounded: number of decimals of the converted coordinates
        :return: feature dict, an empty dict if no coordinates are available
        """
        dct = {"type": "Feature", "geometry": {},
               "properties": {"label": str(self)}}

        def convert(x, y, srid):
            # convert a single position to the display coordinates
            return self.convert_coordinates(Point(x, y, srid=srid), rounded)

        if self.multi_polygon:
            srid = self.multi_polygon.srid
            dct["geometry"]["type"] = "MultiPolygon"
            dct["geometry"]["coordinates"] = [
                [
                    [convert(coords[0], coords[1], srid) for coords in ring.coords]
                    for ring in polygon
                ]
                for polygon in self.multi_polygon
            ]
        elif self.multi_points:
            srid = self.multi_points.srid
            dct["geometry"]["type"] = "MultiPoint"
            dct["geometry"]["coordinates"] = [
                convert(point.x, point.y, srid) for point in self.multi_points
            ]
        elif self.multi_line:
            srid = self.multi_line.srid
            # "MultiLineString" is the GeoJSON name of this geometry type
            dct["geometry"]["type"] = "MultiLineString"
            # one list of positions per line; the positions of a line are
            # plain coordinate tuples
            dct["geometry"]["coordinates"] = [
                [convert(coords[0], coords[1], srid) for coords in line.coords]
                for line in self.multi_line
            ]
        else:
            coords = self.display_coordinates(srid=4326)
            if not coords:
                return {}
            dct["geometry"]["type"] = "Point"
            dct["geometry"]["coordinates"] = coords
        return dct
    def convert_coordinates(self, point_2d, rounded=5, srid=None):
        """
        Convert a point to another SRID.

        :param point_2d: geometry with a transform method and x, y attributes
        :param rounded: number of decimals, a falsy value disables rounding
        :param srid: target SRID; by default the display SRS of the current
        profile is used, then 4326
        :return: [x, y] list
        """
        target_srid = srid
        if not target_srid:
            display_srs = get_current_profile().display_srs
            if display_srs and display_srs.srid:
                target_srid = display_srs.srid
        projected = point_2d.transform(target_srid or 4326, clone=True)
        x, y = projected.x, projected.y
        if not rounded:
            return [x, y]
        return [round(x, rounded), round(y, rounded)]
    def _geojson_serialize(self, geom_attr):
        if not hasattr(self, geom_attr):
            return "{}"
        geojson = serialize(
            "geojson",
            self.__class__.objects.filter(pk=self.pk),
            geometry_field=geom_attr,
            fields=("name",),
        )
        geojson_dct = json.loads(geojson)
        return self._geojson_base_serialize(geojson_dct)
    def _geojson_base_serialize(self, geojson_dct):
        """
        Complete a GeoJSON dict and dump it to a string.

        The name and the id of this item are set on each feature, point
        coordinates are rounded to the precision of the current profile (if
        any) and a link template is added when the URL can be reversed.

        :param geojson_dct: GeoJSON dict with a "features" key (modified in
        place)
        :return: GeoJSON string
        """
        precision = get_current_profile().point_precision
        # pop then put back: "features" is moved after the other keys
        features = geojson_dct.pop("features")
        for feature in features:
            properties = feature["properties"]
            properties["name"] = self.name or ""
            properties["id"] = self.pk
            if precision is None:
                continue
            geometry = feature["geometry"]
            if geometry.get("type", None) == "Point":
                geometry["coordinates"] = [
                    round(coord, precision) for coord in geometry["coordinates"]
                ]
        geojson_dct["features"] = features
        try:
            link = simple_link_to_window(self)
            geojson_dct["link_template"] = link.replace("999999", "")
        except NoReverseMatch:
            # no URL available: the link template is left out
            pass
        return json.dumps(geojson_dct)
    @property
    def point_2d_geojson(self):
        """GeoJSON string of the 2D point."""
        geom_attr = "point_2d"
        return self._geojson_serialize(geom_attr)
    @property
    def multi_polygon_geojson(self):
        """GeoJSON string of the multi-polygon."""
        geom_attr = "multi_polygon"
        return self._geojson_serialize(geom_attr)
    @property
    def geometry_type(self):
        """
        Key of the geometry type: "POINT", "MULTILINE", "MULTIPOINTS",
        "MULTIPOLYGON" or an empty string if nothing is set. The first
        matching type in this order is returned.
        """
        point_attrs = ("x", "y", "z", "point_2d", "point_3d")
        if any(getattr(self, attr) for attr in point_attrs):
            return "POINT"
        multi_types = (
            ("multi_line", "MULTILINE"),
            ("multi_points", "MULTIPOINTS"),
            ("multi_polygon", "MULTIPOLYGON"),
        )
        for attr, type_key in multi_types:
            if getattr(self, attr):
                return type_key
        return ""
    @property
    def geometry_type_label(self):
        """Label of the geometry type, an empty string if unknown."""
        type_key = self.geometry_type
        return GEOMETRY_TYPE_LBL.get(type_key, "")
    @property
    def geojson(self):
        """
        GeoJSON string of this geographic data.

        User input coordinates are tested first (a point is built from the
        cached coordinates), then the geometry fields: the first one set is
        serialized. "{}" is returned when nothing is set.
        """
        if self.x or self.y or self.z:
            # deep copy: a shallow copy shares the nested feature dict, so the
            # module level template would be modified by each call
            geo = copy.deepcopy(GEOJSON_POINT_TPL)
            geo["features"][0]["geometry"]["coordinates"] = [
                self.cached_x, self.cached_y
            ]
            return self._geojson_base_serialize(geo)
        geom_attrs = (
            "point_2d", "point_3d", "multi_line", "multi_points", "multi_polygon"
        )
        for geom_attr in geom_attrs:
            if getattr(self, geom_attr):
                return self._geojson_serialize(geom_attr)
        return "{}"
    def update_from_geojson(self, geojson: str, save=False) -> bool:
        """
        Update the geometry from a GeoJSON source holding a single feature.

        The geometry is stored in the attribute matching its type and every
        other geographic attribute is reset to None.

        :param geojson: GeoJSON source opened with fiona
        :param save: save the object if set to True
        :return: True if updated, False if there is no data, not exactly one
        feature or an unknown geometry type
        """
        if not geojson:  # no data provided
            return False
        all_geo_attrs = (
            "point_2d", "point_3d", "x", "y", "z", "multi_line",
            "multi_points", "multi_polygon",
        )
        with fiona.open(geojson) as src:
            if len(src) != 1:
                # ambiguous -> need only one feature
                return False
            geom = src[0]["geometry"]
            geom_type = geom["type"]
            if geom_type not in GEOTYPE_TO_GEOVECTOR:
                # unknown geometry type
                return False
            feature_attr, need_list_convert = GEOTYPE_TO_GEOVECTOR[geom_type]
            if need_list_convert:
                # single geometry stored in a multi-geometry attribute
                geom["coordinates"] = [geom["coordinates"]]
            setattr(self, feature_attr, GEOSGeometry(json.dumps(geom)))
            # NOTE(review): this first save is done before the other geographic
            # attributes are reset - confirm it is required
            if save:
                self.save()
            for geo_attr in all_geo_attrs:
                if geo_attr != feature_attr:
                    setattr(self, geo_attr, None)
            if save:
                self.save()
        return True
    @classmethod
    def migrate_srid(cls, new_srid):
        """
        Set a new SRID on every geometry stored in the table of this model.

        The SRID is first set with a raw SQL update on each geometry column,
        then each item is transformed to the new SRID and saved with the
        ``_no_geo_check`` flag.

        :param new_srid: SRID to set
        """
        geo_fields = (
            "point_2d", "point_3d", "multi_points", "multi_line", "multi_polygon",
        )
        quote_name = connection.ops.quote_name
        table = quote_name(cls._meta.db_table)
        with connection.cursor() as cursor:
            for name in geo_fields:
                column = quote_name(name)
                # identifiers cannot be query parameters (they would be quoted
                # as string literals): only the SRID is passed as a parameter.
                # Table and columns come from the model, not from user input
                cursor.execute(
                    f"UPDATE {table} SET {column}=ST_SetSRID({column}, %s);",
                    [new_srid]
                )
        for item in cls.objects.all():
            for name in geo_fields:
                geom = getattr(item, name)
                if geom is None:
                    # geometry fields are nullable: nothing to transform
                    continue
                geom.transform(new_srid)
            item._no_geo_check = True
            item.save()
# post_save_geodata is called after each save of a GeoVectorData
post_save.connect(post_save_geodata, sender=GeoVectorData)
def geodata_attached_post_add(model, instance, pk_set):
    """
    Manage the main geodata and the children of an item after geographic
    data have been attached to it.

    :param model: model of the attached data (GeoVectorData when called by
    geodata_attached_changed)
    :param instance: item the geographic data have been attached to
    :param pk_set: primary keys of the attached geographic data
    """
    item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True))
    if not item_pks:
        return
    # use a cache to manage during geodata attach
    if not hasattr(instance, "_geodata"):
        instance._geodata = []
    if not instance.main_geodata_id:
        # no main geodata yet: use the first attached one
        instance.main_geodata_id = item_pks[0]
        instance.skip_history_when_saving = True
        instance._no_move = True
        instance._geodata += [pk for pk in item_pks if pk not in instance._geodata]
        instance.save()

    # for all sub item verify that the geo items are present
    for query in instance.geodata_child_item_queries():
        child_model = query.model
        m2m_model = child_model.geodata.through
        m2m_key = f"{child_model._meta.model_name}_id"
        geoitems = {}  # geodata already fetched for this query
        for child_id in query.values_list("id", flat=True):
            for pk in item_pks:
                already_attached = m2m_model.objects.filter(
                    **{m2m_key: child_id, "geovectordata_id": pk}
                ).exists()
                if already_attached:
                    continue
                if pk not in geoitems:
                    geoitems[pk] = GeoVectorData.objects.get(pk=pk)
                child_model.objects.get(pk=child_id).geodata.add(geoitems[pk])
def geodata_attached_remove(model, instance, pk_set=None, clear=False):
    """
    Manage the main geodata and the children of an item after geographic
    data have been detached from it.

    :param model: model of the detached data (GeoVectorData when called by
    geodata_attached_changed)
    :param instance: item the geographic data have been detached from
    :param pk_set: primary keys of the detached geographic data (not used
    when clear is set)
    :param clear: if True the primary keys are read from the
    ``_geodata_clear_item_pks`` attribute of the instance
    """
    if clear:
        item_pks = getattr(instance, "_geodata_clear_item_pks", [])
    else:
        item_pks = list(
            model.objects.filter(pk__in=pk_set or []).values_list("pk", flat=True)
        )
    if not item_pks:
        return
    if instance.main_geodata_id in item_pks:
        # the main geodata has been detached
        instance.main_geodata_id = None
        instance.skip_history_when_saving = True
        instance._no_move = True
        instance.save()

    # for all sub item verify that the geo items are removed
    for query in instance.geodata_child_item_queries():
        child_model = query.model
        m2m_model = child_model.geodata.through
        m2m_key = f"{child_model._meta.model_name}_id"
        geoitems = {}  # geodata already fetched for this query
        for child_id in query.values_list("id", flat=True):
            for pk in item_pks:
                is_attached = m2m_model.objects.filter(
                    **{m2m_key: child_id, "geovectordata_id": pk}
                ).exists()
                if not is_attached:
                    continue
                if pk not in geoitems:
                    geoitems[pk] = GeoVectorData.objects.get(pk=pk)
                child_model.objects.get(pk=child_id).geodata.remove(geoitems[pk])
def geodata_attached_changed(sender, **kwargs):
    """
    m2m_changed receiver: manage the main geodata of an item and cascade the
    association to its children when geographic data are attached or detached.

    Nothing is done during the first migration (ISHTAR_MIGRATE_V4 setting) or
    if mapping is disabled on the current profile.

    :param sender: not used
    :param kwargs: m2m_changed arguments (instance, model, pk_set, action)
    """
    if getattr(settings, "ISHTAR_MIGRATE_V4", False):
        # disable on first migration
        return

    # manage main geoitem and cascade association
    profile = get_current_profile()
    if not profile.mapping:
        return

    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    pk_set = kwargs.get("pk_set", None)
    action = kwargs.get("action", None)
    if not instance or not model:
        return
    if action not in ("post_add", "post_remove", "pre_clear", "post_clear"):
        # other actions are not managed: avoid useless queries
        return

    if model != GeoVectorData:
        # reverse side: the instance is the geodata and pk_set holds the items
        if not pk_set:
            # pk_set is None for clear actions: the items are not known
            return
        geodata_pks = {instance.pk}
        # every item of pk_set is managed, not only the first one
        targets = [
            (item, geodata_pks) for item in model.objects.filter(pk__in=pk_set)
        ]
    else:
        targets = [(instance, pk_set)]

    for target, target_pks in targets:
        if action == "post_add":
            geodata_attached_post_add(GeoVectorData, target, target_pks)
        elif action == "post_remove":
            geodata_attached_remove(GeoVectorData, target, target_pks)
        elif action == "pre_clear":
            # keep the list of geodata: it is not available after the clear
            target._geodata_clear_item_pks = list(
                target.geodata.values_list("id", flat=True)
            )
        elif action == "post_clear":
            geodata_attached_remove(GeoVectorData, target, clear=True)
class GeographicItem(models.Model):
    """
    Abstract model for items with geographic data: a main geodata and a list
    of attached geodata.
    """
    main_geodata = models.ForeignKey(
        GeoVectorData,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        related_name="main_related_items_%(app_label)s_%(class)s",
        verbose_name=_("Main geodata")
    )
    geodata = models.ManyToManyField(
        GeoVectorData, blank=True, related_name="related_items_%(app_label)s_%(class)s",
        verbose_name=_("Geodata")
    )
    # text search keys for the attached geographic data
    ALT_NAMES = {
        "geodata__name": SearchAltName(
            pgettext_lazy("key for text search", "geo-name"), "geodata__name__iexact"
        ),
        "geodata__data_type": SearchAltName(
            pgettext_lazy("key for text search", "geo-type"), "geodata__data_type__label__iexact"
        ),
        "geodata__origin": SearchAltName(
            pgettext_lazy("key for text search", "geo-origin"), "geodata__origin__label__iexact"
        ),
        "geodata__provider": SearchAltName(
            pgettext_lazy("key for text search", "geo-provider"), "geodata__provider__label__iexact"
        ),
        "geodata__comment": SearchAltName(
            pgettext_lazy("key for text search", "geo-comment"), "geodata__comment__iexact"
        ),
    }

    @classmethod
    def ALT_NAMES_FOR_FIND(cls):
        """
        Same search keys as ALT_NAMES with each query prefixed by
        "base_finds__".
        """
        return {
            key: SearchAltName(alt_name.search_key, "base_finds__" + alt_name.search_query)
            for key, alt_name in cls.ALT_NAMES.items()
        }

    class Meta:
        abstract = True

    def get_add_geo_action(self):
        """
        :return: tuple describing the "add geographic item" action: URL,
        label, icon class, short label, an empty string and False
        """
        meta = self.__class__._meta
        url = reverse("create-pre-geo", args=[
            meta.app_label,
            meta.model_name,
            self.pk
        ])
        return (
            url,
            _("Add geographic item"),
            "fa fa-plus",
            _("geo."),
            "",
            False
        )

    def geodata_child_item_queries(self):
        """
        :return: list of queries of the items geographically associated with
        this item. When geographic data is added to this item, all sub items
        get the geographic data.
        For instance an operation returns the list of its context records, so
        when you add the survey limit, it is associated to all the context
        records of the operation.
        """
        return []

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Save the item then keep main_geodata and geodata consistent: the main
        geodata is added to the geodata if missing, and when no main geodata
        is set the first geodata (by pk) becomes the main one.
        """
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )
        if self.main_geodata:
            if not self.geodata.filter(pk=self.main_geodata.pk).count():
                try:
                    with transaction.atomic():
                        self.geodata.add(self.main_geodata)
                except (OperationalError, IntegrityError):
                    # best effort: the association is not mandatory here
                    pass
            return
        if self.geodata.count():
            # arbitrary associate the first to geodata
            self.main_geodata = self.geodata.order_by("pk").all()[0]
            self.skip_history_when_saving = True
            self._no_move = True
            self.save()

    @property
    def geodata_list(self):
        """List of the geodata, the main one (if any) being the first."""
        main = self.main_geodata
        ordered = [main] if main else []
        ordered += [geo for geo in self.geodata.all() if geo != main]
        return ordered
class TownManager(models.Manager):
    """
    Manager providing natural key lookup for towns
    """

    def get_by_natural_key(self, numero_insee, year):
        """
        :return: the town matching the (numero_insee, year) natural key
        """
        natural_key = {"numero_insee": numero_insee, "year": year}
        return self.get(**natural_key)
class Town(GeographicItem, Imported, models.Model):
    """
    Town identified by its INSEE code and an optional year of creation.

    Towns can be linked together through the ``children`` relation (reverse
    name: ``parents``). The limit of a town can be generated from the union of
    the limits of its parents; center and surface are derived from the limit.
    """

    name = models.CharField(_("Name"), max_length=100)
    surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
    center = models.PointField(
        _("Localisation"), srid=settings.SRID, blank=True, null=True
    )
    limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
    numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120)
    departement = models.ForeignKey(
        Department,
        verbose_name=_("Department"),
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    year = models.IntegerField(
        _("Year of creation"),
        null=True,
        blank=True,
        help_text=_(
            "Filling this field is relevant to distinguish old towns " "from new towns."
        ),
    )
    children = models.ManyToManyField(
        "Town", verbose_name=_("Town children"), blank=True, related_name="parents"
    )
    cached_label = models.CharField(
        _("Cached name"), max_length=500, null=True, blank=True, db_index=True
    )
    objects = TownManager()

    class Meta:
        verbose_name = _("Town")
        verbose_name_plural = _("Towns")
        # ordering and uniqueness on the INSEE code only apply to France
        if settings.COUNTRY == "fr":
            ordering = ["numero_insee"]
            unique_together = (("numero_insee", "year"),)

    def natural_key(self):
        """
        :return: (numero_insee, year) tuple, see TownManager.get_by_natural_key
        """
        return (self.numero_insee, self.year)

    def history_compress(self):
        """
        :return: dict with the natural key of the town, the year being replaced
        by an empty string when not set
        """
        return {"numero_insee": self.numero_insee, "year": self.year or ""}

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        return "**name** {}, **numero_insee** {}, **cached_label** {}".format(
            _("Name"), "Code commune (numéro INSEE)", _("Cached name")
        )

    def get_values(self, prefix="", **kwargs):
        """
        :param prefix: prefix added to each key; the label itself is stored
        under the prefix, or under "label" when the prefix is empty
        :return: dict of label, name and INSEE code
        """
        return {
            prefix or "label": str(self),
            prefix + "name": self.name,
            prefix + "numero_insee": self.numero_insee,
        }

    @classmethod
    def history_decompress(cls, full_value, create=False):
        """
        Reverse of history_compress.

        :param full_value: iterable of dicts with "numero_insee" and "year" keys
        :param create: not used by this implementation
        :return: list of matching towns, unknown towns are skipped
        """
        if not full_value:
            return []
        res = []
        for value in full_value:
            try:
                res.append(
                    cls.objects.get(
                        numero_insee=value["numero_insee"], year=value["year"] or None
                    )
                )
            except cls.DoesNotExist:
                continue
        return res

    def __str__(self):
        return self.cached_label or ""

    def geodata_child_item_queries(self):
        """
        :return: related managers of the sites and operations of this town
        """
        return [self.sites, self.operations]

    @property
    def label_with_areas(self):
        """
        :return: name, INSEE code between parentheses (if any) and full label
        of each area of the town, joined with spaces
        """
        label = [self.name]
        if self.numero_insee:
            label.append("({})".format(self.numero_insee))
        for area in self.areas.all():
            label.append(" - ")
            label.append(area.full_label)
        return " ".join(label)

    def generate_geo(self, force=False):
        """
        Generate limit, center and surface.

        The center and the surface are only forced when the limit has just
        been (re)generated: ``force`` is replaced by the result of
        generate_limit.
        """
        force = self.generate_limit(force=force)
        self.generate_center(force=force)
        self.generate_area(force=force)

    def generate_limit(self, force=False):
        """
        Generate the limit from the union of the limits of the parents.

        Nothing is done when the town has no parent or when a parent has no
        limit.

        :param force: regenerate even if a limit is already set
        :return: True if the limit has been updated and saved, None otherwise
        """
        if not force and self.limit:
            return
        parents = None
        if not self.parents.count():
            return
        for parent in self.parents.all():
            if not parent.limit:
                return
            if not parents:
                parents = parent.limit
            else:
                parents = parents.union(parent.limit)
        # check for an empty union before reading its WKT: an empty geometry
        # must not be turned into a (truthy, invalid) WKT string
        if not parents:
            return
        # if union is a simple polygon make it a multi
        if "MULTI" not in parents.wkt:
            parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")"
        self.limit = parents
        self.save()
        return True

    def generate_center(self, force=False):
        """
        Generate the center from the centroid of the limit.

        :param force: regenerate even if a center is already set
        :return: True if the center has been updated and saved, False if no
        centroid can be computed, None if nothing has been done
        """
        # no limit: nothing to compute, even when forced
        if not self.limit:
            return
        if not force and self.center:
            return
        self.center = self.limit.centroid
        if not self.center:
            return False
        self.save()
        return True

    def generate_area(self, force=False):
        """
        Generate the surface from the limit transformed to
        settings.SURFACE_SRID.

        :param force: regenerate even if a surface is already set
        :return: True if the surface has been updated and saved, False if the
        surface cannot be computed or is rejected, None if nothing has been done
        """
        # no limit: nothing to compute, even when forced
        if not self.limit:
            return
        if not force and self.surface:
            return
        try:
            surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area
        except GDALException:
            return False
        # NOTE(review): the upper bound presumably protects the IntegerField
        # but the literal is 214748364, not 2147483647 - confirm
        if surface > 214748364 or not surface:
            return False
        self.surface = surface
        self.save()
        return True

    def update_town_code(self):
        """
        Suffix the INSEE code of a town having children with its year.

        The code becomes "<base code>-<year>", with an extra "-<index>" when
        another town already uses this code for the same year. The town is not
        saved.

        :return: True if the code has changed, None otherwise
        """
        if not self.numero_insee or not self.children.count() or not self.year:
            return
        old_num = self.numero_insee[:]
        numero = old_num.split("-")[0]
        base_insee = "{}-{}".format(numero, self.year)
        self.numero_insee = base_insee
        idx = 0
        while Town.objects.filter(
                year=self.year, numero_insee=self.numero_insee).exclude(
                pk=self.pk).count():
            idx += 1
            self.numero_insee = base_insee + "-" + str(idx)
        if self.numero_insee != old_num:
            return True

    def _generate_cached_label(self):
        """
        :return: name of the town; for France, followed by the first
        characters of the INSEE code; followed by the year for a town with
        children
        """
        cached_label = self.name
        if settings.COUNTRY == "fr" and self.numero_insee:
            dpt_len = 2
            # codes starting with 97, 98 or a non digit character keep three
            # characters instead of two
            if (
                self.numero_insee.startswith(("97", "98"))
                or self.numero_insee[0] not in "0123456789"
            ):
                dpt_len = 3
            cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
        if self.year and self.children.count():
            cached_label += " ({})".format(self.year)
        return cached_label
def post_save_town(sender, **kwargs):
    """
    post_save handler for towns: refresh the cached label, generate the
    geographic data and save again if the town code has been updated.
    """
    cached_label_changed(sender, **kwargs)
    instance = kwargs["instance"]
    instance.generate_geo()
    code_updated = instance.update_town_code()
    if code_updated:
        instance.save()
# run post_save_town after each save of a Town
post_save.connect(post_save_town, sender=Town)
# run geodata_attached_changed when the Town.geodata relation is modified
m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through)
def town_child_changed(sender, **kwargs):
    """
    m2m_changed handler: save the town if its code has been updated.
    """
    instance = kwargs["instance"]
    code_updated = instance.update_town_code()
    if code_updated:
        instance.save()
# run town_child_changed when the Town.children relation is modified
m2m_changed.connect(town_child_changed, sender=Town.children.through)
def geotown_attached_changed(sender, **kwargs):
    """
    m2m_changed handler managing the associated geoitem.

    Does nothing when mapping is disabled on the current profile or when the
    instance has no ``post_save_geo`` method. Otherwise the instance is marked
    as needing a geo update and, after an add, a remove or a clear,
    ``post_save_geo(save=True)`` is called.
    """
    profile = get_current_profile()
    if not profile.mapping:
        return
    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    action = kwargs.get("action", None)
    if not instance or not model or not hasattr(instance, "post_save_geo"):
        return
    instance._post_save_geo_ok = False
    if action in ("post_add", "post_remove", "post_clear"):
        instance.post_save_geo(save=True)
class Address(BaseHistorizedItem):
    """
    Abstract item with a main address, an alternative address, phone numbers
    and an email.

    Subclasses can list in SUB_ADDRESSES the names of attributes holding
    related items whose address is used as a fallback.
    """

    FIELDS = (
        "address",
        "address_complement",
        "postal_code",
        "town",
        "precise_town",
        "country",
        "alt_address",
        "alt_address_complement",
        "alt_postal_code",
        "alt_town",
        "alt_country",
        "phone",
        "phone_desc",
        "phone2",
        "phone_desc2",
        "phone3",
        "phone_desc3",
        "raw_phone",
        "mobile_phone",
        "email",
        "alt_address_is_prefered",
    )
    address = models.TextField(_("Address"), blank=True, default="")
    address_complement = models.TextField(
        _("Address complement"), blank=True, default=""
    )
    postal_code = models.CharField(
        _("Postal code"), max_length=10, null=True, blank=True
    )
    town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True)
    precise_town = models.ForeignKey(
        Town,
        verbose_name=_("Town (precise)"),
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )
    country = models.CharField(_("Country"), max_length=30, null=True, blank=True)
    alt_address = models.TextField(_("Other address: address"), blank=True, default="")
    alt_address_complement = models.TextField(
        _("Other address: address complement"), blank=True, default=""
    )
    alt_postal_code = models.CharField(
        _("Other address: postal code"), max_length=10, null=True, blank=True
    )
    alt_town = models.CharField(
        _("Other address: town"), max_length=70, null=True, blank=True
    )
    alt_country = models.CharField(
        _("Other address: country"), max_length=30, null=True, blank=True
    )
    phone = models.CharField(_("Phone"), max_length=32, null=True, blank=True)
    phone_desc = models.CharField(
        _("Phone description"), max_length=300, null=True, blank=True
    )
    # NOTE(review): the verbose name is the same as phone_desc2, "Phone 2" is
    # presumably intended - left unchanged as a fix would alter migrations
    phone2 = models.CharField(
        _("Phone description 2"), max_length=32, null=True, blank=True
    )
    phone_desc2 = models.CharField(
        _("Phone description 2"), max_length=300, null=True, blank=True
    )
    phone3 = models.CharField(_("Phone 3"), max_length=32, null=True, blank=True)
    phone_desc3 = models.CharField(
        _("Phone description 3"), max_length=300, null=True, blank=True
    )
    raw_phone = models.TextField(_("Raw phone"), blank=True, default="")
    mobile_phone = models.CharField(
        _("Mobile phone"), max_length=32, null=True, blank=True
    )
    email = models.EmailField(_("Email"), max_length=300, blank=True, null=True)
    alt_address_is_prefered = models.BooleanField(
        _("Alternative address is prefered"), default=False
    )
    history = HistoricalRecords(inherit=True)
    # names of attributes holding related items used as fallback addresses
    SUB_ADDRESSES = []

    class Meta:
        abstract = True

    def get_short_html_items(self):
        """
        :return: list of the filled parts of the main address (address,
        complement, postal code, town, country), each one formatted
        """
        # NOTE(review): the format strings only contain a bare placeholder,
        # markup seems to be missing - confirm
        items = []
        if self.address:
            items.append("""{}""".format(self.address))
        if self.address_complement:
            items.append(
                """{}""".format(
                    self.address_complement
                )
            )
        if self.postal_code:
            items.append(
                """{}""".format(self.postal_code)
            )
        # the precise town takes precedence over the freeform town
        if self.precise_town:
            items.append(
                """{}""".format(self.precise_town.name)
            )
        elif self.town:
            items.append("""{}""".format(self.town))
        if self.country:
            items.append("""{}""".format(self.country))
        return items

    def get_short_html_detail(self):
        """
        :return: the items of get_short_html_items concatenated (or a "No
        associated address" message when there is none), followed by a new line
        """
        html = """"""
        items = self.get_short_html_items()
        if not items:
            items = [
                "{}".format(_("No associated address"))
            ]
        html += "".join(items)
        html += """
"""
        return html

    def get_town_centroid(self):
        """
        :return: (center of the precise town, verbose name of the model owning
        the town) from this item or from the first sub address with a precise
        town; None when no precise town is found
        """
        if self.precise_town:
            return self.precise_town.center, self._meta.verbose_name
        for sub_address in self.SUB_ADDRESSES:
            sub_item = getattr(self, sub_address)
            if sub_item and sub_item.precise_town:
                return sub_item.precise_town.center, sub_item._meta.verbose_name

    def get_town_polygons(self):
        """
        :return: (limit of the precise town, verbose name of the model owning
        the town) from this item or from the first sub address with a precise
        town; None when no precise town is found
        """
        if self.precise_town:
            return self.precise_town.limit, self._meta.verbose_name
        for sub_address in self.SUB_ADDRESSES:
            sub_item = getattr(self, sub_address)
            if sub_item and sub_item.precise_town:
                return sub_item.precise_town.limit, sub_item._meta.verbose_name

    def get_attribute(self, attr):
        """
        Get an address attribute from the first relevant address.

        An address is relevant when a town (freeform or precise) is set. This
        item is checked first, then each sub address; when no address is
        relevant the attribute of this item is returned.
        """
        if self.town or self.precise_town:
            return getattr(self, attr)
        for sub_address in self.SUB_ADDRESSES:
            sub_item = getattr(self, sub_address)
            if not sub_item:
                continue
            if sub_item.town or sub_item.precise_town:
                return getattr(sub_item, attr)
        return getattr(self, attr)

    def get_address(self):
        return self.get_attribute("address")

    def get_address_complement(self):
        return self.get_attribute("address_complement")

    def get_postal_code(self):
        return self.get_attribute("postal_code")

    def get_town(self):
        return self.get_attribute("town")

    def get_precise_town(self):
        return self.get_attribute("precise_town")

    def get_country(self):
        return self.get_attribute("country")

    def simple_lbl(self):
        return str(self)

    def full_address(self):
        """
        :return: simple label and address label separated by a new line
        """
        lbl = self.simple_lbl()
        if lbl:
            lbl += "\n"
        lbl += self.address_lbl()
        return lbl

    def address_lbl(self, list=False):
        """
        Build the label of the prefered address (main or alternative) followed
        by phone, mobile phone and email.

        :param list: if True return the list of (value, label) tuples instead
        of a string
        :return: values joined with new lines, or list of (value, label)
        """
        lbls = []
        prefix = ""
        if self.alt_address_is_prefered:
            prefix = "alt_"
        if getattr(self, prefix + "address"):
            lbls.append((
                getattr(self, prefix + "address"),
                _("Address")
            ))
        if getattr(self, prefix + "address_complement"):
            lbls.append((
                getattr(self, prefix + "address_complement"),
                _("Address complement")
            ))
        postal_code = getattr(self, prefix + "postal_code")
        town = getattr(self, prefix + "town")
        if postal_code or town:
            # both fields are nullable: only join the filled ones
            lbls.append((
                " ".join(part for part in (postal_code, town) if part),
                _("Postal code - Town")
            ))
        if self.phone:
            lbls.append((
                self.phone,
                _("Phone")
            ))
        if self.mobile_phone:
            lbls.append((
                self.mobile_phone,
                _("Mobile")
            ))
        if self.email:
            lbls.append((
                self.email,
                _("Email")
            ))
        if list:
            return lbls
        return "\n".join([
            value for value, lbl in lbls
        ])

    def address_lbl_list(self):
        """
        :return: address_lbl as a list of (value, label) tuples
        """
        return self.address_lbl(list=True)
class Merge(models.Model):
    """
    Abstract model managing merge candidates between items of the same model.

    A merge key is generated from the MERGE_ATTRIBUTE attribute on each save;
    non archived items with a matching key are registered as merge candidates.
    """

    merge_key = models.TextField(_("Merge key"), blank=True, null=True)
    merge_candidate = models.ManyToManyField("self", blank=True)
    merge_exclusion = models.ManyToManyField("self", blank=True)
    archived = models.NullBooleanField(default=False, blank=True, null=True)
    # 1 for one word similarity, 2 for two word similarity, etc.
    MERGE_CLEMENCY = None
    # key used when the merge attribute gives an empty slug
    EMPTY_MERGE_KEY = "--"
    MERGE_ATTRIBUTE = "name"

    class Meta:
        abstract = True

    def generate_merge_key(self):
        """
        Set merge_key to the slug of the merge attribute, or to
        EMPTY_MERGE_KEY when the slug is empty. Archived items are left
        untouched. The item is not saved.
        """
        if self.archived:
            return
        merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
        self.merge_key = slugify(merge_attr if merge_attr else "")
        if not self.merge_key:
            self.merge_key = self.EMPTY_MERGE_KEY

    def generate_merge_candidate(self):
        """
        Register as merge candidates the items with a matching merge key.

        Items already excluded, already candidates or archived are ignored.
        Without MERGE_CLEMENCY the keys must be equal; otherwise a key matches
        when it starts with the first MERGE_CLEMENCY words of this key or ends
        with its last MERGE_CLEMENCY words.
        """
        if self.archived:
            return
        if not self.merge_key:
            self.generate_merge_key()
            # flag the save to prevent a circular call
            self.save(merge_key_generated=True)
        if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
            return
        q = (
            self.__class__.objects.exclude(pk=self.pk)
            .exclude(merge_exclusion=self)
            .exclude(merge_candidate=self)
            .exclude(archived=True)
        )
        if not self.MERGE_CLEMENCY:
            q = q.filter(merge_key=self.merge_key)
        else:
            subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY])
            subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :])
            q = q.filter(
                Q(merge_key__istartswith=subkeys_front)
                | Q(merge_key__iendswith=subkeys_back)
            )
        for item in q.all():
            self.merge_candidate.add(item)

    def save(self, *args, **kwargs):
        """
        Regenerate the merge key, save, then rebuild the merge candidates.

        :param merge_key_generated: extra keyword argument; when True the
        merge candidates are not rebuilt (prevents circular saves)
        """
        # prevent circular save
        merge_key_generated = kwargs.pop("merge_key_generated", False)
        self.generate_merge_key()
        item = super(Merge, self).save(*args, **kwargs)
        if not merge_key_generated:
            self.merge_candidate.clear()
            self.generate_merge_candidate()
        return item

    def archive(self):
        """
        Mark the item as archived and drop its merge candidates and exclusions.
        """
        self.archived = True
        self.save()
        self.merge_candidate.clear()
        self.merge_exclusion.clear()

    def merge(self, item, keep_old=False, exclude_fields=None):
        """
        Merge ``item`` into this item with merge_model_objects, then refresh
        the merge candidates.
        """
        merge_model_objects(
            self, item, keep_old=keep_old, exclude_fields=exclude_fields
        )
        self.generate_merge_candidate()
def __get_stats_cache_values(model_name, model_pk):
    """
    Get (or create) the stats cache of an item and remove duplicates.

    When several caches exist for the item, the one with the highest id is
    kept and returned, the others are deleted.

    :param model_name: "<app label>.<model name>" of the item
    :param model_pk: pk of the item
    :return: tuple (stats cache, values of the cache as a dict)
    """
    StatsCache = apps.get_model("ishtar_common", "StatsCache")
    # a single ordered query: the kept cache and the deleted ones are taken
    # from the same ordering, so the returned cache is never a deleted one
    caches = list(
        StatsCache.objects.filter(model=model_name, model_pk=model_pk).order_by("-id")
    )
    if caches:
        sc = caches[0]
        for extra in caches[1:]:
            extra.delete()
    else:
        sc = StatsCache.objects.create(model=model_name, model_pk=model_pk)
    values = sc.values or {}
    return sc, values
@task()
def _update_stats(app, model, model_pk, funcname):
    """
    Task computing a statistic of an item and storing it in its stats cache.

    :param app: app label of the model
    :param model: model name
    :param model_pk: pk of the item, nothing is done if it does not exist
    :param funcname: name of the method of the item computing the statistic
    """
    cache_model_name = app + "." + model
    model_class = apps.get_model(app, model)
    try:
        item = model_class.objects.get(pk=model_pk)
    except model_class.DoesNotExist:
        return
    compute = getattr(item, funcname)
    value = compute()
    sc, current_values = __get_stats_cache_values(cache_model_name, model_pk)
    current_values[funcname] = value
    sc.values = current_values
    # the requested update is done
    sc.update_requested = None
    sc.updated = datetime.datetime.now()
    sc.save()
def update_stats(statscache, item, funcname):
    """
    Update a statistic of an item, directly or through a background task.

    :param statscache: stats cache of the item
    :param item: item the statistic is computed on
    :param funcname: name of the method of the item computing the statistic
    :return: the updated values when computed directly; the current values of
    the cache when the update is delegated to a background task
    """
    if settings.USE_BACKGROUND_TASK:
        requested = datetime.datetime.now()
        app_name = item._meta.app_label
        model_name = item._meta.model_name
        statscache.update_requested = requested.isoformat()
        statscache.save()
        _update_stats.delay(app_name, model_name, item.pk, funcname)
        return statscache.values
    # no background task: compute and store the value right now
    current_values = statscache.values or {}
    current_values[funcname] = getattr(item, funcname)()
    statscache.values = current_values
    statscache.updated = datetime.datetime.now()
    statscache.save()
    return current_values
class DashboardFormItem:
    """
    Provide methods to manage statistics
    """

    def last_stats_update(self):
        """
        :return: date of the most recent update of the stats cache of this
        item, None when the item has no stats cache
        """
        model_name = self._meta.app_label + "." + self._meta.model_name
        StatsCache = apps.get_model("ishtar_common", "StatsCache")
        q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by(
            "-updated"
        )
        if not q.count():
            return
        return q.all()[0].updated

    def _get_or_set_stats(self, funcname, update=False, expected_type=None):
        """
        Get a statistic from the stats cache, updating it if requested.

        :param funcname: name of the method computing the statistic, also used
        as the key in the cache
        :param update: if True the statistic is updated with update_stats
        :param expected_type: if set, an empty instance of this type is
        returned when the value is missing or has another type
        :return: the cached value; 0 (or expected_type()) when not available
        """
        model_name = self._meta.app_label + "." + self._meta.model_name
        StatsCache = apps.get_model("ishtar_common", "StatsCache")
        sc, _created = StatsCache.objects.get_or_create(
            model=model_name, model_pk=self.pk
        )
        if not update:
            # the cache may hold no values yet
            values = sc.values or {}
            if funcname not in values:
                if expected_type is not None:
                    return expected_type()
                return 0
        else:
            values = update_stats(sc, self, funcname) or {}
        value = values.get(funcname, 0)
        if expected_type is not None and not isinstance(value, expected_type):
            return expected_type()
        return value

    @classmethod
    def get_periods(cls, slice="month", fltr=None, date_source="creation"):
        """
        :param slice: "year" or "month"
        :param fltr: optional dict of extra filters
        :param date_source: prefix of the "<date_source>_date" field used
        :return: list of years or of (year, month) tuples, one per distinct
        value of the date field; empty list for any other slice
        """
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        if slice == "year":
            return [
                res[date_var].year
                for res in list(q.values(date_var).annotate(Count("id")).order_by())
            ]
        elif slice == "month":
            return [
                (res[date_var].year, res[date_var].month)
                for res in list(q.values(date_var).annotate(Count("id")).order_by())
            ]
        return []

    @classmethod
    def get_by_year(cls, year, fltr=None, date_source="creation"):
        """
        :return: distinct items whose "<date_source>_date" is in the given
        year, optionally filtered with the fltr dict
        """
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk")

    @classmethod
    def get_by_month(cls, year, month, fltr=None, date_source="creation"):
        """
        :return: distinct items whose "<date_source>_date" is in the given
        month of the given year, optionally filtered with the fltr dict
        """
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        q = q.filter(**{date_var + "__year": year, date_var + "__month": month})
        return q.order_by("pk").distinct("pk")

    @classmethod
    def get_total_number(cls, fltr=None):
        """
        :return: number of distinct items, optionally filtered with the fltr
        dict
        """
        q = cls.objects
        if fltr:
            q = q.filter(**fltr)
        return q.order_by("pk").distinct("pk").count()
class DocumentItem:
    """
    Mixin for items with associated documents and images
    """

    ALT_NAMES = {
        "documents__image__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-image"),
            "documents__image__isnull",
        ),
        "documents__associated_url__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-url"),
            "documents__associated_url__isnull",
        ),
        "documents__associated_file__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-attached-file"),
            "documents__associated_file__isnull",
        ),
    }

    def documents_list(self) -> list:
        document_model = apps.get_model("ishtar_common", "Document")
        return self.get_associated_main_item_list("documents", document_model)

    def public_representation(self):
        """
        :return: dict with the public representation of the images, the main
        image (if any) first
        """
        images = []
        main_image = getattr(self, "main_image", None)
        if main_image:
            images.append(self.main_image.public_representation())
        for image in self.images_without_main_image.all():
            images.append(image.public_representation())
        return {"images": images}

    @property
    def images(self):
        """
        :return: queryset of associated documents with an image, ordered by
        pk; empty queryset when the item has no "documents" attribute
        """
        if hasattr(self, "documents"):
            with_image = self.documents.filter(image__isnull=False)
            return with_image.exclude(image="").order_by("pk")
        document_model = apps.get_model("ishtar_common", "Document")
        return document_model.objects.none()

    @property
    def images_number(self):
        return self.images.count()

    @property
    def images_without_main_image(self):
        """
        :return: same as images, the main image (if any) being excluded
        """
        if not (hasattr(self, "main_image") and hasattr(self, "documents")):
            return self.images
        q = self.documents.filter(image__isnull=False).exclude(image="")
        if self.main_image:
            q = q.exclude(pk=self.main_image.pk)
        return q.order_by("pk")

    @property
    def pdf_attached(self):
        """
        :return: pdf_attached of the first document with an associated file
        (directly or through its source), None when there is none
        """
        with_file = Q(associated_file__isnull=False) | Q(
            source__associated_file__isnull=False
        )
        # only the first matching document is considered
        for document in self.documents.filter(with_file).all():
            return document.pdf_attached

    def get_extra_actions(self, request):
        """
        For sheet template: return "Add document / image" action
        """
        # url, base_text, icon, extra_text, extra css class, is a quick action
        try:
            actions = super(DocumentItem, self).get_extra_actions(request)
        except AttributeError:
            actions = []
        if not hasattr(self, "SLUG"):
            return actions
        if not self.can_do(request, "add_document"):
            return actions
        if hasattr(self, "is_locked") and self.is_locked(request.user):
            return actions
        add_url = reverse("create-document") + "?{}={}".format(self.SLUG, self.pk)
        actions += [
            (
                add_url,
                _("Add document/image"),
                "fa fa-plus",
                _("doc./image"),
                "",
                False,
            )
        ]
        return actions
def clean_duplicate_association(document, related_item, action):
    """
    Remove redundant associations of a document after an item has been added.

    Only done on "post_add" when the current profile enables
    clean_redundant_document_association, for finds, context records and
    operations:

    - find: remove the document from the context records and operations of
      the find;
    - context record: remove the document from the operations of the context
      record; remove the context record itself if the document is attached to
      one of its finds;
    - operation: remove the operation if the document is attached to one of
      its context records or finds.
    """
    profile = get_current_profile()
    if not profile.clean_redundant_document_association or action != "post_add":
        return
    class_name = related_item.__class__.__name__
    item_pk = related_item.pk
    if class_name == "Find":
        redundant_crs = document.context_records.filter(base_finds__find__pk=item_pk)
        for cr in redundant_crs.all():
            document.context_records.remove(cr)
        redundant_opes = document.operations.filter(
            context_record__base_finds__find__pk=item_pk
        )
        for ope in redundant_opes.all():
            document.operations.remove(ope)
    elif class_name == "ContextRecord":
        redundant_opes = document.operations.filter(context_record__pk=item_pk)
        for ope in redundant_opes.all():
            document.operations.remove(ope)
        if document.finds.filter(base_finds__context_record=item_pk).count():
            document.context_records.remove(related_item)
    elif class_name == "Operation":
        if (
            document.context_records.filter(operation=item_pk).count()
            or document.finds.filter(
                base_finds__context_record__operation=item_pk
            ).count()
        ):
            document.operations.remove(related_item)
def document_attached_changed(sender, **kwargs):
    """
    m2m_changed handler for document associations.

    For each concerned item: clean redundant associations, regenerate the ids
    of its documents and keep its main image consistent (unset it when it is
    not associated anymore, default to the image document with the lowest pk).

    The concerned item is the instance itself when it has a "documents"
    attribute, otherwise the items of ``model`` listed in ``pk_set``.
    """
    # associate a default main image
    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    pk_set = kwargs.get("pk_set", None)
    if not instance or not model:
        return
    if hasattr(instance, "documents"):
        items = [instance]
    else:
        if not pk_set:
            return
        try:
            items = [model.objects.get(pk=pk) for pk in pk_set]
        except model.DoesNotExist:
            return
    action = kwargs.get("action", None)
    for item in items:
        clean_duplicate_association(instance, item, action)
        for doc in item.documents.all():
            doc.regenerate_all_ids()
        q = item.documents.filter(image__isnull=False).exclude(image="")
        if item.main_image:
            if q.filter(pk=item.main_image.pk).count():
                # main image still associated: go on with the next item
                continue
            # the association has disappeared: not the main image anymore
            item.main_image = None
            item.skip_history_when_saving = True
            item.save()
        if not q.count():
            # no image available for this item: go on with the next item
            continue
        # by default get the lowest pk
        item.main_image = q.order_by("pk").all()[0]
        item.skip_history_when_saving = True
        item.save()
class QuickAction:
    """
    Quick action available from tables
    """

    def __init__(
        self,
        url,
        icon_class="",
        text="",
        target=None,
        rights=None,
        module=None,
        is_popup=True,
    ):
        """
        :param url: name of the url of the action
        :param icon_class: CSS class of the icon
        :param text: label of the action
        :param target: "one", "many" or None
        :param rights: list of permissions, one of them is enough to access the
        action; no restriction if empty
        :param module: name of an attribute of the current profile that must
        be set for the action to be available
        :param is_popup: stored as is on the action
        """
        self.url, self.icon_class, self.text = url, icon_class, text
        self.rights, self.target = rights, target
        self.module, self.is_popup = module, is_popup
        assert self.target in ("one", "many", None)

    def is_available(self, user, session=None, obj=None):
        """
        :return: True if the module is activated (when one is required) and
        the user has at least one of the required rights (when some are set)
        """
        if self.module and not getattr(get_current_profile(), self.module):
            return False
        if not self.rights:  # no restriction
            return True
        ishtar_user = getattr(user, "ishtaruser", None) if user else None
        if not ishtar_user:
            return False
        return any(
            ishtar_user.has_perm(right, session=session, obj=obj)
            for right in self.rights
        )

    @property
    def rendered_icon(self):
        # NOTE(review): the format string is empty, so the result is always an
        # empty string - icon markup seems to be missing, confirm
        return "".format(self.icon_class) if self.icon_class else ""

    @property
    def base_url(self):
        """
        :return: url of the action; when a target is set, the url without the
        pk of the item and the final "/"
        """
        if self.target is None:
            return reverse(self.url)
        # all quick action url have to finish with the pk of the selected
        # item and a "/": reverse with an arbitrary pk then strip "0/"
        with_pk = reverse(self.url, args=[0])
        return with_pk[:-2]
class DynamicRequest:
    """
    Search request generated for each available type of a model: one form
    field and one search alternative name per type.
    """

    def __init__(
        self,
        label,
        app_name,
        model_name,
        form_key,
        search_key,
        type_query,
        search_query,
    ):
        """
        :param label: base label of the form fields
        :param app_name: app label of the type model
        :param model_name: name of the type model
        :param form_key: prefix of the form field names
        :param search_key: prefix of the search keys
        :param type_query: query key used to filter on the type
        :param search_query: query used by the search
        """
        self.label = label
        self.app_name, self.model_name = app_name, model_name
        self.form_key, self.search_key = form_key, search_key
        self.type_query, self.search_query = type_query, search_query

    def get_all_types(self):
        """
        :return: queryset of the available types
        """
        app_config = apps.get_app_config(self.app_name)
        return app_config.get_model(self.model_name).objects.filter(available=True)

    def _type_key(self, prefix, item):
        # "<prefix>-<txt_idx of the type>"
        return prefix + "-" + item.txt_idx

    def get_form_fields(self):
        """
        :return: dict of optional char fields, one for each available type
        """
        return {
            self._type_key(self.form_key, item): forms.CharField(
                label=str(self.label) + " " + str(item), required=False
            )
            for item in self.get_all_types().all()
        }

    def get_extra_query(self, slug):
        return {self.type_query: slug}

    def get_alt_names(self):
        """
        :return: dict of search alternative names, one for each available type
        """
        return {
            self._type_key(self.form_key, item): SearchAltName(
                self._type_key(self.search_key, item),
                self.search_query,
                self.get_extra_query(item.txt_idx),
                distinct_query=True,
            )
            for item in self.get_all_types().all()
        }
class GeoItem(GeographicItem):
    """
    Abstract item holding geographic data: legacy point / multi-polygon
    fields and helpers delegating to ``main_geodata``.
    """

    # gis - to be removed
    GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon")))
    x = models.FloatField(_("X"), blank=True, null=True)
    y = models.FloatField(_("Y"), blank=True, null=True)
    z = models.FloatField(_("Z"), blank=True, null=True)
    estimated_error_x = models.FloatField(
        _("Estimated error for X"), blank=True, null=True
    )
    estimated_error_y = models.FloatField(
        _("Estimated error for Y"), blank=True, null=True
    )
    estimated_error_z = models.FloatField(
        _("Estimated error for Z"), blank=True, null=True
    )
    spatial_reference_system = models.ForeignKey(
        SpatialReferenceSystem,
        verbose_name=_("Spatial Reference System"),
        blank=True,
        null=True,
        on_delete=models.PROTECT,
    )
    point = models.PointField(_("Point"), blank=True, null=True, dim=3)
    point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
    point_source = models.CharField(
        _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True
    )
    point_source_item = models.CharField(
        _("Point source item"), max_length=100, blank=True, null=True
    )
    multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True)
    multi_polygon_source = models.CharField(
        _("Multi-polygon source"),
        choices=GEO_SOURCE,
        max_length=1,
        blank=True,
        null=True,
    )
    multi_polygon_source_item = models.CharField(
        _("Multi polygon source item"), max_length=100, blank=True, null=True
    )
    # name of the attribute used as label in GeoJSON exports (empty: default)
    GEO_LABEL = ""

    class Meta:
        abstract = True

    def get_town_centroid(self):
        """To be implemented by concrete classes."""
        raise NotImplementedError

    def get_town_polygons(self):
        """To be implemented by concrete classes."""
        raise NotImplementedError

    @property
    def X(self):
        """First item of ``display_coordinates`` or None if not available"""
        coordinates = self.display_coordinates
        return coordinates[0] if coordinates else None

    @property
    def Y(self):
        """Second item of ``display_coordinates`` or None if not available"""
        coordinates = self.display_coordinates
        return coordinates[1] if coordinates else None

    @property
    def display_coordinates(self, rounded=True):
        """
        Coordinates given by the main geographic data, an empty string when
        there is no main geographic data.
        """
        geodata = self.main_geodata
        if geodata:
            return geodata.display_coordinates(rounded=rounded)
        return ""

    @property
    def display_spatial_reference_system(self):
        """
        Display SRS of the current profile when it is set with an SRID,
        otherwise the SRS of the item.
        """
        display_srs = get_current_profile().display_srs
        if display_srs and display_srs.srid:
            return display_srs
        return self.spatial_reference_system

    def get_precise_points(self):
        """
        :return: (point_2d, point, point_source_item) when the point source is
            precise ("P") and a 2D point is set, otherwise None
        """
        if self.point_source != "P" or not self.point_2d:
            return None
        return self.point_2d, self.point, self.point_source_item

    def get_precise_polygons(self):
        """
        :return: (multi_polygon, multi_polygon_source_item) when the polygon
            source is precise ("P") and a multi-polygon is set, otherwise None
        """
        if self.multi_polygon_source != "P" or not self.multi_polygon:
            return None
        return self.multi_polygon, self.multi_polygon_source_item

    def get_geo_items(self, rounded=5):
        """
        Geo items given by the main geographic data, an empty dict when there
        is no main geographic data.
        """
        geodata = self.main_geodata
        if geodata:
            return geodata.get_geo_items(rounded)
        return {}

    def convert_coordinates(self, point_2d, rounded):
        """
        Convert a 2D point to the display SRS of the current profile.

        :param point_2d: point to convert
        :param rounded: if evaluated to True, round coordinates to 5 decimals
        :return: [x, y]
        """
        display_srs = get_current_profile().display_srs
        # no conversion when no display SRS is usable or when the point is
        # already expressed in the display SRS
        keep_raw = (
            not display_srs
            or not display_srs.srid
            or (
                display_srs == self.spatial_reference_system
                and point_2d.x
                and point_2d.y
            )
        )
        if keep_raw:
            x, y = point_2d.x, point_2d.y
        else:
            converted = point_2d.transform(display_srs.srid, clone=True)
            x, y = converted.x, converted.y
        if not rounded:
            return [x, y]
        return [round(x, 5), round(y, 5)]

    def _geojson_serialize(self, geom_attr):
        """
        Serialize the geometry stored in ``geom_attr`` as a GeoJSON string.

        Each feature gets a "name" (the label) and an "id" (the pk) property.
        Point coordinates are rounded with the profile point precision when it
        is set. A "link_template" key is added to the collection.

        :param geom_attr: name of the geometry attribute
        :return: GeoJSON string, empty string if the attribute does not exist
        """
        if not hasattr(self, geom_attr):
            return ""
        # label attribute: CACHED_LABELS (last one) wins over GEO_LABEL which
        # wins over the default
        cached_labels = getattr(self, "CACHED_LABELS", None)
        if cached_labels:
            label_key = self.CACHED_LABELS[-1]
        elif self.GEO_LABEL:
            label_key = self.GEO_LABEL
        else:
            label_key = "cached_label"
        collection = json.loads(
            serialize(
                "geojson",
                self.__class__.objects.filter(pk=self.pk),
                geometry_field=geom_attr,
                fields=(label_key,),
            )
        )
        precision = get_current_profile().point_precision
        # "features" is popped then re-inserted: it ends up after the other
        # keys already present in the collection
        features = collection.pop("features")
        for feature in features:
            properties = feature["properties"]
            label = properties.pop(label_key)
            properties["name"] = label
            properties["id"] = self.pk
            if precision is None:
                continue
            geometry = feature["geometry"]
            if geometry.get("type", None) == "Point":
                geometry["coordinates"] = [
                    round(coordinate, precision)
                    for coordinate in geometry["coordinates"]
                ]
        collection["features"] = features
        link_template = simple_link_to_window(self).replace("999999", "")
        collection["link_template"] = link_template
        return json.dumps(collection)

    @property
    def point_2d_geojson(self):
        """GeoJSON string of the ``point_2d`` geometry."""
        return self._geojson_serialize("point_2d")

    @property
    def multi_polygon_geojson(self):
        """GeoJSON string of the ``multi_polygon`` geometry."""
        return self._geojson_serialize("multi_polygon")
class ImageContainerModel:
    """Mixin providing date based upload paths for images."""

    def _get_base_image_path(self):
        """Return "upload/<year>/<month>/<day>" for the current date."""
        today = datetime.datetime.now()
        return f"upload/{today.year}/{today.month:02d}/{today.day:02d}"

    def _get_image_path(self, filename):
        """Return the base image path followed by "/" and ``filename``."""
        base_path = self._get_base_image_path()
        return f"{base_path}/{filename}"
class CompleteIdentifierItem(models.Model, ImageContainerModel):
    """
    Abstract model with a generated complete identifier, a custom index and a
    QR code pointing to the item.

    Identifier and index are regenerated after each save.
    """

    HAS_QR_CODE = True
    complete_identifier = models.TextField(
        _("Complete identifier"), blank=True, default=""
    )
    custom_index = models.IntegerField("Custom index", blank=True, null=True)
    qrcode = models.ImageField(
        upload_to=get_image_path, blank=True, null=True, max_length=255
    )

    class Meta:
        abstract = True

    @property
    def qrcode_path(self):
        """
        File path of the QR code image. The QR code is generated (and the
        item saved) when missing.

        :return: path of the image, empty string if the generation failed
        """
        if not self.qrcode:
            self.generate_qrcode()
        if not self.qrcode:  # error on qrcode generation
            return ""
        return self.qrcode.path

    def generate_qrcode(self, request=None, secure=True, tmpdir=None):
        """
        Generate the QR code image of the item and save it.

        The QR code encodes a short URL (a new ``TinyUrl`` is created on each
        call) redirecting to the absolute URL of the item.

        :param request: if provided, the scheme of the request is used
        :param secure: without request, use "https" if True otherwise "http"
        :param tmpdir: directory used to write the temporary image; when not
            provided a temporary directory is created then removed
        """
        url = self.get_absolute_url()
        site = Site.objects.get_current()
        if request:
            scheme = request.scheme
        else:
            scheme = "https" if secure else "http"
        url = scheme + "://" + site.domain + url
        TinyUrl = apps.get_model("ishtar_common", "TinyUrl")
        tiny_url = TinyUrl()
        tiny_url.link = url
        tiny_url.save()
        short_url = (
            scheme
            + "://"
            + site.domain
            + reverse("tiny-redirect", args=[tiny_url.get_short_id()])
        )
        qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
        tmpdir_created = False
        if not tmpdir:
            tmpdir = tempfile.mkdtemp("-qrcode")
            tmpdir_created = True
        try:
            filename = os.path.join(tmpdir, "qrcode.png")
            qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
            self.skip_history_when_saving = True
            self._no_move = True
            self._no_geo_check = True
            with open(filename, "rb") as qrfile:
                self.qrcode.save("qrcode.png", File(qrfile))
            self.save()
        finally:
            # always remove the directory created here, even when the image
            # generation or the save fails
            if tmpdir_created:
                shutil.rmtree(tmpdir)

    def generate_complete_identifier(self):
        """
        Generate the complete identifier of the item.

        The identifier generated by ``get_generated_id`` for the key
        "<SLUG>_complete_identifier" is used when not empty. Otherwise the
        value of a label attribute is returned: ``CACHED_COMPLETE_ID`` if
        defined, else ``GEO_LABEL`` if set, else "cached_label".

        :return: the identifier; "" when no SLUG is defined; None when the
            label attribute name is empty
        """
        slug = getattr(self, "SLUG", None)
        if not slug:
            return ""
        complete_identifier = get_generated_id(slug + "_complete_identifier", self)
        if complete_identifier:
            return complete_identifier
        cached_label_key = getattr(self, "GEO_LABEL", None) or "cached_label"
        if hasattr(self, "CACHED_COMPLETE_ID"):
            cached_label_key = self.CACHED_COMPLETE_ID
        if not cached_label_key:
            return None
        return getattr(self, cached_label_key)

    def get_index_whole_db(self):
        """
        Next custom index considering every item of this model.

        :return: highest existing custom index + 1, 1 if no index is set
        """
        # single query: None is only returned when no item has an index
        # (null values are excluded)
        current_index = (
            self.__class__.objects.exclude(custom_index__isnull=True)
            .order_by("-custom_index")
            .values_list("custom_index", flat=True)
            .first()
        )
        if current_index is None:
            return 1
        return current_index + 1

    def generate_custom_index(self, force=False):
        """
        Generate the custom index of the item from the profile setting
        "<SLUG>_custom_index".

        The setting is a ";" separated list of keys. A single key matching a
        ``get_index_<key>`` method delegates the generation to this method.
        Otherwise the index is the next value among items sharing the same
        values for these keys.

        :param force: regenerate even if a custom index is already set
        :return: the custom index, None when it cannot be generated (item not
            saved, no SLUG, no or bad setting, non distinct key values)
        """
        if not self.pk:
            return None
        if self.custom_index and not force:
            return self.custom_index
        slug = getattr(self, "SLUG", None)
        if not slug:
            return None
        profile = get_current_profile()
        key = getattr(profile, slug + "_custom_index", None)
        if not key or not key.strip():
            return None
        keys = key.strip().split(";")
        if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]):
            # custom index generation - use the stripped key: the raw setting
            # may contain surrounding spaces and would not match the method
            return getattr(self, "get_index_" + keys[0])()
        model = self.__class__
        try:
            self_keys = set(model.objects.filter(pk=self.pk).values_list(*keys))
        except Exception:  # bad settings - not managed here
            logging.getLogger(__name__).warning(
                "Bad settings for custom_index %s", ";".join(keys)
            )
            return None
        if len(self_keys) != 1:  # key is not distinct
            return None
        self_key = self_keys.pop()
        return self._get_index(keys, self_key)

    def _get_index(self, keys: list, self_keys: list):
        """
        Next custom index among the other items sharing the same key values.

        :param keys: query keys
        :param self_keys: values of the current item, in the order of ``keys``
        :return: highest custom index of matching items + 1, 1 if none is
            set, None if the query fails (bad settings)
        """
        model = self.__class__
        q = model.objects
        if self.pk:
            q = model.objects.exclude(pk=self.pk)
        for idx, key in enumerate(keys):
            q = q.filter(**{key: self_keys[idx]})
        try:
            r = q.aggregate(max_index=Max("custom_index"))
        except Exception:  # bad settings
            return None
        if not r["max_index"]:
            return 1
        return r["max_index"] + 1

    def save(self, *args, **kwargs):
        """Save the item then regenerate its identifiers."""
        super(CompleteIdentifierItem, self).save(*args, **kwargs)
        self.regenerate_all_ids()

    def regenerate_all_ids(self):
        """
        Regenerate custom index and complete identifier. The item is saved
        again only when one of them has changed.
        """
        if getattr(self, "_prevent_loop", False):
            return
        modified = False
        custom_index = self.generate_custom_index()
        if custom_index != self.custom_index:
            modified = True
            self.custom_index = custom_index
        complete_id = self.generate_complete_identifier()
        # compare before flagging, like the custom index: an unchanged
        # identifier must not trigger a second save
        if complete_id and complete_id != self.complete_identifier:
            modified = True
            self.complete_identifier = complete_id
        if modified:
            # save() calls this method again: the flag stops the recursion.
            # The flag is left set on the instance afterwards.
            self._prevent_loop = True
            self.skip_history_when_saving = True
            self.save()
class SearchVectorConfig:
    """
    Configuration of a key used to feed a search vector: language and
    optional function transforming the value.
    """

    def __init__(self, key, language=None, func=None):
        """
        :param key: key of the value
        :param language: search language - "simple" when not provided, the
            ``ISHTAR_SEARCH_LANGUAGE`` setting when set to "local"
        :param func: optional callable applied to the value by ``format``
        """
        if not language:
            language = "simple"
        elif language == "local":
            language = settings.ISHTAR_SEARCH_LANGUAGE
        self.key = key
        self.language = language
        self.func = func

    def format(self, value):
        """
        Format a value as a list of values.

        The string "None" is considered as an empty string. When a function
        is set it is applied to the value; a list result is returned as is,
        any other result is wrapped in a list.
        """
        if value == "None":
            value = ""
        if self.func:
            value = self.func(value)
            if isinstance(value, list):
                return value
        return [value]
class ShortMenuItem:
    """
    Item available in the short menu.

    Default implementation: no specific class is returned.
    """

    UP_MODEL_QUERY = {}

    @classmethod
    def get_short_menu_class(cls, pk):
        """Class of the item identified by ``pk`` - empty by default."""
        short_menu_class = ""
        return short_menu_class

    @property
    def short_class_name(self):
        """Short class name of the item - empty by default."""
        name = ""
        return name
class SerializeItem:
    """
    Mixin providing a dict serialization of a model instance.

    Class attributes tuning the serialization:

    - SERIALIZE_EXCLUDE: names of fields and properties not serialized
    - SERIALIZE_PROPERTIES: names of attributes added to the result when they
      exist and are not already in the result
    - SERIALIZE_CALL: result key -> name of a method called without argument
    - SERIALIZE_DATES: names of attributes formatted with ``human_date``
    - SERIALIZATION_FILES: names of fields serialized as {"url": <url>}
    - SERIALIZE_STRING: names of relation fields always rendered with
      ``str`` (no nested serialization)
    """

    SERIALIZE_EXCLUDE = ["search_vector"]
    SERIALIZE_PROPERTIES = [
        "external_id",
        "multi_polygon_geojson",
        "point_2d_geojson",
        "images_number",
        "json_sections",
    ]
    SERIALIZE_CALL = {}
    SERIALIZE_DATES = []
    SERIALIZATION_FILES = []
    SERIALIZE_STRING = []

    def _serialize_many_to_many(
        self, field_name, search_model=None, recursion=False
    ) -> list:
        """
        Serialize the items of a many to many relation.

        :param field_name: name of the relation attribute
        :param search_model: passed to the nested ``full_serialize`` calls
        :param recursion: if True, items are not serialized with their own
            ``full_serialize``
        :return: list of dicts (nested serialization or {"url": ...} for
            fields listed in SERIALIZATION_FILES) or list of strings
        """
        # evaluate the relation once instead of count() + all()[0] + all()
        items = list(getattr(self, field_name).all())
        if not items:
            return []
        if (
            field_name not in self.SERIALIZE_STRING
            and hasattr(items[0], "full_serialize")
            and not recursion
        ):
            return [
                item.full_serialize(search_model, recursion=True) for item in items
            ]
        # test the field name (as done for foreign keys): testing the first
        # item against a list of field names never matched
        if field_name in self.SERIALIZATION_FILES:
            urls = []
            for item in items:
                try:
                    urls.append({"url": item.url})
                except ValueError:  # no file associated
                    pass
            return urls
        return [str(item) for item in items]

    def full_serialize(self, search_model=None, recursion=False) -> dict:
        """
        API serialization

        :param search_model: if provided, the keys of its sheet filters are
            excluded from the result
        :param recursion: True when called from the serialization of another
            item: related items are then rendered as strings instead of being
            fully serialized
        :return: data dict
        """
        full_result = {}
        serialize_fields = []
        exclude = []
        if search_model:
            exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()]
        # excluding any of these keys disables the nested serialization of
        # the main geographic data
        no_geodata = any(
            prop in self.SERIALIZE_EXCLUDE or prop in exclude
            for prop in ("main_geodata", "geodata", "geodata_list")
        )
        for field in self._meta.get_fields():
            field_name = field.name
            if field_name in self.SERIALIZE_EXCLUDE or field_name in exclude:
                continue
            if field.many_to_one or field.one_to_one:
                try:
                    value = getattr(self, field_name)
                except (MultipleObjectsReturned, ObjectDoesNotExist):
                    value = None
                if value:
                    if (
                        field_name not in self.SERIALIZE_STRING
                        and hasattr(value, "full_serialize")
                        and not recursion
                    ):
                        if field_name == "main_geodata" and no_geodata:
                            continue
                        value = value.full_serialize(search_model, recursion=True)
                    elif field_name in self.SERIALIZATION_FILES:
                        try:
                            value = {"url": value.url}
                        except ValueError:  # no file associated
                            value = None
                    else:
                        value = str(value)
                else:
                    value = None
                full_result[field_name] = value
                if field_name == "main_geodata":
                    full_result["geodata_list"] = [value]
            elif field.many_to_many:
                full_result[field_name] = self._serialize_many_to_many(
                    field_name, search_model, recursion
                )
            elif field_name in self.SERIALIZATION_FILES:
                value = getattr(self, field_name)
                try:
                    value = {"url": value.url}
                except ValueError:  # no file associated
                    value = None
                full_result[field_name] = value
            else:
                # plain fields are delegated to the JSON serializer
                serialize_fields.append(field_name)
        result = json.loads(serialize("json", [self], fields=serialize_fields))
        full_result.update(result[0]["fields"])
        for prop in self.SERIALIZE_PROPERTIES:
            if prop in self.SERIALIZE_EXCLUDE or prop in exclude:
                continue
            if hasattr(self, prop) and prop not in full_result:
                full_result[prop] = getattr(self, prop)
        # flag the availability of geometries
        if "point_2d_geojson" in full_result:
            full_result["point_2d"] = True
        if "multi_polygon_geojson" in full_result:
            full_result["multi_polygon"] = True
        for prop in self.SERIALIZE_DATES:
            if prop in self.SERIALIZE_EXCLUDE or prop in exclude:
                continue
            dt = getattr(self, prop) or ""
            if dt:
                dt = human_date(dt)
            full_result[prop] = dt
        for k in self.SERIALIZE_CALL:
            if k in self.SERIALIZE_EXCLUDE or k in exclude:
                continue
            full_result[k] = getattr(self, self.SERIALIZE_CALL[k])()
        full_result["SLUG"] = self.SLUG
        # identifiers are prefixed with "external_"
        full_result["pk"] = f"external_{self.pk}"
        full_result["id"] = f"external_{self.id}"
        return full_result

    def get_associated_main_item_list(self, attr, model) -> list:
        """
        Table of the items associated through a relation.

        :param attr: name of the relation attribute
        :param model: model of the associated items - its ``TABLE_COLS``
            (list or callable returning a list) gives the columns and its
            ``COL_LABELS`` the specific labels
        :return: empty list if there is no item, otherwise a list of rows:
            the header (labels) then the values of each item, None values
            being replaced by "-"
        """
        items = getattr(self, attr)
        if not items.count():
            return []
        table_cols = model.TABLE_COLS
        if callable(table_cols):
            table_cols = table_cols()
        header = []
        for colname in table_cols:
            if colname in model.COL_LABELS:
                header.append(str(model.COL_LABELS[colname]))
            else:
                # no specific label: use the verbose name of the field
                header.append(model._meta.get_field(colname).verbose_name)
        lst = [header]
        for values in items.values_list(*table_cols):
            lst.append(["-" if v is None else v for v in values])
        return lst
class MainItem(ShortMenuItem, SerializeItem):
    """
    Item with quick actions available from tables.
    Extra actions are available from sheets.
    """

    QUICK_ACTIONS = []
    SLUG = ""

    @classmethod
    def app_label(cls):
        """Application label of the model."""
        meta = cls._meta
        return meta.app_label

    @classmethod
    def model_name(cls):
        """Name of the model."""
        meta = cls._meta
        return meta.model_name

    @classmethod
    def class_verbose_name(cls):
        """Verbose name of the model."""
        meta = cls._meta
        return meta.verbose_name

    @classmethod
    def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True):
        """
        Columns of the tables of this model - delegated to
        ``get_columns_from_class``.

        :param table_cols_attr: "TABLE_COLS" if not specified
        :param dict_col_labels: (default: True) if set to False return list
        matching with table_cols list
        :return: (table_cols, table_col_labels)
        """
        options = {
            "table_cols_attr": table_cols_attr,
            "dict_col_labels": dict_col_labels,
        }
        return get_columns_from_class(cls, **options)

    def get_search_url(self):
        """
        :return: URL named "<SLUG>_search", None when no SLUG is defined
        """
        if not self.SLUG:
            return None
        return reverse(self.SLUG + "_search")

    @classmethod
    def get_quick_actions(cls, user, session=None, obj=None):
        """
        Get the quick actions available for a user.

        :return: list of [url, title, icon, target, is_popup]
        """
        return [
            [
                quick_action.base_url,
                mark_safe(quick_action.text),
                mark_safe(quick_action.rendered_icon),
                quick_action.target or "",
                quick_action.is_popup,
            ]
            for quick_action in cls.QUICK_ACTIONS
            if quick_action.is_available(user, session=session, obj=obj)
        ]

    @classmethod
    def get_quick_action_by_url(cls, url):
        """
        :return: first quick action whose ``url`` matches, None if no match
        """
        matching = (qa for qa in cls.QUICK_ACTIONS if qa.url == url)
        return next(matching, None)

    def regenerate_external_id(self):
        """
        Empty the external ID, flag it as automatic and save the item.
        Nothing is done if the item has no ``external_id`` attribute.
        """
        if hasattr(self, "external_id"):
            self.skip_history_when_saving = True
            self._no_move = True
            self.external_id = ""
            self.auto_external_id = True
            self.save()

    def get_extra_actions(self, request):
        """
        Extra actions available from the sheet of the item.

        :return: list of action tuples - the "Regenerate ID" action is only
            available to superusers on items with an ``auto_external_id``
            attribute
        """
        if not hasattr(self, "SLUG"):
            return []
        can_regenerate = request.user.is_superuser and hasattr(
            self, "auto_external_id"
        )
        if not can_regenerate:
            return []
        regenerate_url = reverse("regenerate-external-id") + "?{}={}".format(
            self.SLUG, self.pk
        )
        regenerate_action = (
            regenerate_url,
            _("Regenerate ID"),
            "fa fa-key",
            _("regen."),
            "btn-info",
            True,
            200,
        )
        return [regenerate_action]