#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Generic models and tools for models
"""
import copy
from collections import OrderedDict
import datetime
import json
import logging
import os
import pyqrcode
import shutil
import tempfile
import time
from django import forms
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User, Group
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.gis.geos import Point
from django.contrib.postgres.fields import JSONField
from django.contrib.postgres.search import SearchVectorField, SearchVector
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.core.files import File
from django.core.serializers import serialize
from django.core.urlresolvers import reverse, NoReverseMatch
from django.core.validators import validate_slug
from django.db import connection
from django.db.models import Q, Count, Max
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.template.defaultfilters import slugify
from django.utils.safestring import SafeText, mark_safe
from django.utils.translation import activate, deactivate
from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, get_image_path
from simple_history.models import HistoricalRecords as BaseHistoricalRecords
from simple_history.signals import (
post_create_historical_record,
pre_create_historical_record,
)
from unidecode import unidecode
from ishtar_common.model_managers import TypeManager
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import Import
from ishtar_common.templatetags.link_to_window import simple_link_to_window
from ishtar_common.utils import (
get_cache,
disable_for_loaddata,
get_all_field_names,
merge_tsvectors,
cached_label_changed,
post_save_geo,
task,
duplicate_item,
get_generated_id,
get_current_profile,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
"""
from ishtar_common.models import GeneralType, get_external_id, \
LightHistorizedItem, OwnPerms, Address, post_save_cache, \
DashboardFormItem, document_attached_changed, SearchAltName, \
DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
QuickAction, MainItem, Merge
"""
class CachedGen(object):
    """
    Base mixin keeping track of the cache keys a class has to refresh.
    """

    @classmethod
    def refresh_cache(cls):
        """
        Recompute the cached values. Must be implemented by subclasses.
        """
        raise NotImplementedError()

    @classmethod
    def _add_cache_key_to_refresh(cls, keys):
        """
        Register `keys` in the list stored under the "_current_keys" cache
        entry of the class. Nothing is written when `keys` is already listed.

        :param keys: key (list of key parts) to register
        """
        registry_key, registered = get_cache(cls, ["_current_keys"])
        if type(registered) is not list:
            # nothing (or something unexpected) in cache: start a new list
            registered = []
        if keys in registered:
            return
        registered.append(keys)
        cache.set(registry_key, registered, settings.CACHE_TIMEOUT)
class Cached(CachedGen):
    """
    Cache helpers for models identified by a slug.

    `_add_cache_key_to_refresh` is inherited unchanged from `CachedGen`.
    """

    # name of the model field used as slug by `get_cache`
    slug_field = "txt_idx"

    @classmethod
    def refresh_cache(cls):
        """
        Recompute every cached value registered under "_current_keys".

        Handled key families: "__slug" (single item), "__get_types" (type
        lists) and "__get_help" (help text). Other keys are ignored.
        """
        cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
        if not current_keys:
            return
        for keys in current_keys:
            if not keys:
                # defensive: an empty entry cannot be dispatched
                continue
            if len(keys) == 2 and keys[0] == "__slug":
                cls.get_cache(keys[1], force=True)
            elif keys[0] == "__get_types":
                # work on a copy: the parts are popped while being decoded
                keys = list(keys)
                default = None
                empty_first = True
                exclude = []
                if len(keys) >= 2:
                    default = keys.pop()
                    if len(keys) > 1:
                        empty_first = bool(keys.pop())
                    exclude = keys[1:]
                cls.get_types(
                    exclude=exclude,
                    empty_first=empty_first,
                    default=default,
                    force=True,
                )
            elif keys[0] == "__get_help":
                cls.get_help(force=True)

    @classmethod
    def get_cache(cls, slug, force=False):
        """
        Get the item matching `slug`, using the cache when possible.

        :param slug: value of `slug_field` to look for
        :param force: if True the cached value is ignored and refreshed
        :return: the item or None when no item matches (None is cached too)
        """
        cache_key, value = get_cache(cls, ["__slug", slug])
        if not force and value:
            return value
        try:
            obj = cls.objects.get(**{cls.slug_field: slug})
        except cls.DoesNotExist:
            obj = None
        cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
        return obj
@disable_for_loaddata
def post_save_cache(sender, **kwargs):
    """
    Signal handler: ask the sender class to refresh its cache.
    """
    sender.refresh_cache()
class GeneralType(Cached, models.Model):
    """
    Abstract class for "types"
    """

    # human readable name of the type
    label = models.TextField(_("Label"))
    # unique slug of the type, validated with `validate_slug`
    txt_idx = models.TextField(
        _("Textual ID"),
        validators=[validate_slug],
        unique=True,
        help_text=_(
            "The slug is the standardized version of the name. It contains "
            "only lowercase letters, numbers and hyphens. Each slug must "
            "be unique."
        ),
    )
    comment = models.TextField(_("Comment"), blank=True, default="")
    # the type list getters of this class filter on available=True
    available = models.BooleanField(_("Available"), default=True)
    # text put before the generated items by `get_help`
    HELP_TEXT = ""
    objects = TypeManager()

    class Meta:
        abstract = True
def __str__(self):
    """Return the label of the type."""
    return self.label
def natural_key(self):
    """Return the natural key: a one item tuple with the textual id."""
    return (self.txt_idx,)
def history_compress(self):
    """Return the compact value stored in history: the textual id."""
    return self.txt_idx
@classmethod
def get_documentation_string(cls):
    """
    Used for automatic documentation generation

    :return: description of the label and textual id fields, followed by
    `extra_documentation_string()` when the class defines it
    """
    label_name = str(_("Label"))
    txt_idx_name = str(_("Textual ID"))
    doc = "**label** {}, **txt_idx** {}".format(label_name, txt_idx_name)
    if not hasattr(cls, "extra_documentation_string"):
        return doc
    return doc + cls.extra_documentation_string()
@classmethod
def admin_url(cls):
    """
    Return the url of the admin change list of the model.
    """
    meta = cls._meta
    url_name = "admin:{}_{}_changelist".format(meta.app_label, meta.model_name)
    return str(reverse(url_name))
@classmethod
def history_decompress(cls, value, create=False):
    """
    Convert a list of textual ids back to items.

    :param value: iterable of textual ids (can be empty or None)
    :param create: not used by this implementation
    :return: list of matching items - unknown textual ids are skipped
    """
    if not value:
        return []
    items = []
    for slug in value:
        try:
            item = cls.objects.get(txt_idx=slug)
        except cls.DoesNotExist:
            continue
        items.append(item)
    return items
@property
def explicit_label(self):
    """Label followed by the verbose name of the model in parentheses."""
    return "{} ({})".format(self.label, self._meta.verbose_name)
@classmethod
def create_default_for_test(cls):
    """Create and return five items labelled "Test 0" to "Test 4"."""
    return [cls.objects.create(label="Test %d" % i) for i in range(5)]
@property
def short_label(self):
    """Short label: same value as the label."""
    return self.label
@property
def name(self):
    """Alias of the label."""
    return self.label
@classmethod
def get_or_create(cls, slug, label=""):
    """
    Get or create a new item.

    The cache is checked first, the database is only used on cache miss.

    :param slug: textual id
    :param label: label for initialization if the item doesn't exist (not
    mandatory)
    :return: instancied item of the base class
    """
    cached = cls.get_cache(slug)
    if cached:
        return cached
    # the "created" flag is of no use here
    return cls.objects.get_or_create(txt_idx=slug, defaults={"label": label})[0]
@classmethod
def get_or_create_pk(cls, slug):
    """
    Get an id from a slug. Create the associated item if needed.

    :param slug: textual id
    :return: id of the item (string)
    """
    return str(cls.get_or_create(slug).pk)
@classmethod
def get_or_create_pks(cls, slugs):
    """
    Get and merge a list of ids from a slug list. Create the associated
    items if needed.

    :param slugs: textual ids
    :return: string with ids separated by "_"
    """
    return "_".join([str(cls.get_or_create(slug).pk) for slug in slugs])
@classmethod
def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
    """
    Build the help text of the type: `HELP_TEXT` followed by one entry per
    available item having a comment.

    NOTE(review): several literals of this method were split over physical
    lines in the file (unterminated strings). They are rewritten with
    escapes keeping the visible characters. The result being passed to
    `mark_safe`, these literals presumably held HTML markup lost at some
    point - confirm against the upstream version.

    :param dct: extra filters given to `get_types`
    :param exclude: textual ids to exclude
    :param force: if True the cached value is ignored and refreshed
    :param full_hierarchy: not used by this implementation
    :return: help text marked as safe
    """
    if not dct:
        dct = {}
    if not exclude:
        exclude = []
    keys = ["__get_help"]
    keys += ["{}".format(ex) for ex in exclude]
    keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
    cache_key, value = get_cache(cls, keys)
    if value and not force:
        return mark_safe(value)
    help_text = cls.HELP_TEXT
    c_rank = -1
    help_items = "\n"
    for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
        if hasattr(item, "__iter__"):
            # (pk, label) tuple: fetch the item and flatten its hierarchy
            pk = item[0]
            item = cls.objects.get(pk=pk)
            item.rank = c_rank + 1
            if hasattr(item, "parent"):
                c_item = item
                parents = []
                while c_item.parent:
                    parents.append(c_item.parent.label)
                    c_item = c_item.parent
                parents.reverse()
                parents.append(item.label)
                item.label = " / ".join(parents)
        if not item.comment:
            continue
        # a rank change closes or opens a nesting level
        if c_rank > item.rank:
            help_items += "\n"
        elif c_rank < item.rank:
            help_items += "\n\n"
        c_rank = item.rank
        help_items += "- %s\n- %s\n" % (
            item.label,
            "\n".join(item.comment.split("\n")),
        )
    c_rank += 1
    if c_rank:
        # close every level still open
        help_items += c_rank * "\n"
    if help_text or help_items != "\n":
        help_text = help_text + help_items
    else:
        help_text = ""
    cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
    return mark_safe(help_text)
@classmethod
def _get_initial_types(cls, initial, type_pks, instance=False):
new_vals = []
if not initial:
return []
if type(initial) not in (list, tuple):
initial = [initial]
for value in initial:
try:
pk = int(value)
except (ValueError, TypeError):
continue
if pk in type_pks:
continue
try:
extra_type = cls.objects.get(pk=pk)
if instance:
new_vals.append(extra_type)
else:
new_vals.append((extra_type.pk, str(extra_type)))
except cls.DoesNotExist:
continue
return new_vals
@classmethod
def get_types(
    cls,
    dct=None,
    instances=False,
    exclude=None,
    empty_first=True,
    default=None,
    initial=None,
    force=False,
    full_hierarchy=False,
):
    """
    Get the list of types.

    :param dct: extra filters
    :param instances: if True items are returned instead of (pk, label)
    tuples
    :param exclude: textual ids to exclude
    :param empty_first: add an empty ("", "--") choice first - only for
    tuples and when no default is set
    :param default: textual id of the default item
    :param initial: primary key(s) to append when not already listed
    :param force: if True the cache is not used
    :param full_hierarchy: forwarded to `_pre_get_types`
    :return: list of tuples or list of items
    """
    if not dct:
        dct = {}
    if not exclude:
        exclude = []
    types = []
    if not instances and empty_first and not default:
        types = [("", "--")]
    types += cls._pre_get_types(
        dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy
    )
    if not initial:
        return types
    if instances:
        # items cannot be unpacked as (pk, label) tuples
        type_pks = [item.pk for item in types]
    else:
        type_pks = [idx for idx, lbl in types]
    types += cls._get_initial_types(initial, type_pks, instance=instances)
    return types
@classmethod
def _pre_get_types(
    cls,
    dct=None,
    instances=False,
    exclude=None,
    default=None,
    force=False,
    get_full_hierarchy=False,
):
    """
    Get the types with cache management.

    The cache is only used for (pk, label) tuples: with `instances` the
    generator of the underlying getter is returned as is.

    :return: cached list, fresh list (stored in cache) or generator
    """
    dct = dct or {}
    exclude = exclude or []
    # cache
    cache_key = None
    if not instances:
        key_parts = ["__get_types"]
        key_parts.extend("{}".format(ex) for ex in exclude)
        key_parts.append("{}".format(default))
        key_parts.extend("{}-{}".format(str(k), dct[k]) for k in dct)
        cache_key, cached = get_cache(cls, key_parts)
        if cached and not force:
            return cached
    base_dct = dct.copy()
    if hasattr(cls, "parent"):
        # hierarchical types
        found = cls._get_parent_types(
            base_dct,
            instances,
            exclude=exclude,
            default=default,
            get_full_hierarchy=get_full_hierarchy,
        )
    else:
        found = cls._get_types(base_dct, instances, exclude=exclude, default=default)
    if not cache_key:
        return found
    vals = list(found)
    cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
    return vals
@classmethod
def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
if not dct:
dct = {}
if not exclude:
exclude = []
dct["available"] = True
if default:
try:
default = cls.objects.get(txt_idx=default)
yield (default.pk, _(str(default)))
except cls.DoesNotExist:
pass
items = cls.objects.filter(**dct)
if default and default != "None":
if hasattr(default, "txt_idx"):
exclude.append(default.txt_idx)
else:
exclude.append(default)
if exclude:
items = items.exclude(txt_idx__in=exclude)
for item in items.order_by(*cls._meta.ordering).all():
if instances:
item.rank = 0
yield item
else:
yield item.pk, str(item) if item and str(item) else ""
@classmethod
def _get_childs_list(cls, dct=None, exclude=None, instances=False):
if not dct:
dct = {}
if not exclude:
exclude = []
if "parent" in dct:
dct.pop("parent")
childs = cls.objects.filter(**dct)
if exclude:
childs = childs.exclude(txt_idx__in=exclude)
if hasattr(cls, "order"):
childs = childs.order_by("order")
res = {}
if instances:
for item in childs.all():
parent_id = item.parent_id or 0
if parent_id not in res:
res[parent_id] = []
res[parent_id].append(item)
else:
for item in childs.values("id", "parent_id", "label").all():
parent_id = item["parent_id"] or 0
if item["id"] == item["parent_id"]:
parent_id = 0
if parent_id not in res:
res[parent_id] = []
res[parent_id].append((item["id"], item["label"]))
return res
# prefixes used by `_get_childs` to draw the hierarchy in front of labels
PREFIX = "│ "
PREFIX_EMPTY = " "
PREFIX_MEDIUM = "├ "
PREFIX_LAST = "└ "
# unicode code points of the box drawing characters used above
PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
@classmethod
def _get_childs(
    cls,
    item,
    child_list,
    prefix=0,
    instances=False,
    is_last=False,
    last_of=None,
    get_full_hierarchy=False,
):
    """
    Recursively list the descendants of an item.

    :param item: id of the parent item
    :param child_list: dict of children by parent id (see
    `_get_childs_list`)
    :param prefix: depth of the parent - incremented for the children
    :param instances: if True items are returned (with `rank` set to the
    depth), otherwise (id, label) tuples with a prefixed label
    :param is_last: True if the parent is the last child of its own parent
    :param last_of: for each ancestor level, True if the ancestor was the
    last child
    :param get_full_hierarchy: False to draw a tree with the PREFIX_*
    constants, otherwise labels are prefixed by the parent labels joined
    with " > " (a string value is the hierarchy of the parent)
    :return: flat list of descendants, each followed by its own
    descendants
    """
    if not last_of:
        last_of = []
    prefix += 1
    current_child_lst = []
    if item in child_list:
        current_child_lst = child_list[item]
    lst = []
    total = len(current_child_lst)
    # kept apart: get_full_hierarchy is overloaded for each child below
    full_hierarchy_initial = get_full_hierarchy
    for idx, child in enumerate(current_child_lst):
        mylast_of = last_of[:]
        p = ""
        if instances:
            child.rank = prefix
            lst.append(child)
        else:
            if full_hierarchy_initial:
                if isinstance(full_hierarchy_initial, str):
                    p = full_hierarchy_initial + " > "
                else:
                    p = ""
            else:
                # one prefix part by level, the last one for the child
                cprefix = prefix
                while cprefix:
                    cprefix -= 1
                    if not cprefix:
                        if (idx + 1) == total:
                            p += cls.PREFIX_LAST
                        else:
                            p += cls.PREFIX_MEDIUM
                    elif is_last:
                        if mylast_of:
                            clast = mylast_of.pop(0)
                            if clast:
                                p += cls.PREFIX_EMPTY
                            else:
                                p += cls.PREFIX
                        else:
                            p += cls.PREFIX_EMPTY
                    else:
                        p += cls.PREFIX
            lst.append((child[0], SafeText(p + str(_(child[1])))))
        clast_of = last_of[:]
        clast_of.append(idx + 1 == total)
        if instances:
            child_id = child.id
        else:
            child_id = child[0]
        if get_full_hierarchy:
            # NOTE(review): child[1] is used here even with instances=True
            # where child is an item - confirm both are never combined
            if p:
                if not p.endswith(" > "):
                    p += " > "
                get_full_hierarchy = p + child[1]
            else:
                get_full_hierarchy = child[1]
        for sub_child in cls._get_childs(
            child_id,
            child_list,
            prefix,
            instances,
            is_last=((idx + 1) == total),
            last_of=clast_of,
            get_full_hierarchy=get_full_hierarchy,
        ):
            lst.append(sub_child)
    return lst
@classmethod
def _get_parent_types(
    cls,
    dct=None,
    instances=False,
    exclude=None,
    default=None,
    get_full_hierarchy=False,
):
    """
    Generator of the available types (hierarchical version): each root
    item is yielded followed by its descendants.

    :param dct: extra filters - "available" is set to True in it
    :param instances: if True yield items (root items get a rank of 0)
    instead of (id, label) tuples
    :param exclude: textual ids to exclude
    :param default: not used by this implementation
    :param get_full_hierarchy: forwarded to `_get_childs`
    """
    if not dct:
        dct = {}
    if not exclude:
        exclude = []
    dct["available"] = True
    child_list = cls._get_childs_list(dct, exclude, instances)
    # the 0 key lists the root items
    if 0 in child_list:
        for item in child_list[0]:
            if instances:
                item.rank = 0
                item_id = item.pk
                yield item
            else:
                item_id = item[0]
                yield item
            if get_full_hierarchy:
                # the label of the root starts the hierarchy of its children
                # NOTE(review): item[1] is used even with instances=True
                # where item is a model instance - confirm both are never
                # combined
                get_full_hierarchy = item[1]
            for child in cls._get_childs(
                item_id,
                child_list,
                instances=instances,
                get_full_hierarchy=get_full_hierarchy,
            ):
                yield child
def save(self, *args, **kwargs):
    """
    Save the type: fill the missing label or textual id, delete the keys
    matching the old label and textual id, then generate the keys again.
    """
    ItemKey = apps.get_model("ishtar_common", "ItemKey")
    if not self.id and not self.label:
        # new item with no label: derive it from the textual id
        txt_idx = self.txt_idx
        if isinstance(txt_idx, list):
            txt_idx = txt_idx[0]
            self.txt_idx = txt_idx
        self.label = " ".join(" ".join(self.txt_idx.split("-")).split("_")).title()
    if not self.txt_idx:
        self.txt_idx = slugify(self.label)[:100]
    # clean old keys
    if self.pk:
        old = self.__class__.objects.get(pk=self.pk)
        content_type = ContentType.objects.get_for_model(self.__class__)
        if slugify(self.label) != slugify(old.label):
            ItemKey.objects.filter(
                object_id=self.pk, key=slugify(old.label), content_type=content_type
            ).delete()
        if self.txt_idx != old.txt_idx:
            ItemKey.objects.filter(
                object_id=self.pk, key=old.txt_idx, content_type=content_type
            ).delete()
    obj = super(GeneralType, self).save(*args, **kwargs)
    self.generate_key(force=True)
    return obj
def add_key(self, key, force=False, importer=None, group=None, user=None):
    """
    Associate a key to this item.

    :param key: key to register
    :param force: if True the same key attached to other items (in the
    same scope) is deleted first
    :param importer: importer the key is limited to - used when no group
    and no user are given
    :param group: group the key is limited to
    :param user: user the key is limited to - used when no group is given
    """
    ItemKey = apps.get_model("ishtar_common", "ItemKey")
    content_type = ContentType.objects.get_for_model(self.__class__)
    if not importer and not force:
        # generic, non forced key: keep the existing one if any
        if ItemKey.objects.filter(key=key, content_type=content_type).count():
            return
    lookup = {"key": key, "content_type": content_type}
    if group:
        lookup["group"] = group
    elif user:
        lookup["user"] = user
    else:
        lookup["importer"] = importer
    if force:
        ItemKey.objects.filter(**lookup).exclude(object_id=self.pk).delete()
    lookup["object_id"] = self.pk
    ItemKey.objects.get_or_create(**lookup)
def generate_key(self, force=False):
    """
    Register the slugified label and the textual id as keys of this item.

    NOTE(review): `force` is accepted but not forwarded to `add_key` -
    confirm this is intended.
    """
    for key in (slugify(self.label), self.txt_idx):
        self.add_key(key)
def get_keys(self, importer):
    """
    Get the keys of this item usable by an importer.

    :param importer: importer - keys limited to it, to its user or to its
    associated group are included along with generic keys
    :return: list of keys, the textual id being the first one
    """
    ItemKey = apps.get_model("ishtar_common", "ItemKey")
    content_type = ContentType.objects.get_for_model(self.__class__)
    # generic keys and keys specific to the importer
    scope = Q(importer__isnull=True, user__isnull=True, group__isnull=True) | Q(
        user__isnull=True, group__isnull=True, importer=importer
    )
    if importer.user:
        scope = scope | Q(user=importer.user, group__isnull=True, importer=importer)
    if importer.associated_group:
        scope = scope | Q(
            user__isnull=True, group=importer.associated_group, importer=importer
        )
    own_keys = ItemKey.objects.filter(
        Q(content_type=content_type, object_id=self.pk) & scope
    )
    extra = [ik.key for ik in own_keys.exclude(key=self.txt_idx).all()]
    return [self.txt_idx] + extra
@classmethod
def generate_keys(cls):
    """Generate the keys of every item of the model."""
    # content_type = ContentType.objects.get_for_model(cls)
    for item in cls.objects.all():
        item.generate_key()
class HierarchicalType(GeneralType):
    """
    Abstract class for "types" organized as a tree with a `parent` field.
    """

    parent = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        verbose_name=_("Parent"),
    )

    class Meta:
        abstract = True

    def full_label(self):
        """
        Return the labels from the top parent to this item joined by " > ".

        The walk stops on an already seen item so a circular parent chain
        cannot loop forever.
        """
        lbls = [self.label]
        item = self
        seen = [self]
        while item.parent:
            item = item.parent
            if item in seen:  # prevent circular
                break
            seen.append(item)
            lbls.append(item.label)
        return " > ".join(reversed(lbls))

    @property
    def first_parent(self):
        """
        Return the top parent of this item - None when there is no parent.

        On a circular parent chain the first item met twice is returned.
        """
        parent = self.parent
        parents = []
        while parent:
            if parent in parents:  # prevent circular
                return parent
            parents.append(parent)
            if not parent.parent:
                return parent
            parent = parent.parent
class StatisticItem:
    """
    Mixin holding the statistic configuration of a model: modalities and
    variables to sum.
    """

    STATISTIC_MODALITIES = []  # example: "year", "operation_type__label"
    STATISTIC_MODALITIES_OPTIONS = OrderedDict()  # example:
    # OrderedDict([('year', _("Year")),
    #              ("operation_type__label", _("Operation type"))])
    STATISTIC_SUM_VARIABLE = OrderedDict(
        (("pk", (_("Number"), 1)),)
    )  # example: "Price", "Volume" - the number is a multiplier
class TemplateItem:
    """
    Mixin giving access to the document templates associated to a model.
    """

    @classmethod
    def _label_templates_q(cls):
        """
        Query the available label templates of the model.

        Templates registered with the alternative module path ("models"
        instead of "models_finds" or "models_treatments") are also matched.
        """
        model_name = "{}.{}".format(cls.__module__, cls.__name__)
        q = Q(associated_model__klass=model_name, for_labels=True, available=True)
        alt_model_name = model_name.replace("models_finds", "models").replace(
            "models_treatments", "models"
        )
        if alt_model_name != model_name:
            q |= Q(
                associated_model__klass=alt_model_name,
                for_labels=True,
                available=True,
            )
        DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
        return DocumentTemplate.objects.filter(q)

    @classmethod
    def has_label_templates(cls):
        """Return the number of available label templates (0 if none)."""
        return cls._label_templates_q().count()

    @classmethod
    def label_templates(cls):
        """Return the queryset of available label templates."""
        return cls._label_templates_q()

    def get_extra_templates(self, request):
        """
        List the available non-label templates usable with this item.

        For models of "archaeological_finds" the alternative module paths
        ("models", "models_finds", "models_treatments") are matched too:
        the variants are built from the module path, the class name having
        no module part to substitute.

        :param request: not used by this implementation
        :return: list of (template name, url of document generation)
        """
        cls = self.__class__
        name = str(cls.__name__)
        module = str(cls.__module__)
        modules = [module]
        if "archaeological_finds" in module:
            if "models_finds" in module or "models_treatments" in module:
                modules.append(
                    module.replace("models_finds", "models").replace(
                        "models_treatments", "models"
                    )
                )
            else:
                modules.append(module.replace("models", "models_finds"))
                modules.append(module.replace("models", "models_treatments"))
        model_names = []
        for module_name in modules:
            full_name = "{}.{}".format(module_name, name)
            if full_name not in model_names:
                model_names.append(full_name)
        DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
        q = DocumentTemplate.objects.filter(
            associated_model__klass__in=model_names, for_labels=False, available=True
        )
        urlname = "generate-document"
        return [
            (template.name, reverse(urlname, args=[template.slug, self.pk]))
            for template in q.all()
        ]
class FullSearch(models.Model):
    """
    Abstract model with a search vector filled from configured fields.
    """

    search_vector = SearchVectorField(
        _("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")
    )
    EXTRA_REQUEST_KEYS = {}
    # values must provide a `get_alt_names` method (see `get_alt_names`)
    DYNAMIC_REQUESTS = {}
    # alternative names for search keys (see `get_alt_names`)
    ALT_NAMES = {}
    # search vector configurations read by `update_search_vector`
    BASE_SEARCH_VECTORS = []
    PROPERTY_SEARCH_VECTORS = []
    INT_SEARCH_VECTORS = []
    M2M_SEARCH_VECTORS = []
    # attribute names of parents whose search vector is copied
    PARENT_SEARCH_VECTORS = []
    # prevent circular dependency
    PARENT_ONLY_SEARCH_VECTORS = []

    class Meta:
        abstract = True
@classmethod
def general_types(cls):
    """
    Generator of the names of the fields related to a type model
    (GeneralType or HierarchicalType).
    """
    for field_name in get_all_field_names(cls):
        rel = getattr(cls._meta.get_field(field_name), "rel", None)
        if not rel:
            # not a relation
            continue
        if issubclass(rel.to, (GeneralType, HierarchicalType)):
            yield field_name
@classmethod
def get_alt_names(cls):
    """
    Return a copy of `ALT_NAMES` completed with the alternative names of
    each dynamic request.
    """
    merged = cls.ALT_NAMES.copy()
    for request_key in cls.DYNAMIC_REQUESTS:
        dynamic_request = cls.DYNAMIC_REQUESTS[request_key]
        merged.update(dynamic_request.get_alt_names())
    return merged
@classmethod
def get_query_parameters(cls):
    """
    Return the alternative names indexed by their search key translated in
    each language of `settings.LANGUAGES`.
    """
    by_search_key = {}
    for alt_name in cls.get_alt_names().values():
        for language in settings.LANGUAGES:
            language_code = language[0]
            # the lazy search key is evaluated with this language activated
            activate(language_code)
            by_search_key[str(alt_name.search_key)] = alt_name
            deactivate()
    return by_search_key
def _update_search_field(self, search_vector_conf, search_vectors, data):
    """
    Append to `search_vectors` the tsvector of each value formatted from
    `data` by the search vector configuration.

    :param search_vector_conf: configuration providing `format` and
    `language`
    :param search_vectors: list of vectors - modified in place
    :param data: raw data to index
    """
    query = "SELECT to_tsvector(%s, %s)"
    for formatted in search_vector_conf.format(data):
        with connection.cursor() as cursor:
            cursor.execute(query, [search_vector_conf.language, formatted])
            vector = cursor.fetchone()[0]
            search_vectors.append(vector)
def _update_search_number_field(self, search_vectors, val):
    """
    Append a number to `search_vectors` as a hand written "'val':1" vector.
    """
    search_vectors.append("'{}':1".format(val))
def update_search_vector(self, save=True, exclude_parent=False):
    """
    Update the search vector

    :param save: True if you want to save the object immediately
    :param exclude_parent: if True vectors of PARENT_SEARCH_VECTORS are
    not copied
    :return: None when nothing is done (no pk, no configuration, already
    updated), the new search vector if save is False, otherwise True if
    modified
    """
    if not hasattr(self, "search_vector"):
        return
    if not self.pk:
        # logger.warning("Cannot update search vector before save or "
        #                "after deletion.")
        return
    if (
        not self.BASE_SEARCH_VECTORS
        and not self.M2M_SEARCH_VECTORS
        and not self.INT_SEARCH_VECTORS
        and not self.PROPERTY_SEARCH_VECTORS
        and not self.PARENT_SEARCH_VECTORS
    ):
        logger.warning("No search_vectors defined for {}".format(self.__class__))
        return
    # the update is done only once for a given instance
    if getattr(self, "_search_updated", None):
        return
    JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
    self._search_updated = True
    old_search = ""
    if self.search_vector:
        old_search = self.search_vector[:]
    search_vectors = []
    base_q = self.__class__.objects.filter(pk=self.pk)
    # many to many have to be queried one by one otherwise only one is fetch
    for m2m_search_vector in self.M2M_SEARCH_VECTORS:
        key = m2m_search_vector.key.split("__")[0]
        rel_key = getattr(self, key)
        for item in rel_key.values("pk").all():
            query_dct = {key + "__pk": item["pk"]}
            q = copy.copy(base_q).filter(**query_dct)
            q = q.annotate(
                search=SearchVector(
                    m2m_search_vector.key, config=m2m_search_vector.language
                )
            ).values("search")
            search_vectors.append(q.all()[0]["search"])
    # int/float are not well managed by the SearchVector
    for int_search_vector in self.INT_SEARCH_VECTORS:
        q = base_q.values(int_search_vector.key)
        for val in int_search_vector.format(q.all()[0][int_search_vector.key]):
            self._update_search_number_field(search_vectors, val)
    if not exclude_parent:
        # copy parent vector fields
        for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
            parent = getattr(self, PARENT_SEARCH_VECTOR)
            if hasattr(parent, "all"):  # m2m
                for p in parent.all():
                    search_vectors.append(p.search_vector)
            elif parent:
                search_vectors.append(parent.search_vector)
    # vectors of these parents are computed without their own parents
    # NOTE(review): update_search_vector returns None on its early exits,
    # None may so be appended here - confirm merge_tsvectors copes with it
    for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
        parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
        if hasattr(parent, "all"):  # m2m
            for p in parent.all():
                search_vectors.append(
                    p.update_search_vector(save=False, exclude_parent=True)
                )
        elif parent:
            search_vectors.append(
                parent.update_search_vector(save=False, exclude_parent=True)
            )
    if self.BASE_SEARCH_VECTORS:
        # query "simple" fields
        q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
        res = q.all()[0]
        for base_search_vector in self.BASE_SEARCH_VECTORS:
            data = res[base_search_vector.key]
            data = unidecode(str(data))
            self._update_search_field(base_search_vector, search_vectors, data)
    if self.PROPERTY_SEARCH_VECTORS:
        for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
            data = getattr(self, property_search_vector.key)
            if callable(data):
                data = data()
            if not data:
                continue
            data = str(data)
            self._update_search_field(property_search_vector, search_vectors, data)
    if hasattr(self, "data") and self.data:
        # index the json fields flagged for search
        content_type = ContentType.objects.get_for_model(self)
        for json_field in JsonDataField.objects.filter(
            content_type=content_type, search_index=True
        ).all():
            data = copy.deepcopy(self.data)
            no_data = False
            # walk the nested keys ("a__b" reads data["a"]["b"])
            for key in json_field.key.split("__"):
                if key not in data:
                    no_data = True
                    break
                data = data[key]
            if no_data or not data:
                continue
            if json_field.value_type == "B":
                # a true boolean is indexed with the name of the field
                if data is True:
                    data = json_field.name
                else:
                    continue
            elif json_field.value_type in ("I", "F"):
                self._update_search_number_field(search_vectors, data)
                continue
            elif json_field.value_type == "D":
                # only index year
                self._update_search_number_field(search_vectors, data.year)
                continue
            for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
                with connection.cursor() as cursor:
                    cursor.execute("SELECT to_tsvector(%s, %s)", [lang, data])
                    row = cursor.fetchone()
                    search_vectors.append(row[0])
    new_search_vector = merge_tsvectors(search_vectors)
    changed = old_search != new_search_vector
    self.search_vector = new_search_vector
    if save and changed:
        # direct update: the save method of the model is not called
        self.__class__.objects.filter(pk=self.pk).update(
            search_vector=new_search_vector
        )
    elif not save:
        return new_search_vector
    return changed
class Imported(models.Model):
    """
    Abstract model keeping a link to the imports associated with an item.
    """

    # the related name is specific to each concrete model
    imports = models.ManyToManyField(
        Import, blank=True, related_name="imported_%(app_label)s_%(class)s"
    )

    class Meta:
        abstract = True
class JsonData(models.Model, CachedGen):
    """
    Abstract model with a free json `data` field described by JsonDataField
    items.
    """

    # NOTE(review): a dict instance is used as default - Django recommends
    # a callable (`dict`); changing it would require a migration
    data = JSONField(default={}, blank=True)

    class Meta:
        abstract = True

    def pre_save(self):
        """Make sure `data` is a dict when empty."""
        if not self.data:
            self.data = {}

    @property
    def json_sections(self):
        """
        Values of the displayed json fields grouped by section.

        :return: list of (section name, [(field name, value), ...]) - the
        section name is None for fields without section, these fields
        coming first
        """
        sections = []
        try:
            content_type = ContentType.objects.get_for_model(self)
        except ContentType.DoesNotExist:
            return sections
        JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
        fields = list(
            JsonDataField.objects.filter(
                content_type=content_type, display=True, section__isnull=True
            ).all()
        )  # no section fields
        fields += list(
            JsonDataField.objects.filter(
                content_type=content_type, display=True, section__isnull=False
            )
            .order_by("section__order", "order")
            .all()
        )
        for field in fields:
            value = None
            data = (self.data or {}).copy()
            # walk the nested keys ("a__b" reads data["a"]["b"])
            for key in field.key.split("__"):
                if isinstance(data, dict) and key in data:
                    value = copy.copy(data[key])
                    data = data[key]
                else:
                    value = None
                    break
            if value is None:
                continue
            if type(value) in (list, tuple):
                value = " ; ".join([str(v) for v in value])
            section_name = field.section.name if field.section else None
            if not sections or section_name != sections[-1][0]:
                # if section name is identical it is the same
                sections.append((section_name, []))
            sections[-1][1].append((field.name, value))
        return sections

    @classmethod
    def refresh_cache(cls):
        """
        Recompute the cached dynamic choices registered under
        "_current_keys". Nothing is done when the "cache_refreshed" cache
        entry holds a timestamp less than one second old.
        """
        __, refreshed = get_cache(cls, ["cache_refreshed"])
        if refreshed and time.time() - refreshed < 1:
            return
        cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
        if not current_keys:
            return
        for keys in current_keys:
            if keys[0] == "__get_dynamic_choices":
                cls._get_dynamic_choices(keys[1], force=True)

    @classmethod
    def _get_dynamic_choices(cls, key, force=False):
        """
        Get choice from existing values

        :param key: data key
        :param force: if set to True do not use cache
        :return: tuple of choices (id, value)
        """
        cache_key, value = get_cache(cls, ["__get_dynamic_choices", key])
        if not force and value:
            return value
        choices = set()
        # the key is prefixed by "data__"
        data_key = key[len("data__") :]
        splitted_key = data_key.split("__")
        q = cls.objects.filter(data__has_key=data_key).values_list("data", flat=True)
        for value in q.all():
            for k in splitted_key:
                value = value[k]
            choices.add(value)
        choices = [("", "")] + [(v, v) for v in sorted(choices)]
        cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT)
        return choices
class FixAssociated:
    """
    Mixin enforcing expected values on associated items.

    `ASSOCIATED` maps an attribute name of this object to a dict whose keys
    are (attribute of the associated item, type class) and whose values are
    the expected value(s).
    """

    ASSOCIATED = {}

    def fix_associated(self):
        """
        For each configured association, set (or add for a many-to-many)
        the first expected value when no current value is an expected one.

        When the type class has a `txt_idx` attribute, expected values are
        textual ids converted to items: the whole process stops silently if
        one of them does not exist.
        """
        for attr_name, constraints in self.ASSOCIATED.items():
            related = getattr(self, attr_name)
            if not related:
                continue
            for (subkey, ctype), expected in constraints.items():
                if not isinstance(expected, (list, tuple)):
                    expected = [expected]
                if hasattr(ctype, "txt_idx"):
                    try:
                        expected = [ctype.objects.get(txt_idx=v) for v in expected]
                    except ctype.DoesNotExist:
                        # type not yet initialized
                        return
                current = getattr(related, subkey)
                is_many = hasattr(current, "all")
                current = current.all() if is_many else [current]
                if any(val in expected for val in current):
                    continue
                # the first value is used
                first = expected[0]
                if is_many:
                    getattr(related, subkey).add(first)
                else:
                    setattr(related, subkey, first)
class CascasdeUpdate:
    """
    Mixin propagating an update to related "down" items.
    """

    # names of the related managers to update
    DOWN_MODEL_UPDATE = []

    def cascade_update(self):
        """
        Update the items of each relation listed in `DOWN_MODEL_UPDATE`.

        When `settings.USE_BACKGROUND_TASK` is false and the related model
        has a `need_update` attribute, items are only flagged. Otherwise
        the cached label (and the geo data for items with a `point_2d`
        attribute) of each item is updated.
        """
        for down_model in self.DOWN_MODEL_UPDATE:
            if not settings.USE_BACKGROUND_TASK:
                rel = getattr(self, down_model)
                if hasattr(rel.model, "need_update"):
                    rel.update(need_update=True)
                    continue
            for item in getattr(self, down_model).all():
                cached_label_changed(item.__class__, instance=item)
                if hasattr(item, "point_2d"):
                    post_save_geo(item.__class__, instance=item)
class SearchAltName(object):
    """
    Alternative name of a search key with the associated query.
    """

    def __init__(
        self, search_key, search_query, extra_query=None, distinct_query=False
    ):
        """
        :param search_key: key used in text search
        :param search_query: query lookup associated with the key
        :param extra_query: extra query parameters (empty dict by default)
        :param distinct_query: stored as is
        """
        if not extra_query:
            extra_query = {}
        self.distinct_query = distinct_query
        self.extra_query = extra_query
        self.search_query = search_query
        self.search_key = search_key
class HistoryError(Exception):
    """
    Error raised on history management - holds a value describing it.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        """Return the representation (`repr`) of the value."""
        return "{!r}".format(self.value)
class HistoricalRecords(BaseHistoricalRecords):
    """
    Historical records limiting the number of versions created: a version
    is only recorded for an identified modifier, when data have changed
    and not too shortly after the previous one.
    """

    def _save_historic(
        self,
        manager,
        instance,
        history_date,
        history_type,
        history_user,
        history_change_reason,
        using,
        attrs,
    ):
        """
        Create and save the historical instance, sending the pre and post
        creation signals of simple_history.
        """
        history_instance = manager.model(
            history_date=history_date,
            history_type=history_type,
            history_user=history_user,
            history_change_reason=history_change_reason,
            **attrs
        )
        pre_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            history_instance=history_instance,
            using=using,
        )
        history_instance.save(using=using)
        post_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_instance=history_instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            using=using,
        )

    def create_historical_record(self, instance, history_type, using=None):
        """
        Record a new version of the instance if relevant.

        Nothing is recorded when the instance has no history modifier, when
        this modifier has already a version less than 5 seconds old (unless
        `_force_history` is set on the instance) or when no field differs
        from the last version of this modifier.
        """
        try:
            history_modifier = getattr(instance, "history_modifier", None)
        except User.DoesNotExist:
            # on batch removing of users, user could have disappeared
            return
        if not history_modifier:
            # explicit test: an assert would be stripped with python -O
            return
        history_date = getattr(instance, "_history_date", datetime.datetime.now())
        history_change_reason = getattr(instance, "changeReason", None)
        force = getattr(instance, "_force_history", False)
        manager = getattr(instance, self.manager_name)
        attrs = {}
        for field in instance._meta.fields:
            attrs[field.attname] = getattr(instance, field.attname)
        q_history = instance.history.filter(
            history_modifier_id=history_modifier.pk
        ).order_by("-history_date", "-history_id")
        # instance.skip_history_when_saving = True
        if not q_history.count():
            # first version for this modifier: always recorded
            if force:
                delattr(instance, "_force_history")
            self._save_historic(
                manager,
                instance,
                history_date,
                history_type,
                history_modifier,
                history_change_reason,
                using,
                attrs,
            )
            return
        old_instance = q_history.all()[0]
        # multiple saving by the same user in a very short time are generaly
        # caused by post_save signals it is not relevant to keep them
        min_history_date = datetime.datetime.now() - datetime.timedelta(seconds=5)
        q = q_history.filter(
            history_date__isnull=False, history_date__gt=min_history_date
        ).order_by("-history_date", "-history_id")
        if not force and q.count():
            return
        if force:
            delattr(instance, "_force_history")
        # record a new version only if data have been changed
        for field in instance._meta.fields:
            if getattr(old_instance, field.attname) != attrs[field.attname]:
                self._save_historic(
                    manager,
                    instance,
                    history_date,
                    history_type,
                    history_modifier,
                    history_change_reason,
                    using,
                    attrs,
                )
                return
class BaseHistorizedItem(
    StatisticItem,
    TemplateItem,
    FullSearch,
    Imported,
    JsonData,
    FixAssociated,
    CascasdeUpdate,
):
    """
    Historized item with external ID management.
    All historized items are searchable and have a data json field.
    Historized items can be "locked" for edition.
    """

    IS_BASKET = False
    SHOW_URL = None
    # key given to `get_generated_id` to generate the external id - no
    # generation when empty (see `update_external_id`)
    EXTERNAL_ID_KEY = ""
    EXTERNAL_ID_DEPENDENCIES = []
    HISTORICAL_M2M = []
    history_modifier = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Last editor"),
        blank=True,
        null=True,
    )
    history_creator = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Creator"),
        blank=True,
        null=True,
    )
    last_modified = models.DateTimeField(auto_now=True)
    # NOTE(review): a dict instance is used as default - Django recommends
    # a callable (`dict`); changing it would require a migration
    history_m2m = JSONField(default={}, blank=True)
    need_update = models.BooleanField(verbose_name=_("Need update"), default=False)
    locked = models.BooleanField(
        verbose_name=_("Item locked for edition"), default=False
    )
    lock_user = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Locked by"),
        blank=True,
        null=True,
    )
    # text search keys available on every historized item
    ALT_NAMES = {
        "history_creator": SearchAltName(
            pgettext_lazy("key for text search", "created-by"),
            "history_creator__ishtaruser__person__cached_label__iexact",
        ),
        "history_modifier": SearchAltName(
            pgettext_lazy("key for text search", "modified-by"),
            "history_modifier__ishtaruser__person__cached_label__iexact",
        ),
        "modified_before": SearchAltName(
            pgettext_lazy("key for text search", "modified-before"),
            "last_modified__lte",
        ),
        "modified_after": SearchAltName(
            pgettext_lazy("key for text search", "modified-after"), "last_modified__gte"
        ),
    }

    class Meta:
        abstract = True
@classmethod
def get_verbose_name(cls):
return cls._meta.verbose_name
def is_locked(self, user=None):
if not user:
return self.locked
return self.locked and (not self.lock_user or self.lock_user != user)
def merge(self, item, keep_old=False):
merge_model_objects(self, item, keep_old=keep_old)
def public_representation(self):
return {}
def duplicate(self, user=None, data=None):
return duplicate_item(self, user, data)
def update_external_id(self, save=False):
if not self.EXTERNAL_ID_KEY or (
self.external_id and not getattr(self, "auto_external_id", False)
):
return
external_id = get_generated_id(self.EXTERNAL_ID_KEY, self)
if external_id == self.external_id:
return
self.auto_external_id = True
self.external_id = external_id
self._cached_label_checked = False
if save:
self.skip_history_when_saving = True
self.save()
return external_id
def get_last_history_date(self):
q = self.history.values("history_date").order_by("-history_date")
if not q.count():
return
return q.all()[0]["history_date"]
def get_previous(self, step=None, date=None, strict=False):
"""
Get a "step" previous state of the item
"""
assert step or date
historized = self.history.all()
item = None
if step:
if len(historized) <= step:
# silently return the last step if too far in the history
item = historized[len(historized) - 1]
else:
item = historized[step]
else:
for step, item in enumerate(historized):
if item.history_date == date:
break
# ended with no match
if item.history_date != date:
return
item._step = step
if len(historized) != (step + 1):
item._previous = historized[step + 1].history_date
else:
item._previous = None
if step > 0:
item._next = historized[step - 1].history_date
else:
item._next = None
item.history_date = historized[step].history_date
model = self.__class__
for k in get_all_field_names(model):
field = model._meta.get_field(k)
if hasattr(field, "rel") and field.rel:
if not hasattr(item, k + "_id"):
setattr(item, k, getattr(self, k))
continue
val = getattr(item, k + "_id")
if not val:
setattr(item, k, None)
continue
try:
val = field.rel.to.objects.get(pk=val)
setattr(item, k, val)
except ObjectDoesNotExist:
if strict:
raise HistoryError(
"The class %s has no pk %d" % (str(field.rel.to), val)
)
setattr(item, k, None)
item.pk = self.pk
return item
@property
def last_edition_date(self):
try:
return self.history.order_by("-history_date").all()[0].history_date
except (AttributeError, IndexError):
return
@property
def history_creation_date(self):
try:
return self.history.order_by("history_date").all()[0].history_date
except (AttributeError, IndexError):
return
def rollback(self, date):
"""
Rollback to a previous state
"""
to_del, new_item = [], None
for item in self.history.all():
if item.history_date == date:
new_item = item
break
to_del.append(item)
if not new_item:
raise HistoryError("The date to rollback to doesn't exist.")
try:
field_keys = [f.name for f in self._meta.fields]
for k in field_keys:
if k != "id" and hasattr(self, k):
if not hasattr(new_item, k):
k = k + "_id"
setattr(self, k, getattr(new_item, k))
try:
self.history_modifier = User.objects.get(
pk=new_item.history_modifier_id
)
except User.ObjectDoesNotExist:
pass
self.save()
saved_m2m = new_item.history_m2m.copy()
for hist_key in self.HISTORICAL_M2M:
# after each association m2m is rewrite - force the original
# to be reset
new_item.history_m2m = saved_m2m
values = new_item.m2m_listing(hist_key, create=True) or []
hist_field = getattr(self, hist_key)
hist_field.clear()
for val in values:
hist_field.add(val)
# force label regeneration
self._cached_label_checked = False
self.save()
except ObjectDoesNotExist:
raise HistoryError("The rollback has failed.")
# clean the obsolete history
for historized_item in to_del:
historized_item.delete()
def m2m_listing(self, key):
return getattr(self, key).all()
def values(self):
values = {}
for f in self._meta.fields:
k = f.name
if k != "id":
values[k] = getattr(self, k)
return values
def get_absolute_url(self):
try:
return reverse("display-item", args=[self.SLUG, self.pk])
except NoReverseMatch:
return
def get_show_url(self):
show_url = self.SHOW_URL
if not show_url:
show_url = "show-" + self.__class__.__name__.lower()
try:
return reverse(show_url, args=[self.pk, ""])
except NoReverseMatch:
return
@property
def associated_filename(self):
if [
True
for attr in (
"get_town_label",
"get_department",
"reference",
"short_class_name",
)
if not hasattr(self, attr)
]:
return ""
items = [
slugify(self.get_department()),
slugify(self.get_town_label()).upper(),
slugify(self.short_class_name),
slugify(self.reference),
slugify(self.name or "").replace("-", "_").capitalize(),
]
last_edition_date = self.last_edition_date
if last_edition_date:
items.append(last_edition_date.strftime("%Y%m%d"))
else:
items.append("00000000")
return "-".join([str(item) for item in items])
def save(self, *args, **kwargs):
created = not self.pk
if not getattr(self, "skip_history_when_saving", False):
assert hasattr(self, "history_modifier")
if created:
self.history_creator = self.history_modifier
# external ID can have related item not available before save
external_id_updated = (
kwargs.pop("external_id_updated")
if "external_id_updated" in kwargs
else False
)
if not created and not external_id_updated:
self.update_external_id()
super(BaseHistorizedItem, self).save(*args, **kwargs)
if created and self.update_external_id():
# force resave for external ID creation
self.skip_history_when_saving = True
self._updated_id = True
return self.save(external_id_updated=True)
for dep in self.EXTERNAL_ID_DEPENDENCIES:
for obj in getattr(self, dep).all():
obj.update_external_id(save=True)
self.fix_associated()
return True
class LightHistorizedItem(BaseHistorizedItem):
    """
    Historized item storing its own "history_date" field.
    """

    history_date = models.DateTimeField(default=datetime.datetime.now)

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """
        Save the item and return the item itself instead of the result of
        the parent save.
        """
        super().save(*args, **kwargs)
        return self
class OwnPerms(object):
    """
    Manage special permissions for object's owner
    """

    @classmethod
    def get_query_owns(cls, ishtaruser):
        """
        Query object to get own items
        """
        return None  # implement for each object

    def can_view(self, request):
        """
        Check the "view_<slug>" permission. LONG_SLUG is used when defined,
        SLUG otherwise.
        """
        slug = self.LONG_SLUG if hasattr(self, "LONG_SLUG") else self.SLUG
        return self.can_do(request, "view_" + slug)

    def can_do(self, request, action_name):
        """
        Check permission availability for the current object.
        :param request: request object
        :param action_name: action name eg: "change_find" - "own" variation is
        checked
        :return: boolean
        """
        if not getattr(request.user, "ishtaruser", None):
            return False
        verb, *target = action_name.split("_")
        action_own_name = verb + "_own_" + "_".join(target)
        # find baskets share the "own" permission of finds
        if action_own_name == "view_own_findbasket":
            action_own_name = "view_own_find"
        ishtaruser = request.user.ishtaruser
        return ishtaruser.has_right(action_name, request.session) or (
            ishtaruser.has_right(action_own_name, request.session)
            and self.is_own(ishtaruser)
        )

    def is_own(self, user, alt_query_own=None):
        """
        Check if the current object is owned by the user

        :param user: IshtarUser or object with an "ishtaruser" attribute
        :param alt_query_own: name of an alternative method returning the
            "own" query
        :return: False or the number of matching items
        """
        IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
        if isinstance(user, IshtarUser):
            ishtaruser = user
        elif hasattr(user, "ishtaruser"):
            ishtaruser = user.ishtaruser
        else:
            return False
        if alt_query_own:
            query = getattr(self, alt_query_own)(ishtaruser)
        else:
            query = self.get_query_owns(ishtaruser)
        if not query:
            return False
        query &= Q(pk=self.pk)
        return self.__class__.objects.filter(query).count()

    @classmethod
    def has_item_of(cls, user):
        """
        Check if the user own some items

        :return: False or the number of owned items
        """
        IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
        if isinstance(user, IshtarUser):
            ishtaruser = user
        elif hasattr(user, "ishtaruser"):
            ishtaruser = user.ishtaruser
        else:
            return False
        query = cls.get_query_owns(ishtaruser)
        if not query:
            return False
        return cls.objects.filter(query).count()

    @classmethod
    def _return_get_owns(
        cls, owns, values, get_short_menu_class, label_key="cached_label"
    ):
        """
        Sort owned items: baskets first (original order kept) then the
        other items sorted by label.

        :param owns: list of items, dicts or (item, css class) tuples -
            baskets are removed from this list
        :param values: if set, items are dicts
        :param get_short_menu_class: if set, each entry is a tuple with the
            item as first element
        :param label_key: attribute or key used for sorting
        """
        if not owns:
            return []
        baskets = []
        if hasattr(cls, "BASKET_MODEL"):
            # walk backward so removing an entry does not shift the
            # positions still to be checked
            for position in range(len(owns) - 1, -1, -1):
                candidate = owns[position]
                if get_short_menu_class:
                    candidate = candidate[0]
                if type(candidate) == cls.BASKET_MODEL:
                    baskets.append(owns.pop(position))
            baskets.reverse()

        if values:

            def read_label(entry):
                return entry[label_key]

        else:

            def read_label(entry):
                return getattr(entry, label_key)

        if get_short_menu_class:

            def sort_key(row):
                return read_label(row[0]) or ""

        else:

            def sort_key(row):
                return read_label(row) or ""

        return baskets + sorted(owns, key=sort_key)

    @classmethod
    def get_owns(
        cls,
        user,
        replace_query=None,
        limit=None,
        values=None,
        get_short_menu_class=False,
        menu_filtr=None,
    ):
        """
        Get Own items

        :param user: User or IshtarUser
        :param replace_query: query used when get_query_owns returns nothing
        :param limit: maximum number of items (newest pk first)
        :param values: list of keys - dicts are returned instead of items
        :param get_short_menu_class: return (item, css class) tuples
        :param menu_filtr: not used here
        :return: list of owned items (baskets first) - an empty queryset or
            an empty list (when values is set) if there is nothing to return
        """

        def nothing():
            # empty result consistent with the expected return type
            return [] if values else cls.objects.filter(pk__isnull=True)

        if not replace_query:
            replace_query = {}
        if hasattr(user, "is_authenticated") and not user.is_authenticated():
            return nothing()
        IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
        if isinstance(user, User):
            try:
                ishtaruser = IshtarUser.objects.get(user_ptr=user)
            except IshtarUser.DoesNotExist:
                return nothing()
        elif isinstance(user, IshtarUser):
            ishtaruser = user
        else:
            return nothing()

        has_basket = hasattr(cls, "BASKET_MODEL")
        items = []
        if has_basket:
            items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
        query = cls.get_query_owns(ishtaruser)
        if not query and not replace_query:
            return nothing()
        q = cls.objects.filter(query if query else replace_query)
        if values:
            q = q.values(*values)
        if limit:
            items += list(q.order_by("-pk")[:limit])
        else:
            items += list(q.order_by(*cls._meta.ordering).all())

        if not get_short_menu_class:
            return items
        if not values:
            return [(i, cls.get_short_menu_class(i.pk)) for i in items]
        if "id" not in values:
            raise NotImplementedError(
                "Call of get_owns with get_short_menu_class option and"
                " no 'id' in values is not implemented"
            )
        my_items = []
        for i in items:
            if has_basket and type(i) == cls.BASKET_MODEL:
                # baskets are model instances: convert them to dicts
                dct = {k: getattr(i, k) for k in values}
                my_items.append((dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)))
            else:
                my_items.append((i, cls.get_short_menu_class(i["id"])))
        return my_items

    @classmethod
    def _get_query_owns_dicts(cls, ishtaruser):
        """
        List of query own dict to construct the query.
        Each dict are join with an AND operator, each dict key, values are
        joined with OR operator
        """
        return []

    @classmethod
    def _construct_query_own(cls, prefix, dct_list):
        """
        Build a Q object from a list of dicts: keys of a dict are joined
        with OR, dicts are joined with AND. Each key is prefixed by
        "prefix". None is returned when there is nothing to query.
        """
        q = None
        for subquery_dict in dct_list:
            subquery = None
            for key, value in subquery_dict.items():
                condition = Q(**{prefix + key: value})
                if subquery:
                    subquery |= condition
                else:
                    subquery = condition
            if not subquery:
                continue
            if q:
                q &= subquery
            else:
                q = subquery
        return q
class NumberManager(models.Manager):
    """
    Manager for models using their "number" field as natural key.
    """

    def get_by_natural_key(self, number):
        # lookup on the "number" field only
        return self.get(number=number)
class State(models.Model):
    """
    State identified by a unique number (used as natural key).
    """

    label = models.CharField(_("Label"), max_length=30)
    number = models.CharField(_("Number"), unique=True, max_length=3)
    objects = NumberManager()

    class Meta:
        verbose_name = _("State")
        ordering = ["number"]

    def __str__(self):
        return self.label

    def natural_key(self):
        # one-element tuple matching NumberManager.get_by_natural_key
        return (self.number,)
class Department(models.Model):
    """
    Department identified by a unique number (used as natural key) and
    optionally attached to a state.
    """

    label = models.CharField(_("Label"), max_length=30)
    number = models.CharField(_("Number"), unique=True, max_length=3)
    state = models.ForeignKey(
        "State",
        verbose_name=_("State"),
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
    )
    objects = NumberManager()

    class Meta:
        verbose_name = _("Department")
        verbose_name_plural = _("Departments")
        ordering = ["number"]

    def __str__(self):
        return self.label

    def natural_key(self):
        return (self.number,)

    def history_compress(self):
        """
        Value stored in the history for this department: its number
        """
        return self.number

    @classmethod
    def history_decompress(cls, full_value, create=False):
        """
        Get departments from a list of numbers stored in the history.
        Unknown numbers are ignored.

        :param full_value: list of department numbers
        :param create: not used here
        :return: list of departments
        """
        departments = []
        for number in full_value or ():
            try:
                department = cls.objects.get(number=number)
            except cls.DoesNotExist:
                continue
            departments.append(department)
        return departments
class Arrondissement(models.Model):
    """
    Arrondissement attached to a department.
    """

    name = models.CharField("Nom", max_length=30)
    # explicit on_delete: CASCADE is the implicit default when the
    # argument is omitted and the argument is mandatory in Django >= 2.0
    department = models.ForeignKey(
        Department, verbose_name="Département", on_delete=models.CASCADE
    )

    def __str__(self):
        # name and department label joined with the configured separator
        return settings.JOINT.join((self.name, str(self.department)))
class Canton(models.Model):
    """
    Canton attached to an arrondissement.
    """

    name = models.CharField("Nom", max_length=30)
    # explicit on_delete: CASCADE is the implicit default when the
    # argument is omitted and the argument is mandatory in Django >= 2.0
    arrondissement = models.ForeignKey(
        Arrondissement, verbose_name="Arrondissement", on_delete=models.CASCADE
    )

    def __str__(self):
        # name and arrondissement label joined with the configured separator
        return settings.JOINT.join((self.name, str(self.arrondissement)))
class TownManager(models.GeoManager):
    """
    Manager for towns: the natural key is (numero_insee, year).
    """

    def get_by_natural_key(self, numero_insee, year):
        return self.get(numero_insee=numero_insee, year=year)
class Town(Imported, models.Model):
    """
    Town with optional geographic data (center, limit, surface).
    A town can have children towns, the parents being available through
    the "parents" related name.
    """

    name = models.CharField(_("Name"), max_length=100)
    surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
    center = models.PointField(
        _("Localisation"), srid=settings.SRID, blank=True, null=True
    )
    limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
    numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120)
    departement = models.ForeignKey(
        Department,
        verbose_name=_("Department"),
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    year = models.IntegerField(
        _("Year of creation"),
        null=True,
        blank=True,
        help_text=_(
            "Filling this field is relevant to distinguish old towns " "from new towns."
        ),
    )
    children = models.ManyToManyField(
        "Town", verbose_name=_("Town children"), blank=True, related_name="parents"
    )
    cached_label = models.CharField(
        _("Cached name"), max_length=500, null=True, blank=True, db_index=True
    )
    objects = TownManager()

    class Meta:
        verbose_name = _("Town")
        verbose_name_plural = _("Towns")
        if settings.COUNTRY == "fr":
            ordering = ["numero_insee"]
            unique_together = (("numero_insee", "year"),)

    def natural_key(self):
        return (self.numero_insee, self.year)

    def history_compress(self):
        """
        Value stored in the history for this town: its INSEE number and its
        year (an empty string when there is no year)
        """
        return {"numero_insee": self.numero_insee, "year": self.year or ""}

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        return "**name** {}, **numero_insee** {}, **cached_label** {}".format(
            _("Name"), "Code commune (numéro INSEE)", _("Cached name")
        )

    def get_values(self, prefix="", **kwargs):
        """
        Return a dict with the label, the name and the INSEE number.
        The label key is the prefix itself or "label" if there is no
        prefix, the other keys are prefixed.
        """
        return {
            prefix or "label": str(self),
            prefix + "name": self.name,
            prefix + "numero_insee": self.numero_insee,
        }

    @classmethod
    def history_decompress(cls, full_value, create=False):
        """
        Get towns from a list of dicts generated by history_compress.
        Unknown towns are ignored.

        :param full_value: list of dicts with "numero_insee" and "year" keys
        :param create: not used here
        :return: list of towns
        """
        if not full_value:
            return []
        res = []
        for value in full_value:
            try:
                res.append(
                    cls.objects.get(
                        numero_insee=value["numero_insee"], year=value["year"] or None
                    )
                )
            except cls.DoesNotExist:
                continue
        return res

    def __str__(self):
        return self.cached_label or ""

    @property
    def label_with_areas(self):
        """
        Name, INSEE number and full label of each associated area
        """
        label = [self.name]
        if self.numero_insee:
            label.append("({})".format(self.numero_insee))
        for area in self.areas.all():
            label.append(" - ")
            label.append(area.full_label)
        return " ".join(label)

    def generate_geo(self, force=False):
        """
        Generate the limit, the center and the surface of the town.

        The center and the surface are forced only when a new limit has
        been generated: the result of generate_limit replaces the "force"
        value.
        """
        force = self.generate_limit(force=force)
        self.generate_center(force=force)
        self.generate_area(force=force)

    def generate_limit(self, force=False):
        """
        Generate the limit from the union of the limits of the parents.

        Nothing is done if a limit is already set (unless forced), if there
        is no parent or if a parent has no limit.

        :return: True if the limit has been saved, else None
        """
        if not force and self.limit:
            return
        parents = None
        if not self.parents.count():
            return
        for parent in self.parents.all():
            if not parent.limit:
                return
            if not parents:
                parents = parent.limit
            else:
                parents = parents.union(parent.limit)
        # if union is a simple polygon make it a multi
        if "MULTI" not in parents.wkt:
            parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")"
        if not parents:
            return
        self.limit = parents
        self.save()
        return True

    def generate_center(self, force=False):
        """
        Generate the center from the centroid of the limit.

        :return: True if saved, False if no centroid is available, None if
            there is nothing to do (no limit or center already set and not
            forced)
        """
        if not self.limit:
            # no geometry to compute from - even when forced
            return
        if not force and self.center:
            return
        self.center = self.limit.centroid
        if not self.center:
            return False
        self.save()
        return True

    def generate_area(self, force=False):
        """
        Generate the surface from the limit transformed to SURFACE_SRID.

        :return: True if saved, False if the surface is null or too large,
            None if there is nothing to do (no limit or surface already set
            and not forced)
        """
        if not self.limit:
            # no geometry to compute from - even when forced
            return
        if not force and self.surface:
            return
        surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area
        # NOTE(review): 214748364 looks like a truncated max 32 bits integer
        # (2147483647) - confirm the expected upper bound
        if surface > 214748364 or not surface:
            return False
        self.surface = surface
        self.save()
        return True

    def update_town_code(self):
        """
        Suffix the INSEE number with the year for towns with children.
        The town is not saved.

        :return: True if the INSEE number has changed, else None
        """
        if not self.numero_insee or not self.children.count() or not self.year:
            return
        old_num = self.numero_insee[:]
        numero = old_num.split("-")[0]
        self.numero_insee = "{}-{}".format(numero, self.year)
        if self.numero_insee != old_num:
            return True

    def _generate_cached_label(self):
        """
        Label of the town: the name, followed for French towns by the
        first characters of the INSEE number and, for towns with children,
        by the year.
        """
        cached_label = self.name
        if settings.COUNTRY == "fr" and self.numero_insee:
            dpt_len = 2
            # numbers starting with 97, 98 or a non digit character use a
            # three characters prefix
            if (
                self.numero_insee.startswith("97")
                or self.numero_insee.startswith("98")
                or self.numero_insee[0]
                not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
            ):
                dpt_len = 3
            cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
        if self.year and self.children.count():
            cached_label += " ({})".format(self.year)
        return cached_label
def post_save_town(sender, **kwargs):
    """
    Post save signal of Town: update the cached label, generate the
    geographic data and save again if the town code has changed.
    """
    cached_label_changed(sender, **kwargs)
    instance = kwargs["instance"]
    instance.generate_geo()
    code_changed = instance.update_town_code()
    if code_changed:
        instance.save()


post_save.connect(post_save_town, sender=Town)
def town_child_changed(sender, **kwargs):
    """
    Signal for changes on Town children: save the town when its code has
    been updated.
    """
    instance = kwargs["instance"]
    code_changed = instance.update_town_code()
    if code_changed:
        instance.save()


m2m_changed.connect(town_child_changed, sender=Town.children.through)
class Address(BaseHistorizedItem):
    """
    Abstract historized item with a main address, an alternative address
    and contact information (phones, email).
    """

    FIELDS = (
        "address",
        "address_complement",
        "postal_code",
        "town",
        "precise_town",
        "country",
        "alt_address",
        "alt_address_complement",
        "alt_postal_code",
        "alt_town",
        "alt_country",
        "phone",
        "phone_desc",
        "phone2",
        "phone_desc2",
        "phone3",
        "phone_desc3",
        "raw_phone",
        "mobile_phone",
        "email",
        "alt_address_is_prefered",
    )
    address = models.TextField(_("Address"), blank=True, default="")
    address_complement = models.TextField(
        _("Address complement"), blank=True, default=""
    )
    postal_code = models.CharField(
        _("Postal code"), max_length=10, null=True, blank=True
    )
    town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True)
    # explicit on_delete: CASCADE is the implicit default when the
    # argument is omitted and the argument is mandatory in Django >= 2.0
    precise_town = models.ForeignKey(
        Town,
        verbose_name=_("Town (precise)"),
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    country = models.CharField(_("Country"), max_length=30, null=True, blank=True)
    alt_address = models.TextField(_("Other address: address"), blank=True, default="")
    alt_address_complement = models.TextField(
        _("Other address: address complement"), blank=True, default=""
    )
    alt_postal_code = models.CharField(
        _("Other address: postal code"), max_length=10, null=True, blank=True
    )
    alt_town = models.CharField(
        _("Other address: town"), max_length=70, null=True, blank=True
    )
    alt_country = models.CharField(
        _("Other address: country"), max_length=30, null=True, blank=True
    )
    phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True)
    phone_desc = models.CharField(
        _("Phone description"), max_length=300, null=True, blank=True
    )
    # this field holds the number itself: the label was a copy of the one
    # of phone_desc2
    phone2 = models.CharField(_("Phone 2"), max_length=18, null=True, blank=True)
    phone_desc2 = models.CharField(
        _("Phone description 2"), max_length=300, null=True, blank=True
    )
    phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, blank=True)
    phone_desc3 = models.CharField(
        _("Phone description 3"), max_length=300, null=True, blank=True
    )
    raw_phone = models.TextField(_("Raw phone"), blank=True, default="")
    mobile_phone = models.CharField(
        _("Mobile phone"), max_length=18, null=True, blank=True
    )
    email = models.EmailField(_("Email"), max_length=300, blank=True, null=True)
    alt_address_is_prefered = models.BooleanField(
        _("Alternative address is prefered"), default=False
    )
    history = HistoricalRecords(inherit=True)
    # names of attributes holding related items with an address, used as
    # fallback when the current item has no town
    SUB_ADDRESSES = []

    class Meta:
        abstract = True

    def get_short_html_items(self):
        """
        Return the list of filled address parts: address, complement,
        postal code, town (precise town name first) and country.
        """
        # NOTE(review): each template only contains the placeholder -
        # confirm that no markup is expected around the values
        items = []
        if self.address:
            items.append("""{}""".format(self.address))
        if self.address_complement:
            items.append(
                """{}""".format(
                    self.address_complement
                )
            )
        if self.postal_code:
            items.append(
                """{}""".format(self.postal_code)
            )
        if self.precise_town:
            items.append(
                """{}""".format(self.precise_town.name)
            )
        elif self.town:
            items.append("""{}""".format(self.town))
        if self.country:
            items.append("""{}""".format(self.country))
        return items

    def get_short_html_detail(self):
        """
        Return the address parts joined in one string followed by a new
        line. A default message is used when there is no address part.
        """
        html = """"""
        items = self.get_short_html_items()
        if not items:
            items = [
                "{}".format(_("No associated address"))
            ]
        html += "".join(items)
        html += """
"""
        return html

    def get_town_centroid(self):
        """
        Return a tuple (center of the precise town, verbose name of the
        item providing it). Sub addresses are used when the item has no
        precise town. None is returned when no precise town is found.
        """
        if self.precise_town:
            return self.precise_town.center, self._meta.verbose_name
        for sub_address in self.SUB_ADDRESSES:
            sub_item = getattr(self, sub_address)
            if sub_item and sub_item.precise_town:
                return sub_item.precise_town.center, sub_item._meta.verbose_name

    def get_town_polygons(self):
        """
        Return a tuple (limit of the precise town, verbose name of the
        item providing it). Sub addresses are used when the item has no
        precise town. None is returned when no precise town is found.
        """
        if self.precise_town:
            return self.precise_town.limit, self._meta.verbose_name
        for sub_address in self.SUB_ADDRESSES:
            sub_item = getattr(self, sub_address)
            if sub_item and sub_item.precise_town:
                return sub_item.precise_town.limit, sub_item._meta.verbose_name

    def get_attribute(self, attr):
        """
        Get an attribute from the first address with a town: the current
        item, then each sub address. The attribute of the current item is
        returned if no address has a town.
        """
        if self.town or self.precise_town:
            return getattr(self, attr)
        for sub_address in self.SUB_ADDRESSES:
            sub_item = getattr(self, sub_address)
            if not sub_item:
                continue
            if sub_item.town or sub_item.precise_town:
                return getattr(sub_item, attr)
        return getattr(self, attr)

    def get_address(self):
        return self.get_attribute("address")

    def get_address_complement(self):
        return self.get_attribute("address_complement")

    def get_postal_code(self):
        return self.get_attribute("postal_code")

    def get_town(self):
        return self.get_attribute("town")

    def get_precise_town(self):
        return self.get_attribute("precise_town")

    def get_country(self):
        return self.get_attribute("country")

    def simple_lbl(self):
        """
        Simple label of the item: its string representation
        """
        return str(self)

    def full_address(self):
        """
        Simple label followed by the address label on a new line
        """
        lbl = self.simple_lbl()
        if lbl:
            lbl += "\n"
        lbl += self.address_lbl()
        return lbl

    def address_lbl(self):
        """
        Multi-line label: address, complement, postal code and town, phone,
        mobile phone and email. The alternative address is used when it is
        flagged as preferred. Empty parts are skipped.
        """
        prefix = "alt_" if self.alt_address_is_prefered else ""
        lines = []
        address = getattr(self, prefix + "address")
        if address:
            lines.append(address)
        complement = getattr(self, prefix + "address_complement")
        if complement:
            lines.append(complement)
        postal_code = getattr(self, prefix + "postal_code")
        town = getattr(self, prefix + "town")
        if postal_code or town:
            lines.append(
                "{}{}{}".format(
                    postal_code or "", " " if postal_code and town else "", town or ""
                )
            )
        if self.phone:
            lines.append("{} {}".format(str(_("Tel: ")), self.phone))
        if self.mobile_phone:
            lines.append("{} {}".format(str(_("Mobile: ")), self.mobile_phone))
        if self.email:
            lines.append("{} {}".format(str(_("Email: ")), self.email))
        return "\n".join(lines)
class Merge(models.Model):
    """
    Abstract model managing merge candidates between items.
    A merge key is generated from MERGE_ATTRIBUTE and items with similar
    keys are registered as merge candidates.
    """

    merge_key = models.TextField(_("Merge key"), blank=True, null=True)
    merge_candidate = models.ManyToManyField("self", blank=True)
    merge_exclusion = models.ManyToManyField("self", blank=True)
    archived = models.NullBooleanField(default=False, blank=True, null=True)
    # 1 for one word similarity, 2 for two word similarity, etc.
    MERGE_CLEMENCY = None
    EMPTY_MERGE_KEY = "--"
    MERGE_ATTRIBUTE = "name"

    class Meta:
        abstract = True

    def generate_merge_key(self):
        """
        Set the merge key: slug of MERGE_ATTRIBUTE or EMPTY_MERGE_KEY if
        the slug is empty. Archived items are left unchanged.
        """
        if self.archived:
            return
        merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
        self.merge_key = slugify(merge_attr if merge_attr else "")
        if not self.merge_key:
            self.merge_key = self.EMPTY_MERGE_KEY

    def generate_merge_candidate(self):
        """
        Register items with a similar merge key as merge candidates.

        Archived items, excluded items and items already candidates are
        ignored. Without MERGE_CLEMENCY the keys must be equal, otherwise
        the first or the last MERGE_CLEMENCY words of the key must match.
        """
        if self.archived:
            return
        if not self.merge_key:
            self.generate_merge_key()
            self.save(merge_key_generated=True)
        if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
            return
        q = (
            self.__class__.objects.exclude(pk=self.pk)
            .exclude(merge_exclusion=self)
            .exclude(merge_candidate=self)
            .exclude(archived=True)
        )
        if not self.MERGE_CLEMENCY:
            q = q.filter(merge_key=self.merge_key)
        else:
            words = self.merge_key.split("-")
            subkeys_front = "-".join(words[: self.MERGE_CLEMENCY])
            subkeys_back = "-".join(words[-self.MERGE_CLEMENCY :])
            q = q.filter(
                Q(merge_key__istartswith=subkeys_front)
                | Q(merge_key__iendswith=subkeys_back)
            )
        for item in q.all():
            self.merge_candidate.add(item)

    def save(self, *args, **kwargs):
        """
        Save the item with an up to date merge key and regenerate the merge
        candidates.

        The extra keyword "merge_key_generated" is consumed here: when set
        the merge candidates are not regenerated.
        """
        # prevent circular save
        merge_key_generated = kwargs.pop("merge_key_generated", False)
        self.generate_merge_key()
        item = super(Merge, self).save(*args, **kwargs)
        if not merge_key_generated:
            self.merge_candidate.clear()
            self.generate_merge_candidate()
        return item

    def archive(self):
        """
        Flag the item as archived and clear its merge candidates and
        exclusions.
        """
        self.archived = True
        self.save()
        self.merge_candidate.clear()
        self.merge_exclusion.clear()

    def merge(self, item, keep_old=False, exclude_fields=None):
        """
        Merge "item" into the current item (see merge_model_objects) then
        regenerate the merge candidates.
        """
        merge_model_objects(
            self, item, keep_old=keep_old, exclude_fields=exclude_fields
        )
        self.generate_merge_candidate()
def __get_stats_cache_values(model_name, model_pk):
    """
    Get (or create) the stats cache of an item and its values.

    When several caches exist for the same item only the one with the
    greatest id is kept, the others are deleted.

    :param model_name: value of the "model" field of the cache
    :param model_pk: pk of the item
    :return: tuple (stats cache, dict of values)
    """
    StatsCache = apps.get_model("ishtar_common", "StatsCache")
    # a single ordered query: the kept entry and the deleted duplicates are
    # picked from the same ordering so the kept entry is never deleted
    entries = list(
        StatsCache.objects.filter(model=model_name, model_pk=model_pk).order_by(
            "-id"
        )
    )
    if entries:
        sc = entries[0]
        for extra in entries[1:]:
            extra.delete()
    else:
        sc = StatsCache.objects.create(model=model_name, model_pk=model_pk)
    values = sc.values or {}
    return sc, values
@task()
def _update_stats(app, model, model_pk, funcname):
    """
    Task computing a statistic of an item and storing it in its stats
    cache.

    :param app: application label of the model
    :param model: model name
    :param model_pk: pk of the item - nothing is done if it does not exist
    :param funcname: name of the method of the item computing the value
    """
    model_name = app + "." + model
    model_class = apps.get_model(app, model)
    try:
        item = model_class.objects.get(pk=model_pk)
    except model_class.DoesNotExist:
        return
    computed = getattr(item, funcname)()
    sc, cached_values = __get_stats_cache_values(model_name, model_pk)
    cached_values[funcname] = computed
    sc.values = cached_values
    # the requested update is done
    sc.update_requested = None
    sc.updated = datetime.datetime.now()
    sc.save()
def update_stats(statscache, item, funcname):
    """
    Update a statistic of an item.

    Without background task the value is computed and stored immediately.
    Otherwise an update is requested, the computation is delegated to the
    _update_stats task and the current (not updated) values are returned.

    :param statscache: stats cache of the item
    :param item: item providing the "funcname" method
    :param funcname: name of the method computing the value
    :return: values of the stats cache
    """
    if settings.USE_BACKGROUND_TASK:
        requested = datetime.datetime.now()
        app_name = item._meta.app_label
        model_name = item._meta.model_name
        statscache.update_requested = requested.isoformat()
        statscache.save()
        _update_stats.delay(app_name, model_name, item.pk, funcname)
        return statscache.values

    current_values = statscache.values or {}
    current_values[funcname] = getattr(item, funcname)()
    statscache.values = current_values
    statscache.updated = datetime.datetime.now()
    statscache.save()
    return current_values
class DashboardFormItem:
    """
    Provide methods to manage statistics
    """

    def last_stats_update(self):
        """
        Return the most recent "updated" value of the stats caches of the
        item or None if there is no cache.
        """
        model_name = self._meta.app_label + "." + self._meta.model_name
        StatsCache = apps.get_model("ishtar_common", "StatsCache")
        last = (
            StatsCache.objects.filter(model=model_name, model_pk=self.pk)
            .order_by("-updated")
            .first()
        )
        if not last:
            return
        return last.updated

    def _get_or_set_stats(self, funcname, update=False, expected_type=None):
        """
        Get a statistic from the stats cache of the item.

        :param funcname: name of the method computing the statistic, used
            as key in the cache
        :param update: update the value (see update_stats) before reading
        :param expected_type: if set, an empty instance of this type is
            returned when the value is missing or has another type
        :return: the cached value - 0 if missing and no type is expected
        """
        model_name = self._meta.app_label + "." + self._meta.model_name
        StatsCache = apps.get_model("ishtar_common", "StatsCache")
        sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk)
        if not update:
            # values can be empty on a newly created cache
            values = sc.values or {}
            if funcname not in values:
                if expected_type is not None:
                    return expected_type()
                return 0
        else:
            values = update_stats(sc, self, funcname) or {}
        if funcname in values:
            values = values[funcname]
        else:
            values = 0
        if expected_type is not None and not isinstance(values, expected_type):
            return expected_type()
        return values

    @classmethod
    def get_periods(cls, slice="month", fltr=None, date_source="creation"):
        """
        Get the periods of the items with a date.

        :param slice: "year" to get years, "month" to get (year, month)
            tuples - an empty list is returned for other values
        :param fltr: optional dict of extra filters
        :param date_source: prefix of the date field ("<source>_date")
        :return: list with one period for each distinct date
        """
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        if slice not in ("year", "month"):
            return []
        dates = [
            res[date_var]
            for res in q.values(date_var).annotate(Count("id")).order_by()
        ]
        if slice == "year":
            return [date.year for date in dates]
        return [(date.year, date.month) for date in dates]

    @classmethod
    def get_by_year(cls, year, fltr=None, date_source="creation"):
        """
        Get the distinct items with a date in the given year.

        :param fltr: optional dict of extra filters
        :param date_source: prefix of the date field ("<source>_date")
        """
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk")

    @classmethod
    def get_by_month(cls, year, month, fltr=None, date_source="creation"):
        """
        Get the distinct items with a date in the given month of the year.

        :param fltr: optional dict of extra filters
        :param date_source: prefix of the date field ("<source>_date")
        """
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        q = q.filter(**{date_var + "__year": year, date_var + "__month": month})
        return q.order_by("pk").distinct("pk")

    @classmethod
    def get_total_number(cls, fltr=None):
        """
        Count the distinct items.

        :param fltr: optional dict of filters
        """
        q = cls.objects
        if fltr:
            q = q.filter(**fltr)
        return q.order_by("pk").distinct("pk").count()
class DocumentItem:
    """
    Helpers for items with associated documents and images.
    """

    ALT_NAMES = {
        "documents__image__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-image"),
            "documents__image__isnull",
        ),
        "documents__associated_url__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-url"),
            "documents__associated_url__isnull",
        ),
        "documents__associated_file__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-attached-file"),
            "documents__associated_file__isnull",
        ),
    }

    def public_representation(self):
        """
        Return a dict with the public representation of the images: the
        main image first, if any.
        """
        images = []
        if getattr(self, "main_image", None):
            images.append(self.main_image.public_representation())
        images.extend(
            image.public_representation()
            for image in self.images_without_main_image.all()
        )
        return {"images": images}

    @property
    def images(self):
        """
        Documents with an image ordered by pk - an empty queryset of
        documents if the item has no "documents" attribute.
        """
        if hasattr(self, "documents"):
            with_image = self.documents.filter(image__isnull=False).exclude(image="")
            return with_image.order_by("pk")
        Document = apps.get_model("ishtar_common", "Document")
        return Document.objects.none()

    @property
    def images_without_main_image(self):
        """
        Documents with an image, excluding the main image, ordered by pk.
        """
        if not hasattr(self, "main_image") or not hasattr(self, "documents"):
            return self.images
        with_image = self.documents.filter(image__isnull=False).exclude(image="")
        if self.main_image:
            with_image = with_image.exclude(pk=self.main_image.pk)
        return with_image.order_by("pk")

    @property
    def pdf_attached(self):
        """
        "pdf_attached" value of the first document having an associated
        file (directly or through its source) - None if there is no such
        document.
        """
        with_file = self.documents.filter(
            Q(associated_file__isnull=False) | Q(source__associated_file__isnull=False)
        ).all()
        for document in with_file:
            # only the first document is considered
            return document.pdf_attached
        return None

    def get_extra_actions(self, request):
        """
        For sheet template: return "Add document / image" action
        """
        # url, base_text, icon, extra_text, extra css class, is a quick action
        try:
            actions = super(DocumentItem, self).get_extra_actions(request)
        except AttributeError:
            actions = []
        if not hasattr(self, "SLUG"):
            return actions
        if not self.can_do(request, "add_document"):
            return actions
        if hasattr(self, "is_locked") and self.is_locked(request.user):
            return actions
        add_document_action = (
            reverse("create-document") + "?{}={}".format(self.SLUG, self.pk),
            _("Add document/image"),
            "fa fa-plus",
            _("doc./image"),
            "",
            False,
        )
        actions += [add_document_action]
        return actions
def clean_duplicate_association(document, related_item, action):
    """
    Remove redundant associations of a document after an item has been
    added to it.

    Only done on "post_add" action, when the current profile has
    "clean_redundant_document_association" set and for Find, ContextRecord
    and Operation items.

    :param document: document whose associations are cleaned
    :param related_item: item newly associated with the document
    :param action: m2m signal action
    """
    profile = get_current_profile()
    if action != "post_add" or not profile.clean_redundant_document_association:
        return
    class_name = related_item.__class__.__name__
    item_pk = related_item.pk

    if class_name == "Find":
        # the find is attached: drop its context records and operations
        linked_crs = document.context_records.filter(base_finds__find__pk=item_pk)
        for cr in linked_crs.all():
            document.context_records.remove(cr)
        linked_opes = document.operations.filter(
            context_record__base_finds__find__pk=item_pk
        )
        for ope in linked_opes.all():
            document.operations.remove(ope)
    elif class_name == "ContextRecord":
        # drop the operations of the context record
        for ope in document.operations.filter(context_record__pk=item_pk).all():
            document.operations.remove(ope)
        # a find of this context record is already attached
        if document.finds.filter(base_finds__context_record=item_pk).count():
            document.context_records.remove(related_item)
    elif class_name == "Operation":
        # a context record or a find of this operation is already attached
        if document.context_records.filter(operation=item_pk).count():
            document.operations.remove(related_item)
        elif document.finds.filter(
            base_finds__context_record__operation=item_pk
        ).count():
            document.operations.remove(related_item)
def document_attached_changed(sender, **kwargs):
    """
    Signal for changes on document associations: clean redundant
    associations, regenerate the ids of the documents and associate a
    default main image to each related item.

    The related items are the instance itself if it has a "documents"
    attribute, otherwise the items of "model" listed in "pk_set".
    """
    # associate a default main image
    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    pk_set = kwargs.get("pk_set", None)
    if not instance or not model:
        return
    if hasattr(instance, "documents"):
        items = [instance]
    else:
        if not pk_set:
            return
        try:
            items = [model.objects.get(pk=pk) for pk in pk_set]
        except model.DoesNotExist:
            return
    for item in items:
        clean_duplicate_association(instance, item, kwargs.get("action", None))
        for doc in item.documents.all():
            doc.regenerate_all_ids()
        q = item.documents.filter(image__isnull=False).exclude(image="")
        if item.main_image:
            if q.filter(pk=item.main_image.pk).count():
                # main image still valid: go on with the next item instead
                # of leaving the remaining items unprocessed
                continue
            # the association has disappear not the main image anymore
            item.main_image = None
            item.skip_history_when_saving = True
            item.save()
        if not q.count():
            # no image available for this item
            continue
        # by default get the lowest pk
        item.main_image = q.order_by("pk").all()[0]
        item.skip_history_when_saving = True
        item.save()
class QuickAction:
    """
    Quick action available from tables

    :param url: name of the url pattern (resolved with ``reverse``)
    :param icon_class: CSS class(es) of the icon
    :param text: label of the action
    :param target: "one", "many" or None - when not None the url is
        reversed with an item pk as argument
    :param rights: list of permissions - at least one is required to access
        the action; no restriction when empty
    :param module: name of a boolean attribute of the current profile; the
        action is not available when this attribute is false
    """

    def __init__(
        self, url, icon_class="", text="", target=None, rights=None, module=None
    ):
        self.url = url
        self.icon_class = icon_class
        self.text = text
        self.rights = rights
        self.target = target
        self.module = module
        assert self.target in ("one", "many", None)

    def is_available(self, user, session=None, obj=None):
        """
        Return True if the action can be proposed to this user.
        """
        if self.module and not getattr(get_current_profile(), self.module):
            return False
        if not self.rights:  # no restriction
            return True
        if not user or not hasattr(user, "ishtaruser") or not user.ishtaruser:
            return False
        ishtar_user = user.ishtaruser
        return any(
            ishtar_user.has_perm(right, session=session, obj=obj)
            for right in self.rights
        )

    @property
    def rendered_icon(self):
        """
        HTML markup of the icon - empty string when no icon class is set.
        """
        if not self.icon_class:
            return ""
        # the template must contain a placeholder: formatting an empty
        # string always returned "" and the icon was never rendered
        return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class)

    @property
    def base_url(self):
        """
        Url of the action without the pk of the targeted item.
        """
        if self.target is None:
            return reverse(self.url)
        # put arbitrary pk for the target
        url = reverse(self.url, args=[0])
        # all quick action url have to finish with the pk of the selected
        # item and a "/": strip the arbitrary "0/"
        return url[:-2]
class DynamicRequest:
    """
    Search criteria generated dynamically: one form field and one alternate
    search name for each available item of a type model.
    """

    def __init__(
        self,
        label,
        app_name,
        model_name,
        form_key,
        search_key,
        type_query,
        search_query,
    ):
        # location of the type model
        self.app_name = app_name
        self.model_name = model_name
        # prefixes used to build form field names and search names
        self.label = label
        self.form_key = form_key
        self.search_key = search_key
        # query keys
        self.type_query = type_query
        self.search_query = search_query

    def get_all_types(self):
        """
        Return the queryset of available items of the type model.
        """
        app_config = apps.get_app_config(self.app_name)
        type_model = app_config.get_model(self.model_name)
        return type_model.objects.filter(available=True)

    def get_form_fields(self):
        """
        Return a dict of optional char fields, one for each available type,
        keyed by "<form_key>-<txt_idx>".
        """
        return {
            self.form_key + "-" + type_item.txt_idx: forms.CharField(
                label=str(self.label) + " " + str(type_item), required=False
            )
            for type_item in self.get_all_types().all()
        }

    def get_extra_query(self, slug):
        """
        Return the extra filter restricting a search to the type "slug".
        """
        extra_query = {}
        extra_query[self.type_query] = slug
        return extra_query

    def get_alt_names(self):
        """
        Return a dict of SearchAltName, one for each available type, keyed
        by "<form_key>-<txt_idx>".
        """
        return {
            self.form_key + "-" + type_item.txt_idx: SearchAltName(
                self.search_key + "-" + type_item.txt_idx,
                self.search_query,
                self.get_extra_query(type_item.txt_idx),
                distinct_query=True,
            )
            for type_item in self.get_all_types().all()
        }
class SpatialReferenceSystem(GeneralType):
    """
    Spatial reference system identified by an authority name and a SRID.
    """

    order = models.IntegerField(_("Order"), default=10)
    auth_name = models.CharField(_("Authority name"), max_length=256, default="EPSG")
    srid = models.IntegerField(_("Authority SRID"))

    class Meta:
        verbose_name = _("Spatial reference system")
        verbose_name_plural = _("Spatial reference systems")
        ordering = ("label",)

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation: complete the parent
        documentation string with the srid and auth_name fields
        """
        base_doc = super().get_documentation_string()
        extra_doc = ", **srid** {}, **auth_name** {}".format(
            _("Authority SRID"), _("Authority name")
        )
        return base_doc + extra_doc
# call post_save_cache each time a spatial reference system is saved or deleted
post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
class GeoItem(models.Model):
    """
    Abstract model for items with a geographic localisation.

    A point (3D and 2D versions) and a multi-polygon can be stored, each
    with its source (GEO_SOURCE) and the label of the item it comes from.
    Raw coordinates (x, y, z) are stored with their spatial reference
    system.

    Children have to implement `get_town_centroid` and `get_town_polygons`.
    GEO_LABEL can be overloaded to change the attribute used as a label by
    the GeoJSON serialization.
    """

    GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon")))

    # gis
    x = models.FloatField(_("X"), blank=True, null=True)
    y = models.FloatField(_("Y"), blank=True, null=True)
    z = models.FloatField(_("Z"), blank=True, null=True)
    estimated_error_x = models.FloatField(
        _("Estimated error for X"), blank=True, null=True
    )
    estimated_error_y = models.FloatField(
        _("Estimated error for Y"), blank=True, null=True
    )
    estimated_error_z = models.FloatField(
        _("Estimated error for Z"), blank=True, null=True
    )
    spatial_reference_system = models.ForeignKey(
        SpatialReferenceSystem,
        verbose_name=_("Spatial Reference System"),
        blank=True,
        null=True,
    )
    point = models.PointField(_("Point"), blank=True, null=True, dim=3)
    point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
    point_source = models.CharField(
        _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True
    )
    point_source_item = models.CharField(
        _("Point source item"), max_length=100, blank=True, null=True
    )
    multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True)
    multi_polygon_source = models.CharField(
        _("Multi-polygon source"),
        choices=GEO_SOURCE,
        max_length=1,
        blank=True,
        null=True,
    )
    multi_polygon_source_item = models.CharField(
        _("Multi polygon source item"), max_length=100, blank=True, null=True
    )

    GEO_LABEL = ""

    class Meta:
        abstract = True

    def get_town_centroid(self):
        """To be implemented by children."""
        raise NotImplementedError

    def get_town_polygons(self):
        """To be implemented by children."""
        raise NotImplementedError

    @property
    def X(self):
        """x coordinates using the default SRS"""
        coord = self.display_coordinates
        if not coord:
            return
        return coord[0]

    @property
    def Y(self):
        """y coordinates using the default SRS"""
        coord = self.display_coordinates
        if not coord:
            return
        return coord[1]

    def _compute_display_coordinates(self, rounded=True):
        """
        Return (x, y) of the 2D point for display, "" when there is no 2D
        point.

        Raw x and y are used when the profile has no usable display SRS or
        when the display SRS is the SRS of the item. Otherwise the 2D point
        is transformed to the display SRS.

        :param rounded: round coordinates to 5 decimals when True
        """
        if not self.point_2d:
            return ""
        display_srs = get_current_profile().display_srs
        use_raw_coordinates = (
            not display_srs
            or not display_srs.srid
            or (display_srs == self.spatial_reference_system and self.x and self.y)
        )
        if use_raw_coordinates:
            x, y = self.x, self.y
            if x is None or y is None:
                # raw coordinates are not filled (for instance a point not
                # typed by the user): use the geometry itself instead of
                # failing on round(None)
                x, y = self.point_2d.x, self.point_2d.y
        else:
            point = self.point_2d.transform(display_srs.srid, clone=True)
            x, y = point.x, point.y
        if rounded:
            return round(x, 5), round(y, 5)
        return x, y

    @property
    def display_coordinates(self, rounded=True):
        """
        Rounded (x, y) coordinates for display - "" when no 2D point is set.

        As this is a property, `rounded` always has its default value.
        """
        return self._compute_display_coordinates(rounded)

    @property
    def display_spatial_reference_system(self):
        """
        SRS of the profile if set and valid, otherwise SRS of the item.
        """
        profile = get_current_profile()
        if not profile.display_srs or not profile.display_srs.srid:
            return self.spatial_reference_system
        return profile.display_srs

    def get_precise_points(self):
        """
        Return (point_2d, point, source item) if the point is a precise
        one, otherwise None.
        """
        if self.point_source == "P" and self.point_2d:
            return self.point_2d, self.point, self.point_source_item

    def get_precise_polygons(self):
        """
        Return (multi_polygon, source item) if the multi-polygon is a
        precise one, otherwise None.
        """
        if self.multi_polygon_source == "P" and self.multi_polygon:
            return self.multi_polygon, self.multi_polygon_source_item

    def get_geo_items(self, get_polygons=False, rounded=True):
        """
        Return a GeoJSON-like "Feature" dict for this item.

        :param get_polygons: if True and a multi-polygon is available, the
            geometry is the multi-polygon, otherwise the geometry is a point
            (centroid of the multi-polygon or the 2D point)
        :param rounded: round coordinates to 5 decimals when True
        :return: dict with "type" and "geometry" keys - "geometry" is left
            empty when the item has no geometry at all
        """
        feature = {
            "type": "Feature",
            "geometry": {},
        }
        geometry = feature["geometry"]
        if not self.multi_polygon:
            coordinates = self._compute_display_coordinates(rounded)
            if coordinates:  # "" when there is no 2D point
                geometry["type"] = "Point"
                geometry["coordinates"] = list(coordinates)
            return feature

        if not get_polygons:
            geometry["type"] = "Point"
            geometry["coordinates"] = self.convert_coordinates(
                self.multi_polygon.centroid, rounded
            )
            return feature

        srid = self.multi_polygon.srid
        polygons = []
        for polygon in self.multi_polygon:
            rings = []
            for linear_ring in polygon:
                rings.append(
                    [
                        self.convert_coordinates(
                            Point(coords[0], coords[1], srid=srid), rounded
                        )
                        for coords in linear_ring
                    ]
                )
            polygons.append(rings)
        geometry["type"] = "MultiPolygon"
        geometry["coordinates"] = polygons
        return feature

    def convert_coordinates(self, point_2d, rounded):
        """
        Return [x, y] of a point, transformed to the display SRS of the
        profile when this SRS is set, valid and different from the SRS of
        the item.

        :param point_2d: point to convert
        :param rounded: round coordinates to 5 decimals when True
        """
        profile = get_current_profile()
        if (
            not profile.display_srs
            or not profile.display_srs.srid
            or (
                profile.display_srs == self.spatial_reference_system
                and point_2d.x
                and point_2d.y
            )
        ):
            x, y = point_2d.x, point_2d.y
        else:
            point = point_2d.transform(profile.display_srs.srid, clone=True)
            x, y = point.x, point.y
        if rounded:
            return [round(x, 5), round(y, 5)]
        return [x, y]

    def most_precise_geo(self):
        """
        Return the name of the most relevant geometry: "multi_polygon",
        "point" or None when no geometry is available.

        Geometries precisely defined on the item itself come first, then
        precise geometries, then any available geometry.
        """
        if self.point_source == "M":
            return "multi_polygon"
        current_source = str(self.__class__._meta.verbose_name)
        if self.multi_polygon_source_item == current_source and (
            self.multi_polygon_source == "P"
            or (self.point_source_item != current_source and self.point_source != "P")
        ):
            return "multi_polygon"
        if self.point_source_item == current_source and self.point_source == "P":
            return "point"
        if self.multi_polygon_source == "P":
            return "multi_polygon"
        if self.point_source == "P":
            return "point"
        if self.multi_polygon:
            return "multi_polygon"
        if self.point_2d:
            return "point"

    def geo_point_source(self):
        """
        Label of the point source: "<source type> - <source item>".
        """
        if not self.point_source:
            return ""
        return "{} - {}".format(
            dict(self.GEO_SOURCE)[self.point_source], self.point_source_item
        )

    def geo_polygon_source(self):
        """
        Label of the multi-polygon source: "<source type> - <source item>".
        """
        if not self.multi_polygon_source:
            return ""
        return "{} - {}".format(
            dict(self.GEO_SOURCE)[self.multi_polygon_source],
            self.multi_polygon_source_item,
        )

    def _geojson_serialize(self, geom_attr):
        """
        Serialize a geometry of this item as a GeoJSON string.

        Each feature gets a "name" (label of the item) and an "id" (pk of
        the item) property. Point coordinates are rounded using the point
        precision of the profile when set. A "link_template" key is added to
        the collection.

        :param geom_attr: name of the geometry attribute
        :return: GeoJSON string - "" if the attribute doesn't exist
        """
        if not hasattr(self, geom_attr):
            return ""
        cached_label_key = "cached_label"
        if self.GEO_LABEL:
            cached_label_key = self.GEO_LABEL
        if getattr(self, "CACHED_LABELS", None):
            cached_label_key = self.CACHED_LABELS[-1]
        geojson = serialize(
            "geojson",
            self.__class__.objects.filter(pk=self.pk),
            geometry_field=geom_attr,
            fields=(cached_label_key,),
        )
        geojson_dct = json.loads(geojson)
        precision = get_current_profile().point_precision
        features = geojson_dct.pop("features")
        for feature in features:
            properties = feature["properties"]
            properties["name"] = properties.pop(cached_label_key)
            properties["id"] = self.pk
            geometry = feature["geometry"]
            # geometry is null in the GeoJSON when the field is empty
            if precision is None or not geometry:
                continue
            if geometry.get("type", None) == "Point":
                geometry["coordinates"] = [
                    round(coord, precision) for coord in geometry["coordinates"]
                ]
        geojson_dct["features"] = features
        geojson_dct["link_template"] = simple_link_to_window(self).replace(
            "999999", ""
        )
        return json.dumps(geojson_dct)

    @property
    def point_2d_geojson(self):
        """GeoJSON string of the 2D point"""
        return self._geojson_serialize("point_2d")

    @property
    def multi_polygon_geojson(self):
        """GeoJSON string of the multi-polygon"""
        return self._geojson_serialize("multi_polygon")
class ImageContainerModel:
    """
    Mixin providing upload paths for images: files are stored in a
    directory named after the current date.
    """

    def _get_image_path(self, filename):
        """
        Return the upload path of the file "filename".
        """
        base_path = self._get_base_image_path()
        return "{}/{}".format(base_path, filename)

    def _get_base_image_path(self):
        """
        Return the upload directory of the day: "upload/<year>/<month>/<day>".
        """
        today = datetime.datetime.now()
        year, month, day = today.year, today.month, today.day
        return "upload/{}/{:02d}/{:02d}".format(year, month, day)
class CompleteIdentifierItem(models.Model, ImageContainerModel):
    """
    Abstract model for items with a generated complete identifier, a custom
    index and a QR code.

    The complete identifier and the custom index are regenerated after each
    save. Their generation relies on the SLUG attribute of the children.
    """

    HAS_QR_CODE = True
    complete_identifier = models.TextField(
        _("Complete identifier"), blank=True, default=""
    )
    custom_index = models.IntegerField("Custom index", blank=True, null=True)
    qrcode = models.ImageField(
        upload_to=get_image_path, blank=True, null=True, max_length=255
    )

    class Meta:
        abstract = True

    @property
    def qrcode_path(self):
        """
        Path of the QR code image - the QR code is generated if necessary.
        Return "" when the generation has failed.
        """
        if not self.qrcode:
            self.generate_qrcode()
        if not self.qrcode:  # error on qrcode generation
            return ""
        return self.qrcode.path

    def generate_qrcode(self, request=None, secure=True, tmpdir=None):
        """
        Generate the QR code image of a short url redirecting to this item
        and save it in the `qrcode` field.

        :param request: if provided the scheme of the request is used
        :param secure: without request, use "https" if True, "http" otherwise
        :param tmpdir: directory used to write the temporary image - if not
            provided a temporary directory is created then deleted
        """
        url = self.get_absolute_url()
        site = Site.objects.get_current()
        if request:
            scheme = request.scheme
        else:
            scheme = "https" if secure else "http"
        base_url = scheme + "://" + site.domain

        TinyUrl = apps.get_model("ishtar_common", "TinyUrl")
        tiny_url = TinyUrl()
        tiny_url.link = base_url + url
        tiny_url.save()
        short_url = base_url + reverse(
            "tiny-redirect", args=[tiny_url.get_short_id()]
        )
        qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)

        tmpdir_created = False
        if not tmpdir:
            tmpdir = tempfile.mkdtemp("-qrcode")
            tmpdir_created = True
        try:
            filename = os.path.join(tmpdir, "qrcode.png")
            qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
            with open(filename, "rb") as qrfile:
                self.qrcode.save("qrcode.png", File(qrfile))
                self.skip_history_when_saving = True
                self._no_move = True
                self.save()
        finally:
            # do not leave the temporary directory behind on error
            if tmpdir_created:
                shutil.rmtree(tmpdir)

    def generate_complete_identifier(self):
        """
        Return the complete identifier of the item.

        The identifier generated from the "<SLUG>_complete_identifier"
        setting is used if available. Otherwise the identifier is the value
        of a label attribute: CACHED_COMPLETE_ID if defined, else GEO_LABEL
        if set, else "cached_label".

        :return: complete identifier, "" if no SLUG is defined, None if no
            label attribute is set
        """
        SLUG = getattr(self, "SLUG", None)
        if not SLUG:
            return ""
        complete_identifier = get_generated_id(SLUG + "_complete_identifier", self)
        if complete_identifier:
            return complete_identifier
        cached_label_key = "cached_label"
        if getattr(self, "GEO_LABEL", None):
            cached_label_key = getattr(self, "GEO_LABEL", None)
        if hasattr(self, "CACHED_COMPLETE_ID"):
            cached_label_key = self.CACHED_COMPLETE_ID
        if not cached_label_key:
            return
        return getattr(self, cached_label_key)

    def generate_custom_index(self, force=False):
        """
        Return the custom index of the item.

        The "<SLUG>_custom_index" attribute of the current profile lists the
        keys (";" separated) for which the index has to be unique. When
        there is a single key and a "get_index_<key>" method exists, this
        method is used to get the index.

        :param force: if False the current index is kept when already set
        :return: index or None if it cannot be generated (item not saved, no
            SLUG, no or bad setting)
        """
        if not self.pk:
            return
        if self.custom_index and not force:
            return self.custom_index
        SLUG = getattr(self, "SLUG", None)
        if not SLUG:
            return
        k = SLUG + "_custom_index"
        profile = get_current_profile()
        if not hasattr(profile, k):
            return
        key = getattr(profile, k)
        if not key or not key.strip():
            return
        keys = key.strip().split(";")
        if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]):
            # custom index generation - use the stripped key: the method
            # has been checked with this name
            return getattr(self, "get_index_" + keys[0])()
        model = self.__class__
        try:
            self_keys = set(model.objects.filter(pk=self.pk).values_list(*keys))
        except Exception:  # bad settings - not managed here
            logging.getLogger(__name__).warning(
                "Bad settings for custom_index {}".format(";".join(keys))
            )
            return
        if len(self_keys) != 1:  # key is not distinct
            return
        self_key = self_keys.pop()
        return self._get_index(keys, self_key)

    def _get_index(self, keys: list, self_keys: list):
        """
        Return the next available index for items sharing the same values
        for `keys`.

        :param keys: query keys
        :param self_keys: values of this item for each key (same order)
        :return: current max index + 1, 1 if there is no index yet, None on
            bad settings
        """
        model = self.__class__
        q = model.objects
        if self.pk:
            q = model.objects.exclude(pk=self.pk)
        for idx, key in enumerate(keys):
            q = q.filter(**{key: self_keys[idx]})
        try:
            r = q.aggregate(max_index=Max("custom_index"))
        except Exception:  # bad settings
            return
        if not r["max_index"]:
            return 1
        return r["max_index"] + 1

    def save(self, *args, **kwargs):
        """
        Save the item then regenerate its identifiers.
        """
        super(CompleteIdentifierItem, self).save(*args, **kwargs)
        self.regenerate_all_ids()

    def regenerate_all_ids(self):
        """
        Regenerate the custom index and the complete identifier. The item is
        saved again (without history) only if one of them has changed.
        """
        if getattr(self, "_prevent_loop", False):
            return
        modified = False
        custom_index = self.generate_custom_index()
        if custom_index != self.custom_index:
            modified = True
            self.custom_index = custom_index
        complete_id = self.generate_complete_identifier()
        # an unchanged identifier must not trigger a useless extra save
        if complete_id and complete_id != self.complete_identifier:
            modified = True
            self.complete_identifier = complete_id
        if not modified:
            return
        # save() calls regenerate_all_ids again: prevent the loop only for
        # the duration of this save so that later saves on the same instance
        # still regenerate identifiers
        self._prevent_loop = True
        self.skip_history_when_saving = True
        try:
            self.save()
        finally:
            self._prevent_loop = False
class SearchVectorConfig:
    """
    Configuration of a key used to feed a search vector.

    :param key: key of the value
    :param language: language of the search configuration - "local" stands
        for settings.ISHTAR_SEARCH_LANGUAGE, "simple" is used when no
        language is provided
    :param func: optional function called by `format` on values
    """

    def __init__(self, key, language=None, func=None):
        self.key = key
        self.func = func
        if not language:
            self.language = "simple"
        elif language == "local":
            self.language = settings.ISHTAR_SEARCH_LANGUAGE
        else:
            self.language = language

    def format(self, value):
        """
        Return the result of `func` for this value or, without `func`, the
        value inside a list. The "None" string is considered as empty.
        """
        cleaned_value = "" if value == "None" else value
        if self.func:
            return self.func(cleaned_value)
        return [cleaned_value]
class ShortMenuItem:
    """
    Item available in the short menu
    """

    # empty by default
    UP_MODEL_QUERY = dict()

    @classmethod
    def get_short_menu_class(cls, pk):
        """
        Class of the item "pk" in the short menu - no class by default
        """
        no_class = ""
        return no_class

    @property
    def short_class_name(self):
        """
        Short name of the class - empty by default
        """
        no_name = ""
        return no_name
class MainItem(ShortMenuItem):
"""
Item with quick actions available from tables
Extra actions are available from sheets
"""
# actions iterated by get_quick_actions and get_quick_action_by_url - each
# one has to provide url, text, target, base_url, rendered_icon and
# is_available
QUICK_ACTIONS = []
@classmethod
def get_quick_actions(cls, user, session=None, obj=None):
    """
    List quick actions available for an user.

    Each action is a list: [url, title, icon, target]. Title and icon are
    marked as safe, target is an empty string when the action has no target.
    """
    return [
        [
            quick_action.base_url,
            mark_safe(quick_action.text),
            mark_safe(quick_action.rendered_icon),
            quick_action.target or "",
        ]
        for quick_action in cls.QUICK_ACTIONS
        if quick_action.is_available(user, session=session, obj=obj)
    ]
@classmethod
def get_quick_action_by_url(cls, url):
    """
    Return the first quick action with this url, None if there is no match.
    """
    matching_actions = (
        quick_action for quick_action in cls.QUICK_ACTIONS if quick_action.url == url
    )
    return next(matching_actions, None)
def regenerate_external_id(self):
    """
    Empty the external ID, flag it as automatic and save the item. Nothing
    is done for items without an "external_id" attribute.
    """
    if hasattr(self, "external_id"):
        # flags read when saving
        self.skip_history_when_saving = True
        self._no_move = True
        # reset the ID
        self.external_id = ""
        self.auto_external_id = True
        self.save()
def get_extra_actions(self, request):
    """
    List extra actions available for the user of this request.

    Only the regeneration of the ID is proposed here: it is available for
    superusers on items with a SLUG and an "auto_external_id" attribute.
    Each action is a 7-items tuple beginning with the url of the action.
    """
    if not hasattr(self, "SLUG"):
        return []
    can_regenerate = request.user.is_superuser and hasattr(
        self, "auto_external_id"
    )
    if not can_regenerate:
        return []
    regenerate_url = reverse("regenerate-external-id") + "?{}={}".format(
        self.SLUG, self.pk
    )
    regenerate_action = (
        regenerate_url,
        _("Regenerate ID"),
        "fa fa-key",
        _("regen."),
        "btn-info",
        True,
        200,
    )
    return [regenerate_action]