#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Generic models and tools for models
"""
import copy
from collections import OrderedDict
import datetime
import fiona
from importlib import import_module
import json
import logging
import os
import pyqrcode
import re
import shutil
import tempfile
import time
from django import forms
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.gis.db import models
from django.contrib.gis.geos import GEOSGeometry, Point
from django.contrib.gis.gdal.error import GDALException
from django.contrib.postgres.fields import JSONField
from django.contrib.postgres.search import SearchVectorField, SearchVector
from django.contrib.sites.models import Site
from django.core.cache import cache as django_cache
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.core.files import File
from django.core.serializers import serialize
from django.urls import reverse, NoReverseMatch
from django.core.validators import validate_slug
from django.db import connection, transaction, OperationalError, IntegrityError
from django.db.models import Q, Count, Max
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.template import loader
from django.template.defaultfilters import slugify
from django.utils.safestring import SafeText, mark_safe
from django.utils.translation import activate, deactivate
from ishtar_common.utils import (
ugettext_lazy as _,
pgettext_lazy,
get_image_path,
get_columns_from_class,
human_date,
SheetItem
)
from simple_history.models import HistoricalRecords as BaseHistoricalRecords
from simple_history.signals import (
post_create_historical_record,
pre_create_historical_record,
)
from unidecode import unidecode
from ishtar_common.data_importer import post_importer_action, ImporterError
from ishtar_common.model_managers import TypeManager
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import Import
from ishtar_common.templatetags.link_to_window import simple_link_to_window
from ishtar_common.utils import (
get_cache,
disable_for_loaddata,
get_all_field_names,
merge_tsvectors,
cached_label_changed,
external_id_changed,
post_save_geo,
post_save_geodata,
task,
duplicate_item,
get_generated_id,
get_current_profile,
OwnPerms
)
logger = logging.getLogger(__name__)
"""
from ishtar_common.models import GeneralType, get_external_id, \
LightHistorizedItem, OwnPerms, Address, post_save_cache, \
DashboardFormItem, document_attached_changed, SearchAltName, \
DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
QuickAction, MainItem, Merge
"""
class CachedGen(object):
    """Base mixin for models whose lookups are memoised in the Django cache."""

    @classmethod
    def refresh_cache(cls):
        """Recompute every cache entry for this class (must be overridden)."""
        raise NotImplementedError()

    @classmethod
    def _add_cache_key_to_refresh(cls, keys):
        """Register ``keys`` so a later refresh_cache() can rebuild its entry."""
        cache_ckey, registered = get_cache(cls, ["_current_keys"])
        if type(registered) != list:
            registered = []
        if keys not in registered:
            registered.append(keys)
        django_cache.set(cache_ckey, registered, settings.CACHE_TIMEOUT)
class Cached(CachedGen):
    """
    Cache-aware mixin: items are cached by slug and the registered cache
    entries (slug lookups, type lists, help texts) can be rebuilt in bulk
    with :meth:`refresh_cache`.
    """

    # name of the model field used as the cache slug
    slug_field = "txt_idx"

    # NOTE: _add_cache_key_to_refresh is inherited from CachedGen — the
    # previous byte-identical override has been removed.

    @classmethod
    def refresh_cache(cls):
        """Recompute every cache entry registered for this class."""
        cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
        if not current_keys:
            return
        for keys in current_keys:
            if len(keys) == 2 and keys[0] == "__slug":
                cls.get_cache(keys[1], force=True)
            elif keys[0] == "__get_types":
                # keys layout: ["__get_types", *exclude, empty_first, default]
                default = None
                empty_first = True
                exclude = []
                if len(keys) >= 2:
                    default = keys.pop()
                if len(keys) > 1:
                    empty_first = bool(keys.pop())
                exclude = keys[1:]
                cls.get_types(
                    exclude=exclude,
                    empty_first=empty_first,
                    default=default,
                    force=True,
                )
            elif keys[0] == "__get_help":
                cls.get_help(force=True)

    @classmethod
    def get_cache(cls, slug, force=False):
        """
        Return the instance matching ``slug`` (cached).

        :param slug: value of ``slug_field`` to look up
        :param force: bypass the cache and re-query the database
        :return: the instance or None when it does not exist
        """
        cache_key, value = get_cache(cls, ["__slug", slug])
        if not force and value:
            return value
        try:
            k = {cls.slug_field: slug}
            obj = cls.objects.get(**k)
            django_cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
            return obj
        except cls.DoesNotExist:
            # negative result is cached too
            django_cache.set(cache_key, None, settings.CACHE_TIMEOUT)
            return None
@disable_for_loaddata
def post_save_cache(sender, **kwargs):
    """Signal handler: refresh the sender model's cache after save/delete."""
    sender.refresh_cache()
class GeneralType(Cached, models.Model):
    """
    Abstract class for "types": a label plus a unique slug (``txt_idx``),
    with cached lookups, choice-list helpers and ItemKey management used
    by the importers.
    """

    label = models.TextField(_("Label"))
    txt_idx = models.TextField(
        _("Textual ID"),
        validators=[validate_slug],
        unique=True,
        help_text=_(
            "The slug is the standardized version of the name. It contains "
            "only lowercase letters, numbers and hyphens. Each slug must "
            "be unique."
        ),
    )
    comment = models.TextField(_("Comment"), blank=True, default="")
    available = models.BooleanField(_("Available"), default=True)
    HELP_TEXT = ""
    objects = TypeManager()

    class Meta:
        abstract = True

    def __str__(self):
        return self.label

    def natural_key(self):
        return (self.txt_idx,)

    def history_compress(self):
        # only the slug is stored in compressed history records
        return self.txt_idx

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        s = "**label** {}, **txt_idx** {}".format(str(_("Label")), str(_("Textual ID")))
        if hasattr(cls, "extra_documentation_string"):
            s += cls.extra_documentation_string()
        return s

    @classmethod
    def admin_url(cls):
        """Return the admin changelist URL for this type."""
        return str(
            reverse(
                "admin:{}_{}_changelist".format(
                    cls._meta.app_label, cls._meta.model_name
                )
            )
        )

    @classmethod
    def history_decompress(cls, value, create=False):
        """Return instances from a list of slugs (missing slugs are skipped)."""
        if not value:
            return []
        res = []
        for txt_idx in value:
            try:
                res.append(cls.objects.get(txt_idx=txt_idx))
            except cls.DoesNotExist:
                continue
        return res

    @property
    def explicit_label(self):
        return "{} ({})".format(self.label, self._meta.verbose_name)

    @classmethod
    def create_default_for_test(cls):
        return [cls.objects.create(label="Test %d" % i) for i in range(5)]

    @property
    def short_label(self):
        return self.label

    @property
    def name(self):
        return self.label

    @classmethod
    def get_or_create(cls, slug, label=""):
        """
        Get or create a new item.

        :param slug: textual id
        :param label: label for initialization if the item doesn't exist (not
        mandatory)
        :return: instancied item of the base class
        """
        item = cls.get_cache(slug)
        if item:
            return item
        item, created = cls.objects.get_or_create(
            txt_idx=slug, defaults={"label": label}
        )
        return item

    @classmethod
    def get_or_create_pk(cls, slug):
        """
        Get an id from a slug. Create the associated item if needed.

        :param slug: textual id
        :return: id of the item (string)
        """
        return str(cls.get_or_create(slug).pk)

    @classmethod
    def get_or_create_pks(cls, slugs):
        """
        Get and merge a list of ids from a slug list. Create the associated
        items if needed.

        :param slugs: textual ids
        :return: string with ids separated by "_"
        """
        items = []
        for slug in slugs:
            items.append(str(cls.get_or_create(slug).pk))
        return "_".join(items)

    @classmethod
    def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
        """
        Return a (cached) HTML help text listing the available types with
        their comments as nested <dl> definition lists.

        Fix: the HTML fragments below had been stripped from the string
        literals, leaving syntactically broken code; they are restored.
        """
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        keys = ["__get_help"]
        keys += ["{}".format(ex) for ex in exclude]
        keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
        cache_key, value = get_cache(cls, keys)
        if value and not force:
            return mark_safe(value)
        help_text = cls.HELP_TEXT
        c_rank = -1
        help_items = "\n"
        for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
            if hasattr(item, "__iter__"):
                # a (pk, label) pair slipped through: fetch the instance
                pk = item[0]
                item = cls.objects.get(pk=pk)
                item.rank = c_rank + 1
            if hasattr(item, "parent"):
                # prepend the ancestors' labels
                c_item = item
                parents = []
                while c_item.parent:
                    parents.append(c_item.parent.label)
                    c_item = c_item.parent
                parents.reverse()
                parents.append(item.label)
                item.label = " / ".join(parents)
            if not item.comment:
                continue
            if c_rank > item.rank:
                help_items += "</dl>\n"
            elif c_rank < item.rank:
                help_items += "<dl>\n"
            c_rank = item.rank
            help_items += "<dt>%s</dt><dd>%s</dd>" % (
                item.label,
                "<br/>".join(item.comment.split("\n")),
            )
            c_rank += 1
        if c_rank:
            # close the definition lists still open
            help_items += c_rank * "</dl>"
        if help_text or help_items != "\n":
            help_text = help_text + help_items
        else:
            help_text = ""
        django_cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
        return mark_safe(help_text)

    @classmethod
    def _get_initial_types(cls, initial, type_pks, instance=False):
        """Return entries for ``initial`` values missing from ``type_pks``."""
        new_vals = []
        if not initial:
            return []
        if not isinstance(initial, (list, tuple)):
            initial = [initial]
        for value in initial:
            try:
                pk = int(value)
            except (ValueError, TypeError):
                continue
            if pk in type_pks:
                continue
            try:
                extra_type = cls.objects.get(pk=pk)
                if instance:
                    new_vals.append(extra_type)
                else:
                    new_vals.append((extra_type.pk, str(extra_type)))
            except cls.DoesNotExist:
                continue
        return new_vals

    @classmethod
    def get_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        empty_first=True,
        default=None,
        initial=None,
        force=False,
        full_hierarchy=False,
    ):
        """
        Return the available types as (pk, label) choices — or as instances
        when ``instances`` is True. ``initial`` values are appended even when
        no longer available.
        """
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        types = []
        if not instances and empty_first and not default:
            types = [("", "--")]
        types += cls._pre_get_types(
            dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy
        )
        if not initial:
            return types
        # NOTE(review): assumes (pk, label) pairs — with instances=True and
        # initial set this unpacking would fail; confirm callers.
        new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types])
        types += new_vals
        return types

    @classmethod
    def _pre_get_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        default=None,
        force=False,
        get_full_hierarchy=False,
    ):
        """Dispatch to the flat or hierarchical type listing, with caching."""
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        # cache — only (pk, label) lists are cached, never instances
        cache_key = None
        if not instances:
            keys = ["__get_types"]
            keys += ["{}".format(ex) for ex in exclude] + ["{}".format(default)]
            keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
            cache_key, value = get_cache(cls, keys)
            if value and not force:
                return value
        base_dct = dct.copy()
        if hasattr(cls, "parent"):
            # hierarchical type
            if not cache_key:
                return cls._get_parent_types(
                    base_dct,
                    instances,
                    exclude=exclude,
                    default=default,
                    get_full_hierarchy=get_full_hierarchy,
                )
            vals = [
                v
                for v in cls._get_parent_types(
                    base_dct,
                    instances,
                    exclude=exclude,
                    default=default,
                    get_full_hierarchy=get_full_hierarchy,
                )
            ]
            django_cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
            return vals
        if not cache_key:
            return cls._get_types(base_dct, instances, exclude=exclude, default=default)
        vals = [
            v
            for v in cls._get_types(
                base_dct, instances, exclude=exclude, default=default
            )
        ]
        django_cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
        return vals

    @classmethod
    def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
        """Yield available types matching ``dct`` (flat — no hierarchy)."""
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        dct["available"] = True
        if default:
            # the default item is yielded first
            try:
                default = cls.objects.get(txt_idx=default)
                yield (default.pk, str(default))
            except cls.DoesNotExist:
                pass
        items = cls.objects.filter(**dct)
        if default and default != "None":
            # do not yield the default item twice
            if hasattr(default, "txt_idx"):
                exclude.append(default.txt_idx)
            else:
                exclude.append(default)
        if exclude:
            items = items.exclude(txt_idx__in=exclude)
        for item in items.order_by(*cls._meta.ordering).all():
            if instances:
                item.rank = 0
                yield item
            else:
                yield item.pk, str(item) if item and str(item) else ""

    @classmethod
    def _get_childs_list(cls, dct=None, exclude=None, instances=False):
        """Return {parent_id or 0: [children]} for the whole hierarchy."""
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        if "parent" in dct:
            dct.pop("parent")
        childs = cls.objects.filter(**dct)
        if exclude:
            childs = childs.exclude(txt_idx__in=exclude)
        ordering = cls._meta.ordering
        if not ordering and hasattr(cls, "order"):
            childs = childs.order_by("order")
        else:
            childs = childs.order_by(*ordering)
        res = {}
        if instances:
            for item in childs.all():
                parent_id = item.parent_id or 0
                if parent_id not in res:
                    res[parent_id] = []
                res[parent_id].append(item)
        else:
            for item in childs.values("id", "parent_id", "label").all():
                parent_id = item["parent_id"] or 0
                if item["id"] == item["parent_id"]:
                    # self-referencing row: treat as a root
                    parent_id = 0
                if parent_id not in res:
                    res[parent_id] = []
                res[parent_id].append((item["id"], item["label"]))
        return res

    # tree-drawing prefixes used to indent child labels in choice lists
    PREFIX = "│ "
    PREFIX_EMPTY = " "
    PREFIX_MEDIUM = "├ "
    PREFIX_LAST = "└ "
    PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]

    @classmethod
    def _get_childs(
        cls,
        item,
        child_list,
        prefix=0,
        instances=False,
        is_last=False,
        last_of=None,
        get_full_hierarchy=False,
    ):
        """
        Recursively flatten the children of ``item`` (a pk or 0 for roots),
        decorating labels with tree-drawing prefixes or full hierarchy paths.
        """
        if not last_of:
            last_of = []
        prefix += 1
        current_child_lst = []
        if item in child_list:
            current_child_lst = child_list[item]
        lst = []
        total = len(current_child_lst)
        full_hierarchy_initial = get_full_hierarchy
        for idx, child in enumerate(current_child_lst):
            mylast_of = last_of[:]
            p = ""
            if instances:
                child.rank = prefix
                lst.append(child)
            else:
                if full_hierarchy_initial:
                    # "Parent > Child" textual path instead of tree glyphs
                    if isinstance(full_hierarchy_initial, str):
                        p = full_hierarchy_initial + " > "
                    else:
                        p = ""
                else:
                    cprefix = prefix
                    while cprefix:
                        cprefix -= 1
                        if not cprefix:
                            if (idx + 1) == total:
                                p += cls.PREFIX_LAST
                            else:
                                p += cls.PREFIX_MEDIUM
                        elif is_last:
                            if mylast_of:
                                clast = mylast_of.pop(0)
                                if clast:
                                    p += cls.PREFIX_EMPTY
                                else:
                                    p += cls.PREFIX
                            else:
                                p += cls.PREFIX_EMPTY
                        else:
                            p += cls.PREFIX
                lst.append((child[0], SafeText(p + child[1])))
            clast_of = last_of[:]
            clast_of.append(idx + 1 == total)
            if instances:
                child_id = child.id
            else:
                child_id = child[0]
            if get_full_hierarchy:
                if p:
                    if not p.endswith(" > "):
                        p += " > "
                    get_full_hierarchy = p + child[1]
                else:
                    get_full_hierarchy = child[1]
            for sub_child in cls._get_childs(
                child_id,
                child_list,
                prefix,
                instances,
                is_last=((idx + 1) == total),
                last_of=clast_of,
                get_full_hierarchy=get_full_hierarchy,
            ):
                lst.append(sub_child)
        return lst

    @classmethod
    def _get_parent_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        default=None,
        get_full_hierarchy=False,
    ):
        """Yield available types depth-first, roots then their children."""
        if not dct:
            dct = {}
        if not exclude:
            exclude = []
        dct["available"] = True
        child_list = cls._get_childs_list(dct, exclude, instances)
        if 0 in child_list:
            for item in child_list[0]:
                if instances:
                    item.rank = 0
                    item_id = item.pk
                    yield item
                else:
                    item_id = item[0]
                    yield item
                    if get_full_hierarchy:
                        get_full_hierarchy = item[1]
                for child in cls._get_childs(
                    item_id,
                    child_list,
                    instances=instances,
                    get_full_hierarchy=get_full_hierarchy,
                ):
                    yield child

    def set_txt_idx(self):
        """Generate a unique slug from the label, suffixing with a counter."""
        base_q = self.__class__.objects
        if self.pk:
            base_q = base_q.exclude(pk=self.pk)
        count, txt_idx = True, None
        idx = 0
        while count:
            if not txt_idx:
                txt_idx = slugify(self.label)[:100]
            else:
                # replace the last 5 characters with a zero-padded counter
                txt_idx = txt_idx[:-5] + f"{idx:05d}"
            q = base_q.filter(txt_idx=txt_idx)
            count = q.count()
            idx += 1
        self.txt_idx = txt_idx

    def save(self, *args, **kwargs):
        """Derive missing label/slug, clean stale ItemKeys, regenerate keys."""
        ItemKey = apps.get_model("ishtar_common", "ItemKey")
        if not self.id and not self.label:
            txt_idx = self.txt_idx
            if isinstance(txt_idx, list):
                txt_idx = txt_idx[0]
            self.txt_idx = txt_idx
            # "my-type_slug" -> "My Type Slug"
            self.label = " ".join(" ".join(self.txt_idx.split("-")).split("_")).title()
        if not self.txt_idx:
            self.set_txt_idx()
        # clean old keys
        if self.pk:
            old = self.__class__.objects.get(pk=self.pk)
            content_type = ContentType.objects.get_for_model(self.__class__)
            if slugify(self.label) != slugify(old.label):
                ItemKey.objects.filter(
                    object_id=self.pk, key=slugify(old.label), content_type=content_type
                ).delete()
            if self.txt_idx != old.txt_idx:
                ItemKey.objects.filter(
                    object_id=self.pk, key=old.txt_idx, content_type=content_type
                ).delete()
        obj = super(GeneralType, self).save(*args, **kwargs)
        self.generate_key(force=True)
        return obj

    def add_key(self, key, force=False, importer=None, group=None, user=None):
        """
        Attach an ItemKey ``key`` to this item.

        With ``force``, the key is taken over from any other item; otherwise
        an already existing key (outside an import context) is left untouched.
        """
        ItemKey = apps.get_model("ishtar_common", "ItemKey")
        content_type = ContentType.objects.get_for_model(self.__class__)
        if (
            not importer
            and not force
            and ItemKey.objects.filter(key=key, content_type=content_type).count()
        ):
            return
        filtr = {"key": key, "content_type": content_type}
        if group:
            filtr["group"] = group
        elif user:
            filtr["user"] = user
        else:
            filtr["importer"] = importer
        if force:
            ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
        filtr["object_id"] = self.pk
        ItemKey.objects.get_or_create(**filtr)

    def generate_key(self, force=False):
        """
        (Re)create the keys for the slugified label and the slug.

        Fix: ``force`` was previously ignored although save() calls
        generate_key(force=True); it is now propagated to add_key().
        """
        for key in (slugify(self.label), self.txt_idx):
            self.add_key(key, force=force)

    def get_keys(self, importer):
        """Return the keys visible to ``importer`` (generic, importer, user, group)."""
        ItemKey = apps.get_model("ishtar_common", "ItemKey")
        keys = [self.txt_idx]
        content_type = ContentType.objects.get_for_model(self.__class__)
        base_q = Q(content_type=content_type, object_id=self.pk)
        subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True)
        subquery |= Q(user__isnull=True, group__isnull=True, importer=importer)
        if importer.user:
            subquery |= Q(user=importer.user, group__isnull=True, importer=importer)
        if importer.associated_group:
            subquery |= Q(
                user__isnull=True, group=importer.associated_group, importer=importer
            )
        q = ItemKey.objects.filter(base_q & subquery)
        for ik in q.exclude(key=self.txt_idx).all():
            keys.append(ik.key)
        return keys

    @classmethod
    def generate_keys(cls):
        """Regenerate the label/slug keys of every item of this type."""
        # content_type = ContentType.objects.get_for_model(cls)
        for item in cls.objects.all():
            item.generate_key()
class OrderedModel(models.Model):
    """Abstract mixin adding a manual ordering integer."""

    order = models.IntegerField(verbose_name=_("Order"), default=10)

    class Meta:
        abstract = True
class OrderedType(OrderedModel, GeneralType):
    """Abstract base for "types" ordered by the "order" field."""
    class Meta:
        abstract = True
class HierarchicalType(GeneralType):
    """Type with an optional parent, forming a hierarchy."""

    parent = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        verbose_name=_("Parent"),
    )

    class Meta:
        abstract = True

    # tells display code that full_label() is available on this type
    has_full_label = True

    def full_label(self):
        """Return the labels from the root down to this item, " > " separated."""
        seen_pks = [self.pk]  # guard against parent loops
        labels = [self.label]
        node = self
        while node.parent and node.parent_id not in seen_pks:
            node = node.parent
            seen_pks.append(node.pk)
            labels.append(node.label)
        return " > ".join(reversed(labels))

    @property
    def first_parent(self):
        """Return the topmost ancestor, or None when the item has no parent."""
        current = self.parent
        visited = []
        while current:
            if current in visited:  # circular hierarchy: stop here
                return current
            visited.append(current)
            if not current.parent:
                return current
            current = current.parent
class OrderedHierarchicalType(OrderedModel, HierarchicalType):
    """Abstract base for hierarchical types ordered by the "order" field."""
    class Meta:
        abstract = True
class StatisticItem:
    """Mixin describing how statistics can be computed for a model."""
    STATISTIC_MODALITIES = []  # example: "year", "operation_type__label"
    STATISTIC_MODALITIES_OPTIONS = OrderedDict()  # example:
    # OrderedDict([('year', _("Year")),
    # ("operation_type__label", _("Operation type"))])
    STATISTIC_SUM_VARIABLE = OrderedDict(
        (("pk", (_("Number"), 1)),)
    )  # example: "Price", "Volume" - the number is a multiplier
class TemplateItem:
    """Mixin for items that can be rendered through document templates."""

    @classmethod
    def _label_templates_q(cls):
        """Return the queryset of available label templates for this model."""
        model_name = "{}.{}".format(cls.__module__, cls.__name__)
        q = Q(associated_model__klass=model_name, for_labels=True, available=True)
        alt_model_name = model_name.replace("models_finds", "models").replace(
            "models_treatments", "models"
        )
        if alt_model_name != model_name:
            # fix: this branch previously re-queried model_name, so the
            # computed alternate name was never used
            q |= Q(
                associated_model__klass=alt_model_name, for_labels=True, available=True
            )
        DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
        return DocumentTemplate.objects.filter(q)

    @classmethod
    def has_label_templates(cls):
        return cls._label_templates_q().count()

    @classmethod
    def label_templates(cls):
        return cls._label_templates_q()

    def get_extra_templates(self, request):
        """Return [(template name, generation URL), ...] for this item."""
        cls = self.__class__
        templates = []
        name = str(cls.__name__)
        module = str(cls.__module__)
        if "archaeological_finds" in module:
            # NOTE(review): the substitutions below operate on the class name
            # while "models_finds"/"models_treatments" look like module path
            # segments — confirm the intent against DocumentTemplate slugs
            if "models_finds" in name or "models_treatments" in name:
                names = [
                    name,
                    name.replace("models_finds", "models").replace(
                        "models_treatments", "models"
                    ),
                ]
            else:
                names = [
                    name,
                    name.replace("models", "models_finds"),
                    name.replace("models", "models_treatments"),
                ]
        else:
            names = [name]
        model_names = ["{}.{}".format(module, name) for name in names]
        DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
        q = DocumentTemplate.objects.filter(
            associated_model__klass__in=model_names, for_labels=False, available=True
        )
        for template in q.all():
            urlname = "generate-document"
            templates.append(
                (template.name, reverse(urlname, args=[template.slug, self.pk]))
            )
        return templates
class SheetFilter(models.Model):
    """Abstract filter key attached to a sheet template."""

    key = models.CharField(_("Key"), max_length=200)

    class Meta:
        abstract = True

    def get_template(self):
        """Return the path of the main template (must be overridden)."""
        # fix: "raise NotImplemented()" raised a TypeError because
        # NotImplemented is a constant, not an exception class
        raise NotImplementedError()

    def get_keys(self):
        """Collect the "item.<attr>" keys used by the template and its includes."""
        attrs = re.compile(r"item\.([_a-zA-Z]+)")
        includes = re.compile(r"""\{\% *include *["'](/?(?:[^/]+/?)+)["'] *\%\}""")
        main_template = self.get_template()
        templates = [main_template]
        with open(main_template, "r") as fle:
            content = fle.read()
        keys = attrs.findall(content)
        for line in content.split("\n"):
            for tpl_name in includes.findall(line):
                if tpl_name in templates:
                    continue
                templates.append(tpl_name)
                tpl = loader.get_template(tpl_name)
                with open(tpl.template.origin.name, "r") as fle:
                    sub_content = fle.read()
                keys += attrs.findall(sub_content)
        return sorted(set(keys))
class FullSearch(models.Model):
    """
    Abstract mixin maintaining a PostgreSQL tsvector column built from the
    *_SEARCH_VECTORS configuration of the concrete model.
    """
    search_vector = SearchVectorField(
        _("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")
    )
    # per-model search configuration — overridden by concrete models
    EXTRA_REQUEST_KEYS = {}
    DYNAMIC_REQUESTS = {}
    ALT_NAMES = {}
    BOOL_FIELDS = []
    NUMBER_FIELDS = []
    REVERSED_BOOL_FIELDS = []
    CALLABLE_BOOL_FIELDS = []
    BASE_SEARCH_VECTORS = []
    PROPERTY_SEARCH_VECTORS = []
    INT_SEARCH_VECTORS = []
    M2M_SEARCH_VECTORS = []
    PARENT_SEARCH_VECTORS = []
    # prevent circular dependency
    PARENT_ONLY_SEARCH_VECTORS = []
    # tuple (module, class) in text for dynamic import
    DEFAULT_SEARCH_FORM = tuple()

    class Meta:
        abstract = True

    @classmethod
    def get_default_search_form(cls):
        """Import and return the search form class, or None if unset."""
        # DEFAULT_SEARCH_FORM is used to get the form when exporting tables
        if not cls.DEFAULT_SEARCH_FORM:
            return
        form = getattr(import_module(cls.DEFAULT_SEARCH_FORM[0]),
                       cls.DEFAULT_SEARCH_FORM[1])
        return form

    @classmethod
    def general_types(cls):
        """Yield field names of relations pointing to GeneralType subclasses."""
        for k in get_all_field_names(cls):
            field = cls._meta.get_field(k)
            if not hasattr(field, "remote_field") or not field.remote_field:
                continue
            rel_model = field.remote_field.model
            if issubclass(rel_model, (GeneralType, HierarchicalType)):
                yield k

    @classmethod
    def get_alt_names(cls):
        """Return ALT_NAMES merged with the dynamic requests' alt names."""
        alt_names = cls.ALT_NAMES.copy()
        for dr_k in cls.DYNAMIC_REQUESTS:
            alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names())
        return alt_names

    @classmethod
    def get_query_parameters(cls):
        """Map each localized search key (for every language) to its alt name."""
        query_parameters = {}
        for v in cls.get_alt_names().values():
            for language_code, language_lbl in settings.LANGUAGES:
                activate(language_code)
                query_parameters[str(v.search_key)] = v
                deactivate()
        return query_parameters

    @classmethod
    def _update_raw_search_field(cls, value):
        """Build "'token':1" tsvector entries from a raw (ID-like) value."""
        result = []
        if not value:
            value = ""
        # index the chunks delimited by single quotes
        for val in value.split("'"):
            result.append(f"'{val.lower()}':1")
        SEPS = [" ", "-", "/"]
        values = []
        # split ID terms
        for idx, sep in enumerate(SEPS):
            if not idx:
                values = value.split(sep)
                continue
            new_values = []
            for val in values:
                new_values += val.split(sep)
            values = new_values
        for val in values:
            if len(val) < 2:
                continue
            val = val.replace("'", "").lower()
            result.append(f"'{val}':1")
        return result

    def _update_search_field(self, search_vector_conf, search_vectors, data):
        """Append the tsvector(s) for ``data`` according to its configuration."""
        for value in search_vector_conf.format(data):
            if search_vector_conf.language == "raw":
                search_vectors += self._update_raw_search_field(value)
                continue
            # let PostgreSQL compute the tsvector in the configured language
            with connection.cursor() as cursor:
                cursor.execute(
                    "SELECT to_tsvector(%s, %s)", [search_vector_conf.language, value]
                )
                row = cursor.fetchone()
                search_vectors.append(row[0])

    def _update_search_number_field(self, search_vectors, val):
        """Index a numeric value as a plain "'123':1" entry (ignore non-ints)."""
        try:
            search_vectors.append("'{}':1".format(int(val)))
        except (ValueError, TypeError):
            pass

    def update_search_vector(self, save=True, exclude_parent=False):
        """
        Update the search vector
        :param save: True if you want to save the object immediately
        :return: True if modified
        """
        # re-entrance guards (per instance)
        if getattr(self, "_search_vector_updated", None):
            return
        self._search_vector_updated = True
        if not hasattr(self, "search_vector"):
            return
        if not self.pk:
            # logger.warning("Cannot update search vector before save or "
            #                "after deletion.")
            return
        if (
            not self.BASE_SEARCH_VECTORS
            and not self.M2M_SEARCH_VECTORS
            and not self.INT_SEARCH_VECTORS
            and not self.PROPERTY_SEARCH_VECTORS
            and not self.PARENT_SEARCH_VECTORS
        ):
            logger.warning("No search_vectors defined for {}".format(self.__class__))
            return
        if getattr(self, "_search_updated", None):
            if not save:
                return self.search_vector
            return
        JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
        self._search_updated = True
        old_search = ""
        if self.search_vector:
            old_search = self.search_vector[:]
        search_vectors = []
        base_q = self.__class__.objects.filter(pk=self.pk)
        # many to many have to be queried one by one otherwise only one is fetch
        for m2m_search_vector in self.M2M_SEARCH_VECTORS:
            key_splitted = m2m_search_vector.key.split("__")
            key = key_splitted[0]
            rel_key = getattr(self, key)
            if len(key_splitted) == 2:
                attr = key_splitted[1]
                if m2m_search_vector.language == "raw":
                    values = list(rel_key.values_list(attr, flat=True))
                else:
                    values = list(
                        rel_key.annotate(
                            search=SearchVector(
                                attr, config=m2m_search_vector.language
                            )).values_list("search", flat=True)
                    )
                # NOTE(review): the raw splitter is applied to both branches,
                # including already-computed tsvector strings — confirm intent
                for value in values:
                    search_vectors += self._update_raw_search_field(value)
            else:
                for item in rel_key.values("pk").all():
                    query_dct = {key + "__pk": item["pk"]}
                    q = copy.copy(base_q).filter(**query_dct)
                    if m2m_search_vector.language == "raw":
                        q = q.values_list(m2m_search_vector.key, flat=True)
                        search_vectors += self._update_raw_search_field(q.all()[0])
                        continue
                    query_dct = {key + "__pk": item["pk"]}
                    q = copy.copy(base_q).filter(**query_dct)
                    q = q.annotate(
                        search=SearchVector(
                            m2m_search_vector.key, config=m2m_search_vector.language
                        )
                    ).values("search")
                    search_vectors.append(q.all()[0]["search"])
        # int/float are not well managed by the SearchVector
        for int_search_vector in self.INT_SEARCH_VECTORS:
            q = base_q.values(int_search_vector.key)
            for val in int_search_vector.format(q.all()[0][int_search_vector.key]):
                self._update_search_number_field(search_vectors, val)
        if not exclude_parent:
            # copy parent vector fields
            for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
                parent = getattr(self, PARENT_SEARCH_VECTOR)
                if hasattr(parent, "all"):  # m2m
                    for p in parent.all():
                        search_vectors.append(p.search_vector)
                elif parent:
                    search_vectors.append(parent.search_vector)
            for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
                parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
                if hasattr(parent, "all"):  # m2m
                    for p in parent.all():
                        search_vectors.append(
                            p.update_search_vector(save=False, exclude_parent=True)
                        )
                elif parent:
                    search_vectors.append(
                        parent.update_search_vector(save=False, exclude_parent=True)
                    )
        if self.BASE_SEARCH_VECTORS:
            # query "simple" fields
            q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
            res = q.all()[0]
            for base_search_vector in self.BASE_SEARCH_VECTORS:
                data = res[base_search_vector.key]
                data = unidecode(str(data))
                self._update_search_field(base_search_vector, search_vectors, data)
        if self.PROPERTY_SEARCH_VECTORS:
            for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
                data = getattr(self, property_search_vector.key)
                if callable(data):
                    data = data()
                if not data:
                    continue
                data = str(data)
                self._update_search_field(property_search_vector, search_vectors, data)
        if hasattr(self, "data") and self.data:
            # index the configured JSON fields
            content_type = ContentType.objects.get_for_model(self)
            for json_field in JsonDataField.objects.filter(
                content_type=content_type, search_index=True
            ).all():
                data = copy.deepcopy(self.data)
                no_data = False
                for key in json_field.key.split("__"):
                    if key not in data:
                        no_data = True
                        break
                    data = data[key]
                if no_data or not data:
                    continue
                if json_field.value_type == "B":
                    if data is True:
                        data = json_field.name
                    else:
                        continue
                elif json_field.value_type in ("I", "F"):
                    self._update_search_number_field(search_vectors, data)
                    continue
                elif json_field.value_type == "D":
                    # only index year
                    if hasattr(data, "year"):
                        self._update_search_number_field(search_vectors, data.year)
                    else:
                        y = None
                        for d in data.split("-"):
                            if len(d) == 4:  # should be the year
                                try:
                                    y = int(d)
                                except ValueError:
                                    y = None
                        if y:
                            self._update_search_number_field(search_vectors, y)
                    continue
                datas = [data]
                if json_field.value_type == "MC":
                    datas = data
                for d in datas:
                    for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
                        with connection.cursor() as cursor:
                            cursor.execute("SELECT to_tsvector(%s, %s)", [lang, d])
                            row = cursor.fetchone()
                            search_vectors.append(row[0])
        # TODO - performance: could be very slow -> cf. DGM CD17
        new_search_vector = merge_tsvectors(search_vectors)
        changed = old_search != new_search_vector
        self.search_vector = new_search_vector
        if save and changed:
            # direct UPDATE to avoid re-triggering save() signals
            self.__class__.objects.filter(pk=self.pk).update(
                search_vector=new_search_vector
            )
        elif not save:
            return new_search_vector
        return changed
class SearchAltName(object):
    """Describe an alternate search key mapped to an ORM query path."""

    def __init__(self, search_key, search_query, extra_query=None,
                 distinct_query=False, related_name=None):
        """
        :param search_key: localized key typed by the user
        :param search_query: ORM lookup path the key translates to
        :param extra_query: extra filter dict added to the query
        :param distinct_query: whether the query needs distinct()
        :param related_name: related name used for reverse access
        """
        self.search_key = search_key
        self.search_query = search_query
        self.extra_query = extra_query or {}
        self.distinct_query = distinct_query
        self.related_name = related_name
class Imported(models.Model):
    """Abstract mixin tracking which imports created/updated an item."""

    imports = models.ManyToManyField(
        Import, blank=True, related_name="imported_%(app_label)s_%(class)s",
        verbose_name=_("Created by imports")
    )
    timestamp_geo = models.IntegerField(_("Timestamp geo"), null=True, blank=True)
    timestamp_label = models.IntegerField(_("Timestamp label"), null=True, blank=True)
    imports_updated = models.ManyToManyField(
        Import, blank=True, related_name="import_updated_%(app_label)s_%(class)s",
        verbose_name=_("Updated by imports")
    )
    ALT_NAMES = {
        "imports": SearchAltName(
            pgettext_lazy("key for text search", "imports"),
            "imports__name__iexact",
        ),
    }

    class Meta:
        abstract = True

    def _get_imports(self, user, key, limit):
        """
        Return the imports of relation ``key`` ("imports"/"imports_updated")
        that ``user`` is allowed to see; "..." is appended when truncated.
        """
        if not user.ishtaruser:
            return []
        q = getattr(self, key)
        if user.is_superuser or user.ishtaruser.has_right("view_import"):
            q = q.all()
        elif user.ishtaruser.has_right("view_own_import"):
            q = q.filter(Q(user=user.ishtaruser) | Q(importer_type__users__pk=user.ishtaruser.pk))
        else:
            return []
        q = q.order_by("-id")
        has_limit = False
        # fix: limit defaults to None in the public methods — comparing an
        # int with None raises TypeError on Python 3, so guard explicitly
        if limit is not None and q.count() > limit:
            has_limit = True
            q = q[:limit]
        lst = list(q.all())
        new_lst = []
        for imprt in lst:
            # display the import group rather than the individual import
            if imprt.group:
                new_lst.append(imprt.group)
            else:
                new_lst.append(imprt)
        if has_limit:
            new_lst.append("...")
        return new_lst

    def get_imports(self, user, limit=None):
        """Imports that created this item, filtered by user rights."""
        return self._get_imports(user, "imports", limit)

    def get_imports_updated(self, user, limit=None):
        """Imports that updated this item, filtered by user rights."""
        return self._get_imports(user, "imports_updated", limit)
class JsonData(models.Model, CachedGen):
    """
    Abstract mixin adding a free-form JSON ``data`` field, display of the
    configured JsonDataField entries and cached dynamic choice lists.
    """

    data = JSONField(default=dict, blank=True)

    class Meta:
        abstract = True

    def pre_save(self):
        # normalise empty values to a dict so key lookups stay safe
        if not self.data:
            self.data = {}

    @property
    def json_sections(self):
        """Return [(section name, [(field name, value), ...]), ...] for display."""
        sections = []
        try:
            content_type = ContentType.objects.get_for_model(self)
        except ContentType.DoesNotExist:
            # fix: was misspelled "DoesNotExists", which would have raised
            # an AttributeError instead of catching the exception
            return sections
        JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
        fields = list(
            JsonDataField.objects.filter(
                content_type=content_type, display=True, section__isnull=True
            ).all()
        )  # no section fields
        fields += list(
            JsonDataField.objects.filter(
                content_type=content_type, display=True, section__isnull=False
            )
            .order_by("section__order", "order")
            .all()
        )
        for field in fields:
            value = None
            data = self.data.copy()
            # walk the "a__b__c" key path inside the JSON data
            for key in field.key.split("__"):
                if key in data:
                    value = copy.copy(data[key])
                    data = data[key]
                else:
                    value = None
                    break
            if value is None:
                continue
            if type(value) in (list, tuple):
                value = " ; ".join([field.format_value(v) for v in value if v])
                if not value:
                    continue
            else:
                value = field.format_value(value)
            section_name = field.section.name if field.section else None
            if not sections or section_name != sections[-1][0]:
                # if section name is identical it is the same
                sections.append((section_name, []))
            sections[-1][1].append((field.name, value))
        return sections

    @classmethod
    def refresh_cache(cls):
        """Rebuild the registered dynamic-choice cache entries (rate limited)."""
        __, refreshed = get_cache(cls, ["cache_refreshed"])
        if refreshed and time.time() - refreshed < 1:
            return
        cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
        if not current_keys:
            return
        for keys in current_keys:
            if isinstance(keys, (list, tuple)) and keys[0] == "__get_dynamic_choices":
                cls._get_dynamic_choices(keys[1], force=True)

    @classmethod
    def _get_dynamic_choices(cls, key, force=False):
        """
        Get choice from existing values
        :param key: data key
        :param force: if set to True do not use cache
        :return: tuple of choices (id, value)
        """
        cache_key, value = get_cache(cls, ["__get_dynamic_choices", key])
        if not force and value:
            return value
        choices = set()
        splitted_key = key[len("data__"):].split("__")
        # NOTE(review): has_key with a nested "a__b" path only matches a
        # top-level key of that literal name — confirm for nested fields
        q = cls.objects.filter(data__has_key=key[len("data__"):]).values_list(
            "id", "data"
        )
        multi = False
        for pk, value in q.all():
            for k in splitted_key:
                value = value[k]
            if isinstance(value, list):
                # fix bad recording
                if len(value) == 1 and isinstance(value[0], str) \
                        and value[0].startswith("['") \
                        and value[0].endswith("']"):
                    value = value[0][2:-2]
                    value = value.split("', '")
                    obj = cls.objects.get(pk=pk)
                    data = copy.deepcopy(obj.data)
                    if len(splitted_key) == 1:
                        data[splitted_key[0]] = value
                    elif len(splitted_key) == 2:
                        data[splitted_key[0]][splitted_key[1]] = value
                    elif len(splitted_key) == 3:
                        data[splitted_key[0]][splitted_key[1]][splitted_key[2]] = value
                    else:
                        # fix: was a print() with a typo ("To many level")
                        logger.warning(
                            "Too many levels in json field - fix not managed"
                        )
                    obj.data = data
                    obj.no_post_process()
                    obj.save()
                multi = True
                for v in value:
                    if v:
                        choices.add(v)
            else:
                choices.add(value)
        c = []
        if not multi:
            # single-valued fields get an empty choice first
            c = [("", "")]
        c += [(v, v) for v in sorted(list(choices)) if v]
        django_cache.set(cache_key, c, settings.CACHE_SMALLTIMEOUT)
        return c
class FixAssociated:
    """
    Mixin forcing associated items to carry expected attribute values.

    ``ASSOCIATED`` maps an attribute name of self to a dict whose keys are
    (sub-attribute name, type) tuples and whose values are the expected
    value(s) - either raw values or ``txt_idx`` slugs of the given type.
    """

    ASSOCIATED = {}

    def fix_associated(self):
        """Ensure each associated item holds one of the expected values."""
        for attr_name in self.ASSOCIATED:
            related = getattr(self, attr_name)
            if not related:
                continue
            rules = self.ASSOCIATED[attr_name]
            for sub_attr, expected_type in rules:
                expected = rules[(sub_attr, expected_type)]
                if not isinstance(expected, (list, tuple)):
                    expected = [expected]
                if hasattr(expected_type, "txt_idx"):
                    # expected values are slugs: resolve them to type instances
                    try:
                        expected = [
                            expected_type.objects.get(txt_idx=slug)
                            for slug in expected
                        ]
                    except expected_type.DoesNotExist:
                        # type not yet initialized
                        return
                current = getattr(related, sub_attr)
                many = hasattr(current, "all")
                current_list = list(current.all()) if many else [current]
                if any(val in expected for val in current_list):
                    continue
                # no expected value present: enforce the first one
                if many:
                    getattr(related, sub_attr).add(expected[0])
                else:
                    setattr(related, sub_attr, expected[0])
class HistoryError(Exception):
    """Raised when an operation on an item's history cannot be completed."""

    def __init__(self, value):
        # keep the offending value for inspection by callers
        self.value = value

    def __str__(self):
        return repr(self.value)
class HistoricalRecords(BaseHistoricalRecords):
    """
    Project-specific history manager.

    Extends django-simple-history records with serialized properties,
    serialized method results and m2m snapshots, and skips redundant
    history rows (same user, short time window, unchanged data).
    """

    def get_extra_fields(self, model, fields):
        """
        Build the extra attributes added to the generated historical model.

        Adds accessors for SERIALIZE_PROPERTIES, SERIALIZE_CALL and
        HISTORICAL_M2M entries that are not plain model fields.
        """
        def get_history_m2m(attr):
            # accessor reading the m2m snapshot stored on the live item
            def _get_history_m2m(self):
                q = model.objects.filter(pk=getattr(self, model._meta.pk.attname))
                if q.count():
                    item = q.all()[0]
                    if attr in item.history_m2m:
                        return item.history_m2m[attr]
            return _get_history_m2m

        def get_serialize_call(attr):
            # accessor calling the named method on the live item
            def _get_serialize_call(self):
                q = model.objects.filter(pk=getattr(self, model._meta.pk.attname))
                if q.count():
                    return getattr(q.all()[0], attr)()
            return _get_serialize_call

        def get_serialize_properties(attr):
            # accessor reading the named property on the live item
            def _get_serialize_properties(self):
                q = model.objects.filter(pk=getattr(self, model._meta.pk.attname))
                if q.count():
                    return getattr(q.all()[0], attr)
            return _get_serialize_properties

        extra_fields = super().get_extra_fields(model, fields)
        # initialize default empty fields
        fields = [f.name for f in model._meta.fields]
        lst = ["documents"]
        for key in lst:
            extra_fields[key] = ""
        for k in getattr(model, "SERIALIZE_PROPERTIES", []):
            if k not in fields:
                extra_fields[k] = get_serialize_properties(k)
        for k in getattr(model, "SERIALIZE_CALL", []):
            if k not in fields:
                extra_fields[k] = get_serialize_call(k)
        for k in getattr(model, "HISTORICAL_M2M", []):
            if k not in fields:
                extra_fields[k] = get_history_m2m(k)
        return extra_fields

    def _save_historic(
        self,
        manager,
        instance,
        history_date,
        history_type,
        history_user,
        history_change_reason,
        using,
        attrs,
    ):
        """Create and save one historical row, emitting pre/post signals."""
        history_instance = manager.model(
            history_date=history_date,
            history_type=history_type,
            history_user=history_user,
            history_change_reason=history_change_reason,
            **attrs,
        )
        pre_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            history_instance=history_instance,
            using=using,
        )
        history_instance.save(using=using)
        post_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_instance=history_instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            using=using,
        )

    def create_historical_record(self, instance, history_type, using=None):
        """
        Record a history row for instance, unless it is redundant.

        Rows are skipped when there is no history_modifier, when the same
        user saved within the last 5 seconds (unless ``_force_history`` is
        set on the instance), or when no field value actually changed.
        """
        try:
            history_modifier = getattr(instance, "history_modifier", None)
        except User.DoesNotExist:
            # on batch removing of users, user could have disappeared
            return
        if not history_modifier:
            return
        history_date = getattr(instance, "_history_date", datetime.datetime.now())
        history_change_reason = getattr(instance, "changeReason", None)
        force = getattr(instance, "_force_history", False)
        manager = getattr(instance, self.manager_name)
        attrs = {}
        for field in instance._meta.fields:
            attrs[field.attname] = getattr(instance, field.attname)
        q_history = instance.history.filter(
            history_modifier_id=history_modifier.pk
        ).order_by("-history_date", "-history_id")
        # instance.skip_history_when_saving = True
        if not q_history.count():
            # first record by this user: always save
            if force:
                delattr(instance, "_force_history")
            self._save_historic(
                manager,
                instance,
                history_date,
                history_type,
                history_modifier,
                history_change_reason,
                using,
                attrs,
            )
            return
        old_instance = q_history.all()[0]
        # multiple saving by the same user in a very short time are generaly
        # caused by post_save signals it is not relevant to keep them
        min_history_date = datetime.datetime.now() - datetime.timedelta(seconds=5)
        q = q_history.filter(
            history_date__isnull=False, history_date__gt=min_history_date
        ).order_by("-history_date", "-history_id")
        if not force and q.count():
            return
        if force:
            delattr(instance, "_force_history")
        # record a new version only if data have been changed
        for field in instance._meta.fields:
            if getattr(old_instance, field.attname) != attrs[field.attname]:
                self._save_historic(
                    manager,
                    instance,
                    history_date,
                    history_type,
                    history_modifier,
                    history_change_reason,
                    using,
                    attrs,
                )
                return
class BaseHistorizedItem(
    StatisticItem,
    TemplateItem,
    FullSearch,
    Imported,
    JsonData,
    FixAssociated,
):
    """
    Historized item with external ID management.
    All historized items are searchable and have a data json field.
    Historized items can be "locked" for edition.
    """

    IS_BASKET = False
    EXTERNAL_ID_KEY = ""
    EXTERNAL_ID_DEPENDENCIES = []
    HISTORICAL_M2M = []
    history_modifier = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Last editor"),
        blank=True,
        null=True,
    )
    history_creator = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Creator"),
        blank=True,
        null=True,
    )
    last_modified = models.DateTimeField(blank=True, default=datetime.datetime.now)
    created = models.DateTimeField(blank=True, default=datetime.datetime.now)
    # snapshot of m2m values used by the historical records
    history_m2m = JSONField(default=dict, blank=True)
    need_update = models.BooleanField(verbose_name=_("Need update"), default=False)
    locked = models.BooleanField(
        verbose_name=_("Item locked for edition"), default=False
    )
    lock_user = models.ForeignKey(
        User,
        related_name="+",
        on_delete=models.SET_NULL,
        verbose_name=_("Locked by"),
        blank=True,
        null=True,
    )
    DATED_FIELDS = [
        "created",
        "last_modified",
    ]
    BOOL_FIELDS = ["locked"]
    ALT_NAMES = {
        "history_creator": SearchAltName(
            pgettext_lazy("key for text search", "created-by"),
            "history_creator__ishtaruser__person__cached_label__iexact",
        ),
        "history_modifier": SearchAltName(
            pgettext_lazy("key for text search", "modified-by"),
            "history_modifier__ishtaruser__person__cached_label__iexact",
        ),
        "created": SearchAltName(
            pgettext_lazy("key for text search", "created"),
            "created",
        ),
        "modified": SearchAltName(
            pgettext_lazy("key for text search", "modified"),
            "last_modified",
        ),
        "locked": SearchAltName(
            pgettext_lazy("key for text search", "locked"), "locked"
        )
    }

    class Meta:
        abstract = True

    @classmethod
    def get_verbose_name(cls):
        return cls._meta.verbose_name

    def is_locked(self, user=None):
        """Locked for this user? Without user, return the raw locked flag."""
        if not user:
            return self.locked
        return self.locked and (not self.lock_user or self.lock_user != user)

    def merge(self, item, keep_old=False):
        merge_model_objects(self, item, keep_old=keep_old)

    def public_representation(self):
        return {}

    def duplicate(self, user=None, data=None):
        return duplicate_item(self, user, data)

    def update_external_id(self, save=False, no_set=False):
        """
        Regenerate the external ID when it is automatically managed.

        :param save: persist the new external ID
        :param no_set: only compute and return the ID, do not set it
        :return: the new external ID or None when nothing changed
        """
        if not hasattr(self, "external_id"):
            return
        if self.external_id and not getattr(self, "auto_external_id", False):
            # manually set external ID: do not overwrite
            return
        external_id_key = self.EXTERNAL_ID_KEY or (
            hasattr(self, "SLUG") and (self.SLUG + "_external_id")
        )
        if not external_id_key:
            return
        external_id = get_generated_id(external_id_key, self)
        if external_id == self.external_id:
            return
        if no_set:
            return external_id
        try:
            self.auto_external_id = True
        except AttributeError:
            pass
        try:
            self.external_id = external_id
        except AttributeError:
            return
        self._cached_label_checked = False
        if save:
            if self.pk:
                # direct update to avoid a full save cycle
                self.__class__.objects.filter(pk=self.pk).update(
                    auto_external_id=True,
                    external_id=external_id
                )
            else:
                self.skip_history_when_saving = True
                self.save()
        return external_id

    def get_last_history_date(self):
        q = self.history.values("history_date").order_by("-history_date")
        if not q.count():
            return
        return q.all()[0]["history_date"]

    def get_previous(self, step=None, date=None, strict=False):
        """
        Get a "step" previous state of the item

        :param step: number of steps back in the history
        :param date: exact history date to reach (alternative to step)
        :param strict: raise HistoryError on missing related objects
        :return: the historical item (annotated with _step, _previous, _next)
            or None when not found
        """
        if not step and not date:
            raise AttributeError("Need to provide step or date")
        historized = self.history.all()
        if not historized:
            # no history recorded yet
            return
        item = None
        if step:
            if len(historized) <= step:
                # silently return the last step if too far in the history
                item = historized[len(historized) - 1]
            else:
                item = historized[step]
        else:
            for step, item in enumerate(historized):
                if item.history_date == date:
                    break
            # ended with no match
            if item.history_date != date:
                return
        item._step = step
        if len(historized) != (step + 1):
            item._previous = historized[step + 1].history_date
        else:
            item._previous = None
        if step > 0:
            item._next = historized[step - 1].history_date
        else:
            item._next = None
        item.history_date = historized[step].history_date
        model = self.__class__
        # resolve foreign keys on the historical snapshot
        for k in get_all_field_names(model):
            field = model._meta.get_field(k)
            if hasattr(field, "rel") and field.rel:
                if not hasattr(item, k + "_id"):
                    setattr(item, k, getattr(self, k))
                    continue
                val = getattr(item, k + "_id")
                if not val:
                    setattr(item, k, None)
                    continue
                try:
                    val = field.remote_field.model.objects.get(pk=val)
                    setattr(item, k, val)
                except ObjectDoesNotExist:
                    if strict:
                        raise HistoryError(
                            "The class %s has no pk %d"
                            % (str(field.remote_field.model), val)
                        )
                    setattr(item, k, None)
        item.pk = self.pk
        return item

    @property
    def last_edition_date(self):
        try:
            return self.history.order_by("-history_date").all()[0].history_date
        except (AttributeError, IndexError):
            return

    @property
    def history_creation_date(self):
        try:
            return self.history.order_by("history_date").all()[0].history_date
        except (AttributeError, IndexError):
            return

    def rollback(self, date):
        """
        Rollback to a previous state
        """
        to_del, new_item = [], None
        for item in self.history.all():
            if item.history_date == date:
                new_item = item
                break
            to_del.append(item)
        if not new_item:
            raise HistoryError("The date to rollback to doesn't exist.")
        try:
            field_keys = [f.name for f in self._meta.fields]
            for k in field_keys:
                if k != "id" and hasattr(self, k):
                    if not hasattr(new_item, k):
                        k = k + "_id"
                    setattr(self, k, getattr(new_item, k))
            try:
                self.history_modifier = User.objects.get(
                    pk=new_item.history_modifier_id
                )
            except User.DoesNotExist:
                # fixed: was User.ObjectDoesNotExist which is not a model
                # attribute and raised AttributeError instead of being caught
                pass
            self.save()
            saved_m2m = new_item.history_m2m.copy()
            for hist_key in self.HISTORICAL_M2M:
                # after each association m2m is rewrite - force the original
                # to be reset
                new_item.history_m2m = saved_m2m
                values = new_item.m2m_listing(hist_key, create=True) or []
                hist_field = getattr(self, hist_key)
                hist_field.clear()
                for val in values:
                    hist_field.add(val)
            # force label regeneration
            self._cached_label_checked = False
            self.save()
        except ObjectDoesNotExist:
            raise HistoryError("The rollback has failed.")
        # clean the obsolete history
        for historized_item in to_del:
            historized_item.delete()

    def m2m_listing(self, key, create=False):
        # "create" added for signature compatibility with historical
        # records' m2m_listing (see rollback); unused on live items
        return getattr(self, key).all()

    def values(self):
        """Return a dict of field values (pk excluded)."""
        values = {}
        for f in self._meta.fields:
            k = f.name
            if k != "id":
                values[k] = getattr(self, k)
        return values

    @property
    def associated_filename(self):
        """Build a normalized filename: dpt-TOWN-class-ref-Name-date."""
        if [
            True
            for attr in (
                "get_town_label",
                "get_department",
                "reference",
                "short_class_name",
            )
            if not hasattr(self, attr)
        ]:
            return ""
        items = [
            slugify(self.get_department()),
            slugify(self.get_town_label()).upper(),
            slugify(self.short_class_name),
            slugify(self.reference),
            slugify(self.name or "").replace("-", "_").capitalize(),
        ]
        last_edition_date = self.last_edition_date
        if last_edition_date:
            items.append(last_edition_date.strftime("%Y%m%d"))
        else:
            items.append("00000000")
        return "-".join([str(item) for item in items])

    def save(self, *args, **kwargs):
        """
        Save the item, maintaining last_modified, creator and external ID.

        :return: True (or the result of the forced resave on creation)
        :raise NotImplementedError: when no history_modifier field exists
        """
        created = not self.pk
        if (not getattr(self, "skip_history_when_saving", False)
                and not getattr(self, "_no_last_modified_update", False)) \
                or not self.last_modified:
            self.last_modified = datetime.datetime.now()
        if not getattr(self, "skip_history_when_saving", False):
            if not hasattr(self, "history_modifier"):
                raise NotImplementedError("Should have a history_modifier field.")
            if created:
                self.history_creator = self.history_modifier
        # external ID can have related item not available before save
        external_id_updated = kwargs.pop("external_id_updated", False)
        if not created and not external_id_updated:
            self.update_external_id()
        super(BaseHistorizedItem, self).save(*args, **kwargs)
        if created and self.update_external_id():
            # force resave for external ID creation
            self.skip_history_when_saving = True
            self._updated_id = True
            return self.save(external_id_updated=True)
        for dep in self.EXTERNAL_ID_DEPENDENCIES:
            for obj in getattr(self, dep).all():
                obj.update_external_id(save=True)
        self.fix_associated()
        return True
class LightHistorizedItem(BaseHistorizedItem):
    """Historized item carrying only a simple history date column."""

    history_date = models.DateTimeField(default=datetime.datetime.now)

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        # unlike the parent (which returns True), return the instance
        super().save(*args, **kwargs)
        return self
class DocumentItem:
    """Mixin for items that can have associated documents and images."""

    ALT_NAMES = {
        "documents__image__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-image"),
            "documents__image__isnull",
        ),
        "documents__associated_url__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-url"),
            "documents__associated_url__isnull",
        ),
        "documents__associated_file__isnull": SearchAltName(
            pgettext_lazy("key for text search", "has-attached-file"),
            "documents__associated_file__isnull",
        ),
        "documents__source_type": SearchAltName(
            pgettext_lazy("key for text search", "document-type"),
            "documents__source_type__label__iexact",
        ),
    }

    @classmethod
    def get_label_for_model_plural(cls):
        return cls._meta.verbose_name_plural

    def documents_list(self) -> list:
        Document = apps.get_model("ishtar_common", "Document")
        return self.get_associated_main_item_list("documents", Document)

    def public_representation(self):
        """Return public data: main image first, then the other images."""
        images = []
        if getattr(self, "main_image", None):
            images.append(self.main_image.public_representation())
        images += [
            image.public_representation()
            for image in self.images_without_main_image.all()
        ]
        return {"images": images}

    @property
    def images(self):
        if not hasattr(self, "documents"):
            Document = apps.get_model("ishtar_common", "Document")
            return Document.objects.none()
        return (
            self.documents.filter(image__isnull=False).exclude(image="").order_by("pk")
        )

    @property
    def images_number(self):
        return self.images.count()

    @property
    def images_without_main_image(self):
        if not hasattr(self, "main_image") or not hasattr(self, "documents"):
            return self.images
        if not self.main_image:
            return (
                self.documents.filter(image__isnull=False)
                .exclude(image="")
                .order_by("pk")
            )
        return (
            self.documents.filter(image__isnull=False)
            .exclude(image="")
            .exclude(pk=self.main_image.pk)
            .order_by("pk")
        )

    @property
    def pdf_attached(self):
        # return the PDF of the first document carrying an attached file
        for document in self.documents.filter(
            Q(associated_file__isnull=False) | Q(source__associated_file__isnull=False)
        ).all():
            return document.pdf_attached

    def get_extra_actions(self, request):
        """
        For sheet template: return "Add document / image" action
        """
        # url, base_text, icon, extra_text, extra css class, is a quick action
        try:
            actions = super(DocumentItem, self).get_extra_actions(request)
        except AttributeError:
            actions = []
        if not hasattr(self, "SLUG"):
            return actions
        # fixed: the previous first guard also tested can_do, making this
        # warning branch unreachable dead code
        if not hasattr(self, "can_do"):
            print(f"**WARNING** can_do not implemented for {self.__class__}")
            return actions
        can_add_doc = self.can_do(request, "add_document")
        if can_add_doc and (
            not hasattr(self, "is_locked") or not self.is_locked(request.user)
        ):
            actions += [
                (
                    reverse("create-document") + "?{}={}".format(self.SLUG, self.pk),
                    _("Add document/image"),
                    "fa fa-plus",
                    _("doc./image"),
                    "",
                    False,
                )
            ]
        return actions
def clean_duplicate_association(document, related_item, action):
    """
    Drop redundant document associations after an m2m "post_add".

    When a document is attached to a find, its context records and
    operations links become redundant (same for context record/operation).
    Only active when the profile enables clean_redundant_document_association.
    """
    profile = get_current_profile()
    if action != "post_add" or not profile.clean_redundant_document_association:
        return
    model_name = related_item.__class__.__name__
    if model_name == "Find":
        # remove links to the find's context records and operations
        redundant_crs = document.context_records.filter(
            base_finds__find__pk=related_item.pk
        )
        for context_record in redundant_crs.all():
            document.context_records.remove(context_record)
        redundant_opes = document.operations.filter(
            context_record__base_finds__find__pk=related_item.pk
        )
        for operation in redundant_opes.all():
            document.operations.remove(operation)
    elif model_name == "ContextRecord":
        for operation in document.operations.filter(
            context_record__pk=related_item.pk
        ).all():
            document.operations.remove(operation)
        if document.finds.filter(
            base_finds__context_record=related_item.pk
        ).count():
            document.context_records.remove(related_item)
    elif model_name == "Operation":
        if document.context_records.filter(operation=related_item.pk).count():
            document.operations.remove(related_item)
        elif document.finds.filter(
            base_finds__context_record__operation=related_item.pk
        ).count():
            document.operations.remove(related_item)
def document_attached_changed(sender, **kwargs):
    """
    m2m_changed handler: maintain the default main image of items.

    Cleans redundant associations, regenerates document IDs and (re)sets
    main_image to the lowest-pk image when needed.
    """
    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    pk_set = kwargs.get("pk_set", None)
    if not instance or not model:
        return
    if hasattr(instance, "documents"):
        # signal received on the item side
        items = [instance]
    else:
        # signal received on the document side: fetch the related items
        if not pk_set:
            return
        try:
            items = [model.objects.get(pk=pk) for pk in pk_set]
        except model.DoesNotExist:
            return
    for item in items:
        clean_duplicate_association(instance, item, kwargs.get("action", None))
        for doc in item.documents.all():
            doc.regenerate_all_ids()
        q = item.documents.filter(image__isnull=False).exclude(image="")
        if item.main_image:
            if q.filter(pk=item.main_image.pk).count():
                # fixed: was "return", which aborted processing of the
                # remaining items of pk_set
                continue
            # the association has disappear not the main image anymore
            item.main_image = None
            item.skip_history_when_saving = True
            item.save()
        if not q.count():
            # fixed: was "return" (same early-abort bug)
            continue
        # by default get the lowest pk
        item.main_image = q.order_by("pk").all()[0]
        item.skip_history_when_saving = True
        item.save()
class NumberManager(models.Manager):
    """Manager resolving natural keys through the unique "number" field."""

    def get_by_natural_key(self, number):
        # used by Django (de)serialization to look items up without pks
        return self.get(number=number)
class State(models.Model):
    """Administrative state (e.g. French region), identified by number."""

    label = models.CharField(_("Label"), max_length=30)
    number = models.CharField(_("Number"), unique=True, max_length=10)
    objects = NumberManager()

    class Meta:
        verbose_name = _("State")
        ordering = ["number"]

    def __str__(self):
        return self.label

    def natural_key(self):
        # serialization key: the unique number
        return (self.number,)
class Department(models.Model):
    """Administrative department, attached to a State."""

    label = models.CharField(_("Label"), max_length=30)
    number = models.CharField(_("Number"), unique=True, max_length=3)
    state = models.ForeignKey(
        "State",
        verbose_name=_("State"),
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
    )
    objects = NumberManager()

    class Meta:
        verbose_name = _("Department")
        verbose_name_plural = _("Departments")
        ordering = ["number"]
        ADMIN_SECTION = _("Geography")

    def __str__(self):
        return self.label

    def natural_key(self):
        return (self.number,)

    def history_compress(self):
        """Compact history representation: the department number."""
        return self.number

    @classmethod
    def history_decompress(cls, full_value, create=False):
        """Resolve a list of compressed numbers back to Department objects."""
        if not full_value:
            return []
        res = []
        for value in full_value:
            try:
                res.append(cls.objects.get(number=value))
            except cls.DoesNotExist:
                # silently drop numbers that no longer exist
                continue
        return res
class Arrondissement(models.Model):
    """French "arrondissement", subdivision of a department."""

    name = models.CharField("Nom", max_length=30)
    department = models.ForeignKey(
        Department, verbose_name="Département", on_delete=models.CASCADE
    )

    def __str__(self):
        return settings.JOINT.join((self.name, str(self.department)))
class Canton(models.Model):
    """French "canton", subdivision of an arrondissement."""

    name = models.CharField("Nom", max_length=30)
    arrondissement = models.ForeignKey(
        Arrondissement, verbose_name="Arrondissement", on_delete=models.CASCADE
    )

    def __str__(self):
        return settings.JOINT.join((self.name, str(self.arrondissement)))
class SpatialReferenceSystem(GeneralType):
    """Spatial reference system (EPSG-like), with display rounding rules."""

    order = models.IntegerField(_("Order"), default=10)
    auth_name = models.CharField(_("Authority name"), default="EPSG", max_length=256)
    srid = models.IntegerField(_("Authority SRID"))
    # number of decimals used when displaying x/y coordinates
    round = models.IntegerField(_("Number of decimal places"), default=5)
    # number of decimals used when displaying z coordinates
    round_z = models.IntegerField(_("Number of decimal places for Z"), default=3)

    class Meta:
        verbose_name = _("Geographic - Spatial reference system")
        verbose_name_plural = _("Geographic - Spatial reference systems")
        ordering = (
            "order",
            "label",
        )
        ADMIN_SECTION = _("Geography")

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        doc = super(SpatialReferenceSystem, cls).get_documentation_string()
        doc += ", **srid** {}, **auth_name** {}".format(
            _("Authority SRID"), _("Authority name")
        )
        return doc
# keep type caches in sync when spatial reference systems change
post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
class GeoOriginType(HierarchicalType):
    """
    ex: topographical surveys, georeferencing, ...
    """

    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Origin type")
        verbose_name_plural = _("Geographic - Origin types")
        ordering = (
            "order",
            "label",
        )
        ADMIN_SECTION = _("Geography")
class GeoDataType(HierarchicalType):
    """
    ex: outline, z-sup, ...
    """

    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Data type")
        verbose_name_plural = _("Geographic - Data types")
        ordering = (
            "order",
            "label",
        )
        ADMIN_SECTION = _("Geography")
class GeoProviderType(HierarchicalType):
    """
    ex: GeoNames, IGN, ...
    """

    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Provider type")
        verbose_name_plural = _("Geographic - Provider types")
        ordering = (
            "order",
            "label",
        )
        ADMIN_SECTION = _("Geography")
class GeoBufferType(GeneralType):
    """Type qualifying the buffer of a geographic vector data."""

    order = models.IntegerField(_("Order"), default=10)

    class Meta:
        verbose_name = _("Geographic - Buffer type")
        verbose_name_plural = _("Geographic - Buffer types")
        ordering = (
            "order",
            "label",
        )
        ADMIN_SECTION = _("Geography")
# GeoJSON skeleton for a single point feature collection
GEOJSON_POINT_TPL = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": []
            },
            "properties": {
            }
        }
    ]
}

# human-readable labels for internal geometry type keys
GEOMETRY_TYPE_LBL = {
    "POINT": _("Point"),
    "MULTILINE": _("Line(s)"),
    "MULTIPOINTS": _("Point(s)"),
    "MULTIPOLYGON": _("Polygon(s)"),
}

GEOTYPE_TO_GEOVECTOR = {
    # key: (geom attr, need list convert)
    "Point": ("point_2d", False),
    "LineString": ("multi_line", True),
    "Polygon": ("multi_polygon", True),
    "MultiPoint": ("multi_points", False),
    "MultiLineString": ("multi_line", False),
    "MultiPolygon": ("multi_polygon", False),
}
class GeoVectorData(Imported, OwnPerms):
    """
    Geographic vector data (point/line/polygon) attached through a generic
    relation to a source item (town, operation, find, container, ...).
    """

    SLUG = "geovectordata"
    # reverse accessors toward items that may reference this geo data
    RELATED_MODELS = [
        "related_items_ishtar_common_town",
        "related_items_archaeological_operations_operation",
        "related_items_archaeological_operations_archaeologicalsite",
        "related_items_archaeological_context_records_contextrecord",
        "related_items_archaeological_finds_basefind",
        "related_items_archaeological_warehouse_warehouse",
        "related_items_archaeological_warehouse_container",
    ]
    buffer = models.FloatField(
        _("Buffer"), blank=True, null=True
    )
    buffer_type = models.ForeignKey(GeoBufferType, blank=True, null=True,
                                    on_delete=models.CASCADE)
    need_update = models.BooleanField(_("Need update"), default=False)
    name = models.TextField(_("Name"), default="-")
    # generic relation to the owning item
    source_content_type = models.ForeignKey(
        ContentType, related_name="content_type_geovectordata", on_delete=models.CASCADE
    )
    source_id = models.PositiveIntegerField()
    source = GenericForeignKey("source_content_type", "source_id")
    import_key = models.TextField(_("Import key"), blank=True, null=True,
                                  help_text=_("Use this for update imports"))
    origin = models.ForeignKey(
        GeoOriginType,
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        verbose_name=_("Origin"),
        help_text=_("For instance: topographical survey, georeferencing, ..."),
    )
    data_type = models.ForeignKey(
        GeoDataType,
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        verbose_name=_("Data type"),
        help_text=_("For instance: outline, z-sup, ..."),
    )
    provider = models.ForeignKey(
        GeoProviderType,
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        verbose_name=_("Provider"),
        help_text=_("Data provider"),
    )
    comment = models.TextField(_("Comment"), default="", blank=True)
    x = models.FloatField(_("X"), blank=True, null=True, help_text=_("User input"))
    y = models.FloatField(_("Y"), blank=True, null=True, help_text=_("User input"))
    z = models.FloatField(_("Z"), blank=True, null=True, help_text=_("User input"))
    # x == cached_x if user input else get it from other sources
    # cached is converted to the display SRID
    cached_x = models.FloatField(_("X (cached)"), blank=True, null=True)
    cached_y = models.FloatField(_("Y (cached)"), blank=True, null=True)
    cached_z = models.FloatField(_("Z (cached)"), blank=True, null=True)
    estimated_error_x = models.FloatField(
        _("Estimated error for X"), blank=True, null=True
    )
    estimated_error_y = models.FloatField(
        _("Estimated error for Y"), blank=True, null=True
    )
    estimated_error_z = models.FloatField(
        _("Estimated error for Z"), blank=True, null=True
    )
    spatial_reference_system = models.ForeignKey(
        SpatialReferenceSystem,
        verbose_name=_("Spatial Reference System"),
        blank=True,
        null=True,
        on_delete=models.PROTECT,
    )
    # geometry storage - only one of these is usually filled
    point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
    point_3d = models.PointField(_("Point (3D)"), blank=True, null=True, dim=3)
    multi_points = models.MultiPointField(_("Multi points"), blank=True, null=True)
    multi_line = models.MultiLineStringField(_("Multi lines"), blank=True, null=True)
    multi_polygon = models.MultiPolygonField(_("Multi polygons"), blank=True, null=True)

    class Meta:
        verbose_name = _("Geographic - Vector data")
        verbose_name_plural = _("Geographic - Vector data")
        unique_together = ("source_content_type", "source_id", "import_key")
        permissions = (
            ("view_own_geovectordata", "Can view own Geographic - Vector data"),
            ("add_own_geovectordata", "Can add own Geographic - Vector data"),
            ("change_own_geovectordata", "Can change own Geographic - Vector data"),
            ("delete_own_geovectordata", "Can delete own Geographic - Vector data"),
        )
        ADMIN_SECTION = _("Geography")
def __str__(self):
name = self.name
if not name or name == "-":
for related_model in self.RELATED_MODELS:
q = getattr(self, related_model)
cached_label_key = "cached_label"
if getattr(q.model, "GEO_LABEL", None):
cached_label_key = q.model.GEO_LABEL
if q.count(): # arbitrary return the first item
name = str(q.values_list(cached_label_key, flat=True).all()[0])
break
if self.data_type:
name += f" ({str(self.data_type).lower()})"
return name
def is_own(self, ishtaruser, alt_query_own=None):
ct = self.source_content_type
model = apps.get_model(ct.app_label, ct.model)
if not hasattr(model, "_get_query_owns_dicts"):
return False
sub_q = model.get_query_owns(ishtaruser)
if not sub_q:
return False
return self.source_id in list(
model.objects.filter(sub_q).values_list("id", flat=True)
)
SERIALIZE_EXCLUDE = []
def full_serialize(self, search_model, recursion=False, request=None):
dct = {}
fields = [
"id",
"buffer",
"buffer_type",
"name",
"origin",
"data_type",
"provider",
"comment",
"spatial_reference_system",
"source",
"estimated_error_x",
"estimated_error_y",
"estimated_error_z",
]
for field_name in fields:
value = getattr(self, field_name)
dct[field_name] = str(value) if value else ""
dct["geojson"] = self.geojson
return dct
@classmethod
def get_query_owns(cls, ishtaruser):
q = None
for app_label, model_name in (
("archaeological_operations", "Operation"),
("archaeological_operations", "ArchaeologicalSite"),
("archaeological_context_records", "ContextRecord"),
("archaeological_finds", "BaseFind"),
("archaeological_warehouse", "Warehouse"),
("archaeological_warehouse", "Container"),
):
model = apps.get_model(app_label, model_name)
sub_q = cls._construct_query_own(
model, "", model._get_query_owns_dicts(ishtaruser)
)
q2 = Q(
source_id__in=list(sub_q.values_list("id", flat=True)),
source_content_type__app_label=app_label,
source_content_type__model=model_name.lower(),
)
if not q:
q = q2
else:
q |= q2
return q
    @property
    def source_label(self):
        """String representation of the source item."""
        return str(self.source)

    def display_coordinates_3d(self):
        # convenience wrapper: 3D version of display_coordinates
        return self.display_coordinates(dim=3)
    def display_coordinates(self, rounded=None, rounded_z=None, dim=2, srid=None, cache=True):
        """
        Coordinates for display, using the item's SRS or the profile's
        display SRS (4326 as last resort), with the SRS rounding rules.

        :param rounded: decimals for x/y (default from the SRS, else 5)
        :param rounded_z: decimals for z (default from the SRS, else 3)
        :param dim: 2 or 3
        :param srid: force an output SRID
        :param cache: allow use of cached coordinates
        :return: coordinate list or None on conversion failure
        """
        spatial_reference_system = None
        if not srid:
            # fallback chain: own SRS, then the profile's display SRS
            if self.spatial_reference_system and self.spatial_reference_system.srid:
                spatial_reference_system = self.spatial_reference_system
            else:
                profile = get_current_profile()
                if profile.display_srs and profile.display_srs.srid:
                    spatial_reference_system = profile.display_srs
            if spatial_reference_system:
                srid = spatial_reference_system.srid
        if not srid:
            srid = 4326
        if not spatial_reference_system:
            # retrieve rounding rules for an explicitly provided srid
            q = SpatialReferenceSystem.objects.filter(srid=srid)
            if q.count():
                spatial_reference_system = q.all()[0]
        if not rounded:
            if spatial_reference_system:
                rounded = spatial_reference_system.round
            else:
                rounded = 5
        if not rounded_z:
            if spatial_reference_system:
                rounded_z = spatial_reference_system.round_z
            else:
                rounded_z = 3
        try:
            return self.get_coordinates(rounded=rounded, rounded_z=rounded_z, srid=srid, dim=dim,
                                        cache=cache)
        except GDALException:
            # bad conversion
            return
def get_coordinates(self, rounded=5, rounded_z=3, srid: int = None, dim=2, cache=False):
if dim not in (2, 3):
raise ValueError(_("Only 2 or 3 dimensions"))
if cache and srid == 4326:
coordinates = [self.cached_x, self.cached_y]
if dim == 3:
coordinates.append(self.cached_z)
else:
if self.x or self.y: # user input
if not srid or not self.spatial_reference_system or \
srid == self.spatial_reference_system.srid:
coordinates = [self.x, self.y]
if dim == 3:
coordinates.append(self.z)
if not self.spatial_reference_system:
q = SpatialReferenceSystem.objects.filter(srid=4326)
if q.count():
self.spatial_reference_system = q.all()[0]
else:
args = {
"x": self.x,
"y": self.y,
"srid": self.spatial_reference_system.srid,
}
if dim == 3:
args["z"] = self.z
point = Point(**args).transform(srid, clone=True)
coordinates = [point.x, point.y]
if srid == 4326:
coordinates = list(reversed(coordinates))
if dim == 3:
coordinates.append(point.z)
else:
if self.point_2d:
geom = self.point_2d
elif self.point_3d:
geom = self.point_3d
elif self.multi_points:
geom = self.multi_points.centroid
elif self.multi_line:
geom = self.multi_line.centroid
elif self.multi_polygon:
geom = self.multi_polygon.centroid
else:
if dim == 2:
return [None, None]
return [None, None, self.z or None]
point = geom
if not srid or srid != geom.srid:
point = geom.transform(srid, clone=True)
x, y = point.x, point.y
else:
x, y = point.x, point.y
if dim == 2:
coordinates = [x, y]
else:
coordinates = [x, y, point.z]
if rounded is None:
return coordinates
if coordinates[0]:
if rounded <= 0:
coordinates[0] = int(coordinates[0])
coordinates[1] = int(coordinates[1])
else:
coordinates[0] = round(coordinates[0] or 0, rounded)
coordinates[1] = round(coordinates[1] or 0, rounded)
if dim == 3 and rounded_z is not None and coordinates[2] is not None:
if rounded_z <= 0:
coordinates[2] = round(coordinates[2] or 0, rounded_z)
else:
coordinates[2] = round(coordinates[2] or 0, rounded_z)
return coordinates
def get_coordinates_from_polygon(self, rounded=5, srid: int = None):
    """Return the converted coordinates of the multi-polygon centroid
    (None when no multi-polygon is set)."""
    if not self.multi_polygon:
        return
    centroid = self.multi_polygon.centroid
    return self.convert_coordinates(centroid, rounded=rounded, srid=srid)
def get_x(self, srid: int = None) -> float:
    """
    Return the x coordinate, optionally re-projected to ``srid``.
    :param srid: target spatial reference system id
    """
    # fix: srid was previously passed positionally and silently consumed
    # as the "rounded" parameter of get_coordinates()
    coord = self.get_coordinates(srid=srid)
    if coord:
        return coord[0]
def get_y(self, srid: int = None) -> float:
    """
    Return the y coordinate, optionally re-projected to ``srid``.
    :param srid: target spatial reference system id
    """
    # fix: srid was previously passed positionally and silently consumed
    # as the "rounded" parameter of get_coordinates()
    coord = self.get_coordinates(srid=srid)
    if coord:
        return coord[1]
def get_z(self, srid: int = None) -> float:
    """
    Return the z coordinate, optionally re-projected to ``srid``.
    :param srid: target spatial reference system id
    """
    # fix: srid was previously passed positionally and silently consumed
    # as the "rounded" parameter of get_coordinates()
    coord = self.get_coordinates(srid=srid, dim=3)
    if coord:
        return coord[2]
@property
def display_spatial_reference_system(self):
    """SRS used for display: the item's own SRS when set, else the
    current profile default."""
    srs = self.spatial_reference_system
    if srs:
        return srs
    return get_current_profile().srs
@classmethod
def _get_geo_item_list(cls, q, current_geodata, url, precision, rounded):
    """
    Build a flat list of GeoJSON features for the main geodata of ``q``.

    :param q: queryset exposing "main_geodata" and "id" values
    :param current_geodata: list of geodata pks already rendered —
        mutated in place to avoid duplicates across calls
    :param url: URL template, formatted with the related item id
    :param precision: decimals kept for point coordinates
    :param rounded: when set and precision is empty, default precision to 6
    :return: list of GeoJSON feature dicts (points outside WGS84 bounds
        are dropped)
    """
    collection_id = []
    items_id = []
    q = q.values("main_geodata", "id")
    for item in q.distinct().all():
        geodata_id = item["main_geodata"]
        if geodata_id not in current_geodata:
            collection_id.append(geodata_id)
            items_id.append(item["id"])
            current_geodata.append(geodata_id)
    collection = []
    for idx in range(len(collection_id)):
        geo = json.loads(GeoVectorData.objects.get(pk=collection_id[idx]).geojson)
        geo_type = geo.get("type", None)
        url_geo = url.format(items_id[idx])
        if geo_type == "FeatureCollection":
            # attach the item URL to each sub-feature
            for feat in geo["features"]:
                if "properties" in feat:
                    feat["properties"]["url"] = url_geo
            collection += geo["features"]
        elif geo_type:
            if "properties" in geo:
                geo["properties"]["url"] = url_geo
            collection.append(geo)
    if not precision and rounded:
        precision = 6
    # fix: removed an unused compiled regex that was built here
    new_collection = []
    for feat in collection:
        geom_type = feat["geometry"].get("type", None)
        if geom_type == "Point":
            if precision is not None:
                feat["geometry"]["coordinates"] = [
                    round(feat["geometry"]["coordinates"][0], precision),
                    round(feat["geometry"]["coordinates"][1], precision),
                ]
            if not (-90 <= feat["geometry"]["coordinates"][1] <= 90) or not (
                    -180 <= feat["geometry"]["coordinates"][0] <= 180):
                # probably a bad projection
                continue
        new_collection.append(feat)
    return new_collection
def get_geo_items(self, rounded=5):
    """
    Return this item's geometry as a GeoJSON Feature dict.

    Coordinates are converted with convert_coordinates (display SRS).

    :param rounded: decimals kept for coordinates
    :return: feature dict, or {} when no geometry is available
    """
    dct = {"type": "Feature", "geometry": {},
           "properties": {"label": str(self)}}
    if self.multi_polygon:
        list_coords = []
        for polygon in self.multi_polygon:
            list_coords.append([])
            for linear_ring in range(len(polygon)):
                list_coords[-1].append([])
                for coords in polygon[linear_ring].coords:
                    point_2d = Point(
                        coords[0], coords[1], srid=self.multi_polygon.srid
                    )
                    list_coords[-1][linear_ring].append(
                        self.convert_coordinates(point_2d, rounded)
                    )
        dct["geometry"]["type"] = "MultiPolygon"
        dct["geometry"]["coordinates"] = list_coords
    elif self.multi_points:
        list_coords = []
        for coords in self.multi_points:
            point_2d = Point(
                coords.x, coords.y, srid=self.multi_points.srid
            )
            list_coords.append(
                self.convert_coordinates(point_2d, rounded)
            )
        dct["geometry"]["type"] = "MultiPoint"
        dct["geometry"]["coordinates"] = list_coords
    elif self.multi_line:
        list_coords = []
        for line in self.multi_line:
            # fix: one coordinate list per line — previously only the first
            # iteration created a sublist, merging every line into one
            list_coords.append([])
            for coords in line:
                point_2d = Point(
                    coords.x, coords.y, srid=self.multi_line.srid
                )
                list_coords[-1].append(
                    self.convert_coordinates(point_2d, rounded)
                )
        # fix: "MultiLineString" is the GeoJSON type name (RFC 7946);
        # "MultiLine" does not exist in the spec
        dct["geometry"]["type"] = "MultiLineString"
        dct["geometry"]["coordinates"] = list_coords
    else:
        dct["geometry"]["type"] = "Point"
        coords = self.display_coordinates(srid=4326)
        if coords:
            dct["geometry"]["coordinates"] = coords
        else:
            return {}
    return dct
def convert_coordinates(self, point_2d, rounded=5, srid=None):
    """Re-project a point to ``srid`` (profile display SRS, else WGS84,
    when not given) and return its [x, y] list."""
    if not srid:
        display_srs = get_current_profile().display_srs
        if display_srs and display_srs.srid:
            srid = display_srs.srid
        else:
            srid = 4326
    converted = point_2d.transform(srid, clone=True)
    if not rounded:
        return [converted.x, converted.y]
    return [round(converted.x, rounded), round(converted.y, rounded)]
def _geojson_serialize(self, geom_attr):
if not hasattr(self, geom_attr):
return "{}"
geojson = serialize(
"geojson",
self.__class__.objects.filter(pk=self.pk),
geometry_field=geom_attr,
fields=("name",),
)
geojson_dct = json.loads(geojson)
return self._geojson_base_serialize(geojson_dct)
def _geojson_base_serialize(self, geojson_dct):
    """Post-process a GeoJSON dict: set name/id (and buffer) properties,
    round point coordinates to the profile precision and attach a link
    template. Return the resulting JSON string."""
    precision = get_current_profile().point_precision
    label = self.name or ""
    features = geojson_dct.pop("features")
    for feature in features:
        feature["properties"]["name"] = label
        feature["properties"]["id"] = self.pk
        if precision is not None:
            if feature["geometry"].get("type", None) == "Point":
                feature["geometry"]["coordinates"] = [
                    round(coord, precision)
                    for coord in feature["geometry"]["coordinates"]
                ]
        if self.buffer:
            feature["properties"]["buffer"] = self.buffer
    geojson_dct["features"] = features
    try:
        # template URL for item sheets; "999999" is a placeholder pk
        geojson_dct["link_template"] = simple_link_to_window(self).replace(
            "999999", ""
        )
    except NoReverseMatch:
        pass
    return json.dumps(geojson_dct)
@property
def point_2d_geojson(self):
    """GeoJSON string built from the "point_2d" field."""
    return self._geojson_serialize("point_2d")
@property
def multi_polygon_geojson(self):
    """GeoJSON string built from the "multi_polygon" field."""
    return self._geojson_serialize("multi_polygon")
@property
def geometry_type(self):
    """Kind of geometry stored on this item ("" when nothing is set).
    User x/y/z or point fields take precedence."""
    if self.x or self.y or self.z or self.point_2d or self.point_3d:
        return "POINT"
    for attr, label in (
        ("multi_line", "MULTILINE"),
        ("multi_points", "MULTIPOINTS"),
        ("multi_polygon", "MULTIPOLYGON"),
    ):
        if getattr(self, attr):
            return label
    return ""
@property
def geometry_type_label(self):
    """Human-readable label of the current geometry type."""
    geo_type = self.geometry_type
    return GEOMETRY_TYPE_LBL.get(geo_type, "")
@property
def geojson(self):
    """
    GeoJSON string for the first available geometry: user x/y/z first,
    then point/line/points/polygon fields. "{}" when nothing is set.
    """
    if self.x or self.y or self.z:
        # fix: deep copy — dict.copy() is shallow, so mutating the nested
        # "coordinates" list was altering the shared GEOJSON_POINT_TPL
        # template for every later caller
        geo = copy.deepcopy(GEOJSON_POINT_TPL)
        geo["features"][0]["geometry"]["coordinates"] = [
            self.cached_x, self.cached_y
        ]
        return self._geojson_base_serialize(geo)
    if self.point_2d:
        return self._geojson_serialize("point_2d")
    if self.point_3d:
        return self._geojson_serialize("point_3d")
    if self.multi_line:
        return self._geojson_serialize("multi_line")
    if self.multi_points:
        return self._geojson_serialize("multi_points")
    if self.multi_polygon:
        return self._geojson_serialize("multi_polygon")
    return "{}"
def update_from_geojson(self, geojson: str, save=False) -> bool:
    """
    Update geometry from a geojson string

    Only sources containing exactly one feature with a known geometry
    type are handled: the matching geometry field is set and every other
    geometry field is reset.

    :param geojson: geojson source (must be readable by fiona)
    :param save: save the object if set to True
    :return: True if update
    """
    if not geojson:  # no data provided
        return False
    with fiona.open(geojson) as src:
        if len(src) != 1:
            # ambiguous -> need only one feature
            return False
        feature = src[0]
        geom = feature["geometry"]
        if geom["type"] not in GEOTYPE_TO_GEOVECTOR:
            # unknown geometry type
            return False
        feature_attr, need_list_convert = GEOTYPE_TO_GEOVECTOR[geom["type"]]
        if need_list_convert:
            # wrap the coordinates to obtain the "multi" variant
            geom["coordinates"] = [geom["coordinates"]]
        geom = json.dumps(geom)
        setattr(self, feature_attr, GEOSGeometry(geom))
        if save:
            self.save()
        # reset every other geometry field
        geo_attrs = ["point_2d", "point_3d", "x", "y", "z", "multi_line",
                     "multi_points", "multi_polygon"]
        geo_attrs.remove(feature_attr)  # simpler than pop(index(...))
        for geo_attr in geo_attrs:
            setattr(self, geo_attr, None)
        if save:
            self.save()
        return True
@classmethod
def migrate_srid(cls, new_srid):
    """
    Change the declared SRID of every geometry column then re-project all
    stored geometries.

    :param new_srid: target spatial reference identifier
    """
    fields = (
        "point_2d", "point_3d", "multi_points", "multi_line", "multi_polygon",
    )
    with connection.cursor() as cursor:
        for name in fields:
            # fix: column names cannot be passed as query parameters —
            # they were being quoted as string literals, breaking the
            # statement. "name" comes from the fixed tuple above, so
            # interpolating it is safe; the srid stays parameterized.
            cursor.execute(
                f"UPDATE ishtar_common_geovectordata "
                f"SET {name}=ST_SetSRID({name}, %s);",
                [new_srid]
            )
    for item in cls.objects.all():
        for name in fields:
            geom = getattr(item, name)
            if geom:  # fix: skip empty geometry fields (None has no transform)
                geom.transform(new_srid)
        item._no_geo_check = True
        item.save()
# refresh cached/derived fields of attached items after each geodata save
post_save.connect(post_save_geodata, sender=GeoVectorData)
def geodata_attached_post_add(model, instance, pk_set):
    """
    m2m "post_add" handler: geodata have been attached to ``instance``.

    Sets the first attached geodata as the main one when none is set and
    propagates the attachment to child items (see
    geodata_child_item_queries).

    :param model: GeoVectorData model
    :param instance: item the geodata are attached to
    :param pk_set: set of attached GeoVectorData pks
    """
    item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True))
    if not item_pks:
        return
    # use a cache to manage during geodata attach
    if not hasattr(instance, "_geodata"):
        instance._geodata = []
    if not instance.main_geodata_id:
        instance.main_geodata_id = item_pks[0]
    instance.skip_history_when_saving = True
    instance._no_move = True
    # fix: removed a second, redundant "_geodata" initialization here
    instance._geodata += [pk for pk in item_pks if pk not in instance._geodata]
    instance.save()
    # for all sub item verify that the geo items are present
    for query in instance.geodata_child_item_queries():
        child_model = query.model
        m2m_model = child_model.geodata.through
        m2m_key = f"{child_model._meta.model_name}_id"
        geoitems = {}
        for child_id in query.values_list("id", flat=True):
            child = None
            for pk in item_pks:
                q = m2m_model.objects.filter(**{m2m_key: child_id,
                                                "geovectordata_id": pk})
                if not q.count():
                    if not child:
                        child = model.objects.get(pk=pk)
                    if pk not in geoitems:
                        geoitems[pk] = GeoVectorData.objects.get(pk=pk)
                    child_model.objects.get(pk=child_id).geodata.add(geoitems[pk])
def geodata_attached_remove(model, instance, pk_set=None, clear=False):
    """
    m2m "post_remove"/"post_clear" handler: geodata have been detached
    from ``instance``.

    Resets the main geodata when it was one of the removed items and
    detaches the same geodata from child items.

    :param model: GeoVectorData model
    :param instance: item the geodata were attached to
    :param pk_set: set of removed GeoVectorData pks (None on clear)
    :param clear: True when handling a clear — pks are then taken from
        the cache filled on "pre_clear"
    """
    if clear:
        item_pks = getattr(instance, "_geodata_clear_item_pks", [])
    else:
        item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True))
    if not item_pks:
        return
    # use a cache to manage during geodata attach
    if instance.main_geodata_id in item_pks:
        instance.main_geodata_id = None
        instance.skip_history_when_saving = True
        instance._no_move = True
        instance.save()
    # for all sub item verify that the geo items are present
    for query in instance.geodata_child_item_queries():
        child_model = query.model
        m2m_model = child_model.geodata.through
        m2m_key = f"{child_model._meta.model_name}_id"
        geoitems = {}
        for child_id in query.values_list("id", flat=True):
            # NOTE(review): "child" is fetched below but never read
            # afterwards — confirm whether the extra query is needed
            child = None
            for pk in item_pks:
                q = m2m_model.objects.filter(**{m2m_key: child_id,
                                                "geovectordata_id": pk})
                if q.count():
                    if not child:
                        child = model.objects.get(pk=pk)
                    if pk not in geoitems:
                        geoitems[pk] = GeoVectorData.objects.get(pk=pk)
                    child_model.objects.get(pk=child_id).geodata.remove(geoitems[pk])
def geodata_attached_changed(sender, **kwargs):
    """
    m2m_changed dispatcher for <item>.geodata relations: maintain the
    main geodata and cascade attachments/detachments to child items.
    """
    if getattr(settings, "ISHTAR_MIGRATE_V4", False):
        # disable on first migration
        return
    # manage main geoitem and cascade association
    profile = get_current_profile()
    if not profile.mapping:
        return
    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    pk_set = kwargs.get("pk_set", None)
    action = kwargs.get("action", None)
    if not instance or not model:
        return
    if model != GeoVectorData:  # reverse post attributes
        # signal sent from the GeoVectorData side: swap roles so that
        # "instance" is always the geo-attached item
        # NOTE(review): only the first pk of pk_set is handled here —
        # confirm multi-item attachments from the reverse side
        instance_pk = instance.pk
        instance = model.objects.get(pk=list(pk_set)[0])
        model = GeoVectorData
        pk_set = {instance_pk}
    if action == "post_add":
        geodata_attached_post_add(model, instance, pk_set)
    elif action == "post_remove":
        geodata_attached_remove(model, instance, pk_set)
    elif action == "pre_clear":
        # remember the pks before the clear wipes the relation
        instance._geodata_clear_item_pks = list(
            instance.geodata.values_list("id", flat=True)
        )
    elif action == "post_clear":
        geodata_attached_remove(model, instance, clear=True)
class GeographicItem(models.Model):
    """
    Abstract mixin adding geographic data management to a model: one
    "main" geodata plus an m2m of extra geodata.
    """

    # geodata displayed by default for this item
    main_geodata = models.ForeignKey(
        GeoVectorData,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        related_name="main_related_items_%(app_label)s_%(class)s",
        verbose_name=_("Main geodata")
    )
    geodata = models.ManyToManyField(
        GeoVectorData, blank=True, related_name="related_items_%(app_label)s_%(class)s",
        verbose_name=_("Geodata")
    )

    # text-search aliases for geodata-related search keys
    ALT_NAMES = {
        "geodata__name": SearchAltName(
            pgettext_lazy("key for text search", "geo-name"), "geodata__name__iexact"
        ),
        "geodata__data_type": SearchAltName(
            pgettext_lazy("key for text search", "geo-type"), "geodata__data_type__label__iexact"
        ),
        "geodata__origin": SearchAltName(
            pgettext_lazy("key for text search", "geo-origin"), "geodata__origin__label__iexact"
        ),
        "geodata__provider": SearchAltName(
            pgettext_lazy("key for text search", "geo-provider"), "geodata__provider__label__iexact"
        ),
        "geodata__z": SearchAltName(
            pgettext_lazy("key for text search", "z"),
            "geodata__cached_z"
        ),
        "geodata__comment": SearchAltName(
            pgettext_lazy("key for text search", "geo-comment"), "geodata__comment__iexact"
        ),
    }

    @classmethod
    def ALT_NAMES_FOR_FIND(cls):
        # same aliases, but queried through the base_finds relation
        dct = {}
        for k in cls.ALT_NAMES:
            sa = cls.ALT_NAMES[k]
            dct[k] = SearchAltName(sa.search_key, "base_finds__" + sa.search_query)
        return dct

    class Meta:
        abstract = True

    @classmethod
    def get_label_for_model_plural(cls):
        return cls._meta.verbose_name_plural

    def get_add_geo_action(self):
        # (url, label, icon, short label, css class, is-quick-action)
        return (
            reverse("create-pre-geo", args=[
                self.__class__._meta.app_label,
                self.__class__._meta.model_name,
                self.pk
            ]),
            _("Add geographic item"),
            "fa fa-plus",
            _("geo."),
            "",
            False
        )

    def geodata_child_item_queries(self):
        """
        :return: list of queries associated geographically with this item. When
        geographic data is added to this item all sub items get the geographic
        data. For instance an operation returns the list of context records
        associated, so when you add the survey limit, it is associated to all
        context records of the operation.
        """
        return []

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        # after the base save, keep main_geodata and the geodata m2m
        # consistent with each other
        super(GeographicItem, self).save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )
        if self.main_geodata and not self.geodata.filter(
                pk=self.main_geodata.pk).count():
            # the main geodata must also be in the m2m
            try:
                with transaction.atomic():
                    self.geodata.add(self.main_geodata)
            except (OperationalError, IntegrityError):
                pass
        elif not self.main_geodata:
            try:
                with transaction.atomic():
                    if self.geodata.count():
                        # arbitrary associate the first to geodata
                        self.main_geodata = self.geodata.order_by("pk").all()[0]
                        self.skip_history_when_saving = True
                        self._no_move = True
                        self.save()
            except (OperationalError, IntegrityError, IndexError):
                pass

    @property
    def geodata_list(self):
        # all geodata, main one first, without duplicates
        lst = []
        if self.main_geodata:
            lst.append(self.main_geodata)
        for geo in self.geodata.all():
            if geo != self.main_geodata:
                lst.append(geo)
        return lst
class SerializeItem:
    """
    Mixin providing full item serialization for the API.
    Class attributes declare fields needing specific handling.
    """

    # fields never serialized
    SERIALIZE_EXCLUDE = ["search_vector"]
    # extra properties appended to the serialization
    SERIALIZE_PROPERTIES = [
        "external_id",
        "multi_polygon_geojson",
        "point_2d_geojson",
        "images_number",
        "json_sections",
    ]
    # serialization key -> method name called to get the value
    SERIALIZE_CALL = {}
    # date fields rendered with human_date
    SERIALIZE_DATES = []
    # file fields rendered as absolute URLs
    SERIALIZATION_FILES = []
    # related fields serialized with str() instead of full_serialize
    SERIALIZE_STRING = []

    def full_serialize(self, search_model=None, recursion=False, request=None) -> dict:
        """
        API serialization
        :param search_model: search model — its sheet filters are blanked out
        :param recursion: True when called from another full_serialize
            (related items are then rendered as strings)
        :param request: current request, used to build absolute file URLs
        :return: data dict
        """
        full_result = {}
        serialize_fields = []
        exclude = []
        if search_model:
            exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()]
        no_geodata = False
        for prop in ("main_geodata", "geodata", "geodata_list"):
            if prop in self.SERIALIZE_EXCLUDE or prop in exclude:
                no_geodata = True
                break
        for field in self._meta.get_fields():
            field_name = field.name
            if field_name in self.SERIALIZE_EXCLUDE:
                continue
            elif field_name in exclude:
                full_result[field_name] = ""
            elif field.many_to_one or field.one_to_one:
                try:
                    value = getattr(self, field_name)
                except (MultipleObjectsReturned, ObjectDoesNotExist):
                    value = None
                if value:
                    if (
                        field_name not in self.SERIALIZE_STRING
                        and hasattr(value, "full_serialize")
                        and not recursion
                    ):
                        if field_name == "main_geodata" and no_geodata:
                            continue
                        value = value.full_serialize(search_model, recursion=True, request=request)
                    elif field_name in self.SERIALIZATION_FILES:
                        try:
                            value = {"url": request.build_absolute_uri(value.url)}
                        except ValueError:
                            # file without associated content
                            value = None
                    else:
                        value = str(value)
                else:
                    value = None
                full_result[field_name] = value
                if field_name == "main_geodata":
                    full_result["geodata_list"] = [value]
            elif field.many_to_many:
                values = getattr(self, field_name)
                if values.count():
                    first_value = values.all()[0]
                    if (
                        field_name not in self.SERIALIZE_STRING
                        and hasattr(first_value, "full_serialize")
                        and not recursion
                    ):
                        values = [
                            v.full_serialize(search_model, recursion=True, request=request)
                            for v in values.all()
                        ]
                    else:
                        # fix: test the *field name* against
                        # SERIALIZATION_FILES — the first related object was
                        # compared before, so this branch never triggered
                        if field_name in self.SERIALIZATION_FILES:
                            file_values = []
                            # fix: iterate the queryset, not the manager
                            for v in values.all():
                                try:
                                    file_values.append(
                                        {"url": request.build_absolute_uri(v.url)}
                                    )
                                except ValueError:
                                    pass
                            values = file_values
                        else:
                            values = [str(v) for v in values.all()]
                else:
                    values = []
                full_result[field_name] = values
            else:
                if field_name in self.SERIALIZATION_FILES:
                    value = getattr(self, field_name)
                    try:
                        value = {"url": request.build_absolute_uri(value.url)}
                    except ValueError:
                        value = None
                    full_result[field.name] = value
                else:
                    # plain field: let Django serialize it below
                    serialize_fields.append(field_name)
        result = json.loads(serialize("json", [self], fields=serialize_fields))
        full_result.update(result[0]["fields"])
        for prop in self.SERIALIZE_PROPERTIES:
            if prop in self.SERIALIZE_EXCLUDE or prop in exclude:
                continue
            if hasattr(self, prop) and prop not in full_result:
                full_result[prop] = getattr(self, prop)
        # flags used by consumers to know a geometry is available
        if "point_2d_geojson" in full_result:
            full_result["point_2d"] = True
        if "multi_polygon_geojson" in full_result:
            full_result["multi_polygon"] = True
        for prop in self.SERIALIZE_DATES:
            if prop in self.SERIALIZE_EXCLUDE or prop in exclude:
                continue
            dt = getattr(self, prop) or ""
            if dt:
                dt = human_date(dt)
            full_result[prop] = dt
        for k in self.SERIALIZE_CALL:
            if k in self.SERIALIZE_EXCLUDE or k in exclude:
                continue
            full_result[k] = getattr(self, self.SERIALIZE_CALL[k])()
        full_result["SLUG"] = self.SLUG
        # prefixed ids to avoid collisions with local items
        full_result["pk"] = f"external_{self.pk}"
        full_result["id"] = f"external_{self.id}"
        return full_result

    def get_associated_main_item_list(self, attr, model) -> list:
        """
        Build a table for items related through ``attr``.
        :param attr: related manager attribute name
        :param model: related model providing TABLE_COLS/COL_LABELS
        :return: list of rows — first row is the header; [] when empty
        """
        items = getattr(self, attr)
        if not items.count():
            return []
        lst = []
        table_cols = model.TABLE_COLS
        if callable(table_cols):
            table_cols = table_cols()
        for colname in table_cols:
            if colname in model.COL_LABELS:
                lst.append(str(model.COL_LABELS[colname]))
            else:
                lst.append(model._meta.get_field(colname).verbose_name)
        lst = [lst]
        for values in items.values_list(*table_cols):
            lst.append(["-" if v is None else v for v in values])
        return lst
class ShortMenuItem:
    """
    Item available in the short menu.
    """

    # mapping used to reach the parent model from sub-model queries
    UP_MODEL_QUERY = {}

    @classmethod
    def get_short_menu_class(cls, pk):
        """CSS class used in the short menu for the item ``pk``
        (none by default)."""
        return ""

    @property
    def short_class_name(self):
        """Short display name of the class (empty by default)."""
        return ""
class MainItem(ShortMenuItem, SerializeItem, SheetItem):
    """
    Item with quick actions available from tables
    Extra actions are available from sheets
    Manage cascade updated, has_changed and no_post_process
    """

    QUICK_ACTIONS = []  # quick actions proposed on result tables
    SLUG = ""  # model slug used to build URLs
    SHOW_URL = None
    DOWN_MODEL_UPDATE = []  # related attrs refreshed by cascade_update
    INITIAL_VALUES = []  # list of field checkable if changed on save

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # snapshot watched fields to detect later changes (see has_changed)
        self._initial_values = {}
        for field_name in self.INITIAL_VALUES:
            self._initial_values[field_name] = getattr(self, field_name)

    def has_changed(self):
        """
        Check which field have a changed value between INITIAL_VALUES
        :return: list of changed fields
        """
        changed = []
        for field_name in self._initial_values:
            value = getattr(self, field_name)
            if self._initial_values[field_name] != value:
                changed.append(field_name)
                # refresh the snapshot so a second call reports no change
                self._initial_values[field_name] = value
        return changed

    def cascade_update(self, changed=True):
        """
        Propagate the current update to DOWN_MODEL_UPDATE related items.
        :param changed: do nothing when falsy
        """
        if not changed:
            return
        if getattr(self, "_no_down_model_update", False):
            return
        queue = getattr(self, "_queue", settings.CELERY_DEFAULT_QUEUE)
        for down_model in self.DOWN_MODEL_UPDATE:
            if not settings.USE_BACKGROUND_TASK:
                # no background tasks: only flag related items for update
                rel = getattr(self, down_model)
                if hasattr(rel.model, "need_update"):
                    rel.update(need_update=True)
                continue
            for item in getattr(self, down_model).all():
                if hasattr(self, "_timestamp"):
                    item._timestamp = self._timestamp
                item._queue = queue
                if hasattr(item, "cached_label_changed"):
                    item.cached_label_changed()
                if hasattr(item, "main_geodata"):
                    item.post_save_geo()

    def no_post_process(self):
        # set all flags disabling signal-driven post-processing on save
        self.skip_history_when_saving = True
        self._cached_label_checked = True
        self._post_saved_geo = True
        self._external_id_checked = True
        self._search_updated = True
        self._no_move = True
        self._no_down_model_update = True

    @classmethod
    def app_label(cls):
        return cls._meta.app_label

    @classmethod
    def model_name(cls):
        return cls._meta.model_name

    @classmethod
    def class_verbose_name(cls):
        return cls._meta.verbose_name

    def get_absolute_url(self):
        # generic display URL; None when not registered for this model
        try:
            return reverse("display-item", args=[self.SLUG, self.pk])
        except NoReverseMatch:
            return

    @classmethod
    def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True):
        """
        :param table_cols_attr: "TABLE_COLS" if not specified
        :param dict_col_labels: (default: True) if set to False return list matching
        with table_cols list
        :return: (table_cols, table_col_labels)
        """
        return get_columns_from_class(cls, table_cols_attr=table_cols_attr,
                                      dict_col_labels=dict_col_labels)

    def get_search_url(self):
        # URL of the search page for this model, when registered
        if self.SLUG:
            try:
                return reverse(self.SLUG + "_search")
            except NoReverseMatch:
                pass

    @classmethod
    def get_quick_actions(cls, user, session=None, obj=None):
        """
        Get a list of (url, title, icon, target) actions for an user
        """
        qas = []
        for action in cls.QUICK_ACTIONS:
            if not action.is_available(user, session=session, obj=obj):
                continue
            qas.append(
                [
                    action.base_url,
                    mark_safe(action.text),
                    mark_safe(action.rendered_icon),
                    action.target or "",
                    action.is_popup,
                ]
            )
        return qas

    @classmethod
    def get_quick_action_by_url(cls, url):
        # return the matching quick action, or None implicitly
        for action in cls.QUICK_ACTIONS:
            if action.url == url:
                return action

    def regenerate_external_id(self):
        # blank the external id so it is regenerated on save
        if not hasattr(self, "external_id"):
            return
        try:
            self.external_id = ""
            self.auto_external_id = True
        except AttributeError:
            return
        self.skip_history_when_saving = True
        self._no_move = True
        self.save()

    def cached_label_changed(self):
        # force the recomputation of the cached label only
        self.no_post_process()
        self._cached_label_checked = False
        cached_label_changed(self.__class__, instance=self, created=False)

    def post_save_geo(self, save=True):
        # force the geo post-processing of the item
        # NOTE(review): the "save" parameter is unused here — confirm
        # against callers such as geotown_attached_changed
        if getattr(self, "_post_saved_geo", False):
            return
        self.no_post_process()
        post_save_geo(self.__class__, instance=self, created=False)
        return False

    def external_id_changed(self):
        # force the recomputation of the external id only
        self.no_post_process()
        self._external_id_checked = False
        external_id_changed(self.__class__, instance=self, created=False)

    def can_do(self, request, action_name):
        """
        Check permission availability for the current object.
        :param request: request object
        :param action_name: action name eg: "change_find"
        :return: boolean
        """
        # overload with OwnPerm when _own_ is relevant
        if not getattr(request.user, "ishtaruser", None):
            return False
        user = request.user
        return user.ishtaruser.has_right(action_name, request.session)

    def get_extra_actions(self, request):
        # each action: (url, text, icon, short text, css class,
        # is-quick-action, [timeout])
        if not hasattr(self, "SLUG"):
            return []
        actions = []
        if request.user.is_superuser and hasattr(self, "auto_external_id"):
            actions += [
                (
                    reverse("regenerate-external-id")
                    + "?{}={}".format(self.SLUG, self.pk),
                    _("Regenerate ID"),
                    "fa fa-key",
                    _("regen."),
                    "btn-warning",
                    True,
                    200,
                )
            ]
        return actions
class TownManager(models.Manager):
    """Manager resolving towns by their (INSEE code, year) natural key."""

    def get_by_natural_key(self, numero_insee, year):
        """Return the town matching the serialization natural key."""
        return self.get(numero_insee=numero_insee, year=year)
class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model):
    """
    Administrative town, identified by its INSEE code and, for merged
    (old) towns, a year of creation.
    """

    SLUG = "town"
    name = models.CharField(_("Name"), max_length=100)
    # surface in square metres (see surface_ha)
    surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
    center = models.PointField(
        _("Localisation"), srid=settings.SRID, blank=True, null=True
    )
    limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
    numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120)
    notice = models.TextField(_("Notice"), blank=True, default="")
    departement = models.ForeignKey(
        Department,
        verbose_name=_("Department"),
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    year = models.IntegerField(
        _("Year of creation"),
        null=True,
        blank=True,
        help_text=_(
            "Filling this field is relevant to distinguish old towns " "from new towns."
        ),
    )
    # old towns merged into this one
    children = models.ManyToManyField(
        "Town", verbose_name=_("Town children"), blank=True, related_name="parents"
    )
    cached_label = models.CharField(
        _("Cached name"), max_length=500, null=True, blank=True, db_index=True
    )
    documents = models.ManyToManyField(
        "Document", related_name="towns", verbose_name=_("Documents"), blank=True
    )
    main_image = models.ForeignKey(
        "Document",
        related_name="main_image_towns",
        on_delete=models.SET_NULL,
        verbose_name=_("Main image"),
        blank=True,
        null=True,
    )
    objects = TownManager()

    class Meta:
        verbose_name = _("Town")
        verbose_name_plural = _("Towns")
        if settings.COUNTRY == "fr":
            ordering = ["numero_insee"]
        unique_together = (("numero_insee", "year"),)

    ADMIN_SECTION = _("Geography")

    def natural_key(self):
        return (self.numero_insee, self.year)

    def history_compress(self):
        # compact representation stored in history records
        return {"numero_insee": self.numero_insee, "year": self.year or ""}

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        return "**name** {}, **numero_insee** {}, **cached_label** {}".format(
            _("Name"), "Code commune (numéro INSEE)", _("Cached name")
        )

    @property
    def surface_ha(self):
        # surface converted from square metres to hectares
        if not self.surface:
            return 0
        return round(self.surface / 10000.0, 5)

    def get_filename(self):
        # file-system friendly name, prefixed by the INSEE code when set
        if self.numero_insee:
            return f"{self.numero_insee} - {slugify(self.name)}"
        return slugify(self.name)

    def associated_filename(self):
        return self.get_filename()

    def get_values(self, prefix="", **kwargs):
        # values exposed to document templates
        return {
            prefix or "label": str(self),
            prefix + "name": self.name,
            prefix + "numero_insee": self.numero_insee,
        }

    @classmethod
    def history_decompress(cls, full_value, create=False):
        # reverse of history_compress: resolve stored keys to Town objects
        if not full_value:
            return []
        res = []
        for value in full_value:
            try:
                res.append(
                    cls.objects.get(
                        numero_insee=value["numero_insee"], year=value["year"] or None
                    )
                )
            except cls.DoesNotExist:
                # referenced town no longer exists: skip it
                continue
        return res

    def __str__(self):
        return self.cached_label or ""

    def geodata_child_item_queries(self):
        # geodata attached to a town cascade to its sites and operations
        return [self.sites, self.operations]

    @property
    def label_with_areas(self):
        """Full label: name, INSEE code, related areas and old towns."""
        label = [self.name]
        if self.numero_insee:
            label.append("({})".format(self.numero_insee))
        for area in self.areas.all():
            label.append(" - ")
            label.append(area.full_label)
        label = " ".join(label)
        if self.children.count():
            label += str(_(", old town of ")) + " ; ".join([
                "{label} ({code})".format(label=p.name, code=p.numero_insee)
                if p.numero_insee else p.name for p in self.children.all()
            ])
        return label

    @property
    def detail_label(self):
        return self.label_with_areas

    def generate_geo(self, force=False):
        """(Re)generate limit, center and surface from parent towns."""
        force = self.generate_limit(force=force)
        self.generate_center(force=force)
        self.generate_area(force=force)

    def generate_limit(self, force=False):
        """Compute the limit as the union of the parents' limits.
        :return: True when the limit has been (re)computed"""
        if not force and self.limit:
            return
        parents = None
        if not self.parents.count():
            return
        for parent in self.parents.all():
            if not parent.limit:
                # a parent without limit would make the union incomplete
                return
            if not parents:
                parents = parent.limit
            else:
                parents = parents.union(parent.limit)
        # if union is a simple polygon make it a multi
        if "MULTI" not in parents.wkt:
            parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")"
        if not parents:
            return
        self.limit = parents
        self.save()
        return True

    def generate_center(self, force=False):
        """Compute the center as the centroid of the limit."""
        if not force and (self.center or not self.limit):
            return
        self.center = self.limit.centroid
        if not self.center:
            return False
        self.save()
        return True

    def generate_area(self, force=False):
        """Compute the surface (m2) from the limit."""
        if not force and (self.surface or not self.limit):
            return
        try:
            surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area
        except GDALException:
            return False
        # reject null or obviously wrong values (IntegerField bound)
        if surface > 214748364 or not surface:
            return False
        self.surface = surface
        self.save()
        return True

    def update_town_code(self):
        """Rebuild numero_insee as "<code>-<year>[-<idx>]" for old towns.
        :return: True when the code has changed"""
        if not self.numero_insee or not self.children.count() or not self.year:
            return
        old_num = self.numero_insee[:]
        numero = old_num.split("-")[0]
        base_insee = "{}-{}".format(numero, self.year)
        self.numero_insee = base_insee
        idx = 0
        # suffix an index until the code is unique for this year
        while Town.objects.filter(
                year=self.year, numero_insee=self.numero_insee).exclude(
                pk=self.pk).count():
            idx += 1
            self.numero_insee = base_insee + "-" + str(idx)
        if self.numero_insee != old_num:
            return True

    def _get_base_image_path(self):
        # group images by the 2-char prefix of standard INSEE codes
        if self.numero_insee and len(self.numero_insee) == 5:
            prefix = self.numero_insee[:2]
            return f"{self.SLUG}/{prefix}"
        return self.SLUG

    def _generate_cached_label(self):
        cached_label = self.name
        if settings.COUNTRY == "fr" and self.numero_insee:
            # 3-char department prefix for "97x"/"98x" codes or codes not
            # starting with a digit; 2 chars otherwise
            dpt_len = 2
            if (
                self.numero_insee.startswith("97")
                or self.numero_insee.startswith("98")
                or self.numero_insee[0]
                not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
            ):
                dpt_len = 3
            cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
        if self.year and self.children.count():
            cached_label += " ({})".format(self.year)
        return cached_label

    def get_extra_actions(self, request):
        """
        For sheet template
        """
        # url, base_text, icon, extra_text, extra css class, is a quick action
        actions = super().get_extra_actions(request)
        profile = get_current_profile()
        can_add_geo = profile.mapping and self.can_do(request, "add_geovectordata")
        if can_add_geo:
            actions.append(self.get_add_geo_action())
        return actions
def post_save_town(sender, **kwargs):
    """post_save handler: refresh the cached label, the geometries and
    the INSEE code of the saved town."""
    cached_label_changed(sender, **kwargs)
    saved_town = kwargs["instance"]
    saved_town.generate_geo()
    if not saved_town.update_town_code():
        return
    saved_town.save()
# keep town caches, geometries and codes up to date and propagate
# geodata / document attachments
post_save.connect(post_save_town, sender=Town)
m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through)
m2m_changed.connect(document_attached_changed, sender=Town.documents.through)
def town_child_changed(sender, **kwargs):
    """m2m handler: town children changed — recompute the INSEE code."""
    changed_town = kwargs["instance"]
    if not changed_town.update_town_code():
        return
    changed_town.save()
# recompute the INSEE code when the children of a town change
m2m_changed.connect(town_child_changed, sender=Town.children.through)
def geotown_attached_changed(sender, **kwargs):
    """
    m2m_changed handler: trigger the geo post-processing of the item
    when related towns/geo items are attached or detached.
    """
    # manage associated geoitem
    profile = get_current_profile()
    if not profile.mapping:
        return
    instance = kwargs.get("instance", None)
    model = kwargs.get("model", None)
    pk_set = kwargs.get("pk_set", None)  # unused below
    action = kwargs.get("action", None)
    if not instance or not model or not hasattr(instance, "post_save_geo"):
        return
    # NOTE(review): MainItem.post_save_geo checks "_post_saved_geo", not
    # "_post_save_geo_ok" — confirm where this flag is read
    instance._post_save_geo_ok = False
    if action in ("post_add", "post_remove", "post_clear"):
        instance.post_save_geo(save=True)
class Address(BaseHistorizedItem):
    """
    Abstract base for items carrying a main and an alternative postal
    address, plus phone/email contact information.
    """

    # address-related fields, e.g. for copy between items
    FIELDS = (
        "address",
        "address_complement",
        "postal_code",
        "town",
        "precise_town_id",
        "country",
        "alt_address",
        "alt_address_complement",
        "alt_postal_code",
        "alt_town",
        "alt_country",
        "phone",
        "phone_desc",
        "phone2",
        "phone_desc2",
        "phone3",
        "phone_desc3",
        "raw_phone",
        "mobile_phone",
        "email",
        "alt_address_is_prefered",
    )
    address = models.TextField(_("Address"), blank=True, default="")
    address_complement = models.TextField(
        _("Address complement"), blank=True, default=""
    )
    postal_code = models.CharField(
        _("Postal code"), max_length=10, null=True, blank=True
    )
    town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True)
    # plain integer reference, resolved via the precise_town property
    precise_town_id = models.PositiveIntegerField(
        verbose_name=_("Town (precise)"),
        null=True,
        blank=True,
    )
    country = models.CharField(_("Country"), max_length=30, null=True, blank=True)
    alt_address = models.TextField(_("Other address: address"), blank=True, default="")
    alt_address_complement = models.TextField(
        _("Other address: address complement"), blank=True, default=""
    )
    alt_postal_code = models.CharField(
        _("Other address: postal code"), max_length=10, null=True, blank=True
    )
    alt_town = models.CharField(
        _("Other address: town"), max_length=70, null=True, blank=True
    )
    alt_country = models.CharField(
        _("Other address: country"), max_length=30, null=True, blank=True
    )
    phone = models.CharField(_("Phone"), max_length=32, null=True, blank=True)
    phone_desc = models.CharField(
        _("Phone description"), max_length=300, null=True, blank=True
    )
    # NOTE(review): this label duplicates the one of phone_desc2 below —
    # looks like a copy-paste ("Phone 2" expected, cf. phone3); confirm
    # before changing the user-facing label
    phone2 = models.CharField(
        _("Phone description 2"), max_length=32, null=True, blank=True
    )
    phone_desc2 = models.CharField(
        _("Phone description 2"), max_length=300, null=True, blank=True
    )
    phone3 = models.CharField(_("Phone 3"), max_length=32, null=True, blank=True)
    phone_desc3 = models.CharField(
        _("Phone description 3"), max_length=300, null=True, blank=True
    )
    raw_phone = models.TextField(_("Raw phone"), blank=True, default="")
    mobile_phone = models.CharField(
        _("Mobile phone"), max_length=32, null=True, blank=True
    )
    email = models.EmailField(_("Email"), max_length=300, blank=True, null=True)
    alt_address_is_prefered = models.BooleanField(
        _("Alternative address is prefered"), default=False
    )
    history = HistoricalRecords(inherit=True)
    # attribute names of related sub-addresses used as fallback sources
    SUB_ADDRESSES = []

    class Meta:
        abstract = True
@property
def precise_town(self):
    """Town matching precise_town_id, cached on the instance (None when
    unset or missing)."""
    if hasattr(self, "_precise_town"):
        return self._precise_town
    resolved = None
    if self.precise_town_id:
        try:
            resolved = Town.objects.get(id=self.precise_town_id)
        except Town.DoesNotExist:
            resolved = None
    self._precise_town = resolved
    return self._precise_town
@property
def precise_town_name(self):
    """Name of the precise town ("" when not set)."""
    linked_town = self.precise_town
    return linked_town.name if linked_town else ""
def get_values(self, prefix="", no_values=False, filtr=None, **kwargs):
    """Add the precise town name to the inherited template values."""
    values = super().get_values(
        prefix=prefix, no_values=no_values, filtr=filtr, **kwargs
    )
    values[prefix + "precise_town_name"] = self.precise_town_name
    return values
@post_importer_action
def set_town_by_code(self, context, value):
    """
    Importer action: link the precise town matching an INSEE code.

    :param context: importer context (unused here)
    :param value: INSEE code of the town
    :raise ImporterError: when no town matches the code
    :return: self
    """
    try:
        town = Town.objects.get(numero_insee=value)
    except Town.DoesNotExist:
        raise ImporterError(
            str(_("Town with code: {} does not exists")).format(value)
        )
    self.precise_town_id = town.pk
    self.skip_history_when_saving = True
    self.save()
    return self

# run this importer action after the main save of the imported item
set_town_by_code.post_save = True
def get_short_html_items(self):
    """List of non-empty address parts for the short HTML rendering.

    Order: address, complement, postal code, town (precise town name
    preferred over the free-text town), country.
    """
    parts = []
    for attr in ("address", "address_complement", "postal_code"):
        value = getattr(self, attr)
        if value:
            parts.append("""{}""".format(value))
    town = self.precise_town
    if town:
        parts.append("""{}""".format(town.name))
    elif self.town:
        parts.append("""{}""".format(self.town))
    if self.country:
        parts.append("""{}""".format(self.country))
    return parts
def get_short_html_detail(self):
    """Joined short-HTML address parts, with a placeholder when empty."""
    items = self.get_short_html_items()
    if not items:
        items = ["{}".format(_("No associated address"))]
    return "".join(items) + "\n"
def get_town_centroid(self):
    """(center, verbose name) of the first precise town found.

    Checks self first, then each SUB_ADDRESSES relation; None when no
    precise town is available anywhere.
    """
    candidates = [self] + [getattr(self, name) for name in self.SUB_ADDRESSES]
    for candidate in candidates:
        if candidate and candidate.precise_town:
            return candidate.precise_town.center, candidate._meta.verbose_name
def get_town_polygons(self):
    """(limit, verbose name) of the first precise town found.

    Checks self first, then each SUB_ADDRESSES relation; None when no
    precise town is available anywhere.
    """
    candidates = [self] + [getattr(self, name) for name in self.SUB_ADDRESSES]
    for candidate in candidates:
        if candidate and candidate.precise_town:
            return candidate.precise_town.limit, candidate._meta.verbose_name
def get_attribute(self, attr):
    """Fetch ``attr`` from the first object carrying a town.

    Self wins when it has a town; otherwise the first sub-address with a
    town is used; failing that, self's own value is returned.
    """
    if self.town or self.precise_town:
        return getattr(self, attr)
    for name in self.SUB_ADDRESSES:
        sub = getattr(self, name)
        if sub and (sub.town or sub.precise_town):
            return getattr(sub, attr)
    return getattr(self, attr)
# Convenience accessors: each returns the field value from self or, when
# self carries no town, from the first sub-address that has one
# (see get_attribute above).

def get_address(self):
    return self.get_attribute("address")

def get_address_complement(self):
    return self.get_attribute("address_complement")

def get_postal_code(self):
    return self.get_attribute("postal_code")

def get_town(self):
    return self.get_attribute("town")

def get_precise_town(self):
    return self.get_attribute("precise_town")

def get_country(self):
    return self.get_attribute("country")
def simple_lbl(self):
    # Default simple label: the standard string representation.
    return str(self)
def full_address(self):
    """Simple label (when non-empty) on top of the address block."""
    lbl = self.simple_lbl()
    lbl = (lbl + "\n") if lbl else lbl
    lbl += self.address_lbl()
    return lbl
def address_lbl(self, list=False):
    """Build the address/contact label.

    :param list: when True return a list of ``(value, label)`` tuples
        instead of a newline-joined string. (The parameter name shadows
        the ``list`` builtin but is kept for backward compatibility.)
    """
    lbls = []
    # the alternative address takes over when flagged as preferred
    prefix = "alt_" if self.alt_address_is_prefered else ""
    if getattr(self, prefix + "address"):
        lbls.append((getattr(self, prefix + "address"), _("Address")))
    if getattr(self, prefix + "address_complement"):
        lbls.append(
            (getattr(self, prefix + "address_complement"),
             _("Address complement"))
        )
    postal_code = getattr(self, prefix + "postal_code")
    town = getattr(self, prefix + "town")
    if postal_code or town:
        # fix: join only the non-empty parts — these fields are nullable
        # and " ".join() raised TypeError when one component was None
        lbls.append(
            (" ".join(part for part in (postal_code, town) if part),
             _("Postal code - Town"))
        )
    if self.phone:
        lbls.append((self.phone, _("Phone")))
    if self.mobile_phone:
        lbls.append((self.mobile_phone, _("Mobile")))
    if self.email:
        lbls.append((self.email, _("Email")))
    if list:
        return lbls
    return "\n".join(value for value, lbl in lbls)
def address_lbl_list(self):
    # Same content as address_lbl() but as a list of (value, label) tuples.
    return self.address_lbl(list=True)
class Merge(models.Model):
    """Abstract mixin handling duplicate detection through a slugified
    merge key and candidate/exclusion many-to-many links."""

    merge_key = models.TextField(_("Merge key"), blank=True, null=True)
    merge_candidate = models.ManyToManyField("self", blank=True)
    merge_exclusion = models.ManyToManyField("self", blank=True)
    archived = models.NullBooleanField(default=False, blank=True, null=True)
    # 1 for one word similarity, 2 for two word similarity, etc.
    MERGE_CLEMENCY = None
    EMPTY_MERGE_KEY = "--"
    MERGE_ATTRIBUTE = "name"

    class Meta:
        abstract = True

    def generate_merge_key(self):
        """Refresh merge_key from MERGE_ATTRIBUTE (no-op when archived)."""
        if self.archived:
            return
        merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
        self.merge_key = slugify(merge_attr if merge_attr else "")
        if not self.merge_key:
            self.merge_key = self.EMPTY_MERGE_KEY
        # fix: removed the dead "self.merge_key = self.merge_key" no-op

    def generate_merge_candidate(self):
        """Attach unarchived items sharing (part of) the merge key as
        candidates, skipping explicit exclusions."""
        if self.archived:
            return
        if not self.merge_key:
            self.generate_merge_key()
            self.save(merge_key_generated=True)
        if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
            return
        q = (
            self.__class__.objects.exclude(pk=self.pk)
            .exclude(merge_exclusion=self)
            .exclude(merge_candidate=self)
            .exclude(archived=True)
        )
        if not self.MERGE_CLEMENCY:
            q = q.filter(merge_key=self.merge_key)
        else:
            # clemency: only compare the first/last MERGE_CLEMENCY words
            subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY])
            subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :])
            q = q.filter(
                Q(merge_key__istartswith=subkeys_front)
                | Q(merge_key__iendswith=subkeys_back)
            )
        for item in q.all():
            self.merge_candidate.add(item)

    def save(self, *args, **kwargs):
        # "merge_key_generated" flags the nested save issued by
        # generate_merge_candidate() and prevents a circular save
        merge_key_generated = kwargs.pop("merge_key_generated", False)
        self.generate_merge_key()
        item = super(Merge, self).save(*args, **kwargs)
        if not merge_key_generated:
            self.merge_candidate.clear()
            self.generate_merge_candidate()
        return item

    def archive(self):
        """Mark as archived and drop all merge links."""
        self.archived = True
        self.save()
        self.merge_candidate.clear()
        self.merge_exclusion.clear()

    def merge(self, item, keep_old=False, exclude_fields=None):
        """Merge ``item`` into self then refresh merge candidates."""
        merge_model_objects(
            self, item, keep_old=keep_old, exclude_fields=exclude_fields
        )
        self.generate_merge_candidate()
def __get_stats_cache_values(model_name, model_pk):
    """Return ``(StatsCache instance, its values dict)`` for one item.

    Deduplicates stale cache rows: the newest (highest id) row is kept
    and returned, the others are deleted.
    """
    StatsCache = apps.get_model("ishtar_common", "StatsCache")
    q = StatsCache.objects.filter(model=model_name, model_pk=model_pk)
    if q.count():
        # fix: pick the kept row with the same ordering used for the
        # deduplication — previously sc was taken without ordering and
        # could be one of the rows deleted just below
        q = q.order_by("-id")
        sc = q.all()[0]
        for extra in q.all()[1:]:
            extra.delete()
    else:
        sc = StatsCache.objects.create(model=model_name, model_pk=model_pk)
    values = sc.values or {}
    return sc, values
@task()
def _update_stats(app, model, model_pk, funcname):
    """Background task: recompute one stat method of an item and store
    the result in its stats cache entry."""
    model_name = app + "." + model
    model_cls = apps.get_model(app, model)
    try:
        instance = model_cls.objects.get(pk=model_pk)
    except model_cls.DoesNotExist:
        return
    result = getattr(instance, funcname)()
    sc, current_values = __get_stats_cache_values(model_name, model_pk)
    current_values[funcname] = result
    sc.values = current_values
    sc.update_requested = None
    sc.updated = datetime.datetime.now()
    sc.save()
def update_stats(statscache, item, funcname):
    """Refresh one cached stat, synchronously or via a background task.

    Returns the values dict (possibly stale on the asynchronous path).
    """
    if not settings.USE_BACKGROUND_TASK:
        # synchronous path: compute and store immediately
        current_values = statscache.values or {}
        current_values[funcname] = getattr(item, funcname)()
        statscache.values = current_values
        statscache.updated = datetime.datetime.now()
        statscache.save()
        return current_values
    # asynchronous path: flag the pending request then delegate
    statscache.update_requested = datetime.datetime.now().isoformat()
    statscache.save()
    _update_stats.delay(
        item._meta.app_label, item._meta.model_name, item.pk, funcname
    )
    return statscache.values
class DashboardFormItem:
    """
    Provide methods to manage statistics
    """

    def last_stats_update(self):
        """Datetime of the most recent stats cache update, or None."""
        model_name = self._meta.app_label + "." + self._meta.model_name
        StatsCache = apps.get_model("ishtar_common", "StatsCache")
        q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by(
            "-updated"
        )
        if not q.count():
            return
        return q.all()[0].updated

    def _get_or_set_stats(self, funcname, update=False, expected_type=None):
        """Fetch a cached stat value, optionally recomputing it.

        Falls back to ``expected_type()`` (or 0) when the value is
        missing or not of the expected type.
        """
        model_name = self._meta.app_label + "." + self._meta.model_name
        StatsCache = apps.get_model("ishtar_common", "StatsCache")
        sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk)
        if not update:
            values = sc.values
            if funcname not in values:
                if expected_type is not None:
                    return expected_type()
                return 0
        else:
            values = update_stats(sc, self, funcname)
        if funcname in values:
            values = values[funcname]
        else:
            values = 0
        if expected_type is not None and not isinstance(values, expected_type):
            return expected_type()
        return values

    # fix on the three classmethods below: ``fltr={}`` was a shared
    # mutable default argument; None is behavior-compatible since the
    # body only checks ``if fltr:``
    @classmethod
    def get_periods(cls, slice="month", fltr=None, date_source="creation"):
        """Years (slice="year") or (year, month) pairs (slice="month")
        having at least one dated item."""
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        if slice == "year":
            return [
                res[date_var].year
                for res in list(q.values(date_var).annotate(Count("id")).order_by())
            ]
        elif slice == "month":
            return [
                (res[date_var].year, res[date_var].month)
                for res in list(q.values(date_var).annotate(Count("id")).order_by())
            ]
        return []

    @classmethod
    def get_by_year(cls, year, fltr=None, date_source="creation"):
        """Queryset of distinct items whose date falls in ``year``."""
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk")

    @classmethod
    def get_by_month(cls, year, month, fltr=None, date_source="creation"):
        """Queryset of distinct items dated in ``year``/``month``."""
        date_var = date_source + "_date"
        q = cls.objects.filter(**{date_var + "__isnull": False})
        if fltr:
            q = q.filter(**fltr)
        q = q.filter(**{date_var + "__year": year, date_var + "__month": month})
        return q.order_by("pk").distinct("pk")

    @classmethod
    def get_total_number(cls, fltr=None):
        """Count of distinct items, optionally filtered."""
        q = cls.objects
        if fltr:
            q = q.filter(**fltr)
        return q.order_by("pk").distinct("pk").count()
class QuickAction:
    """
    Quick action available from tables
    """

    def __init__(
        self,
        url,
        icon_class="",
        text="",
        target=None,
        rights=None,
        module=None,
        is_popup=True,
    ):
        self.url = url
        self.icon_class = icon_class
        self.text = text
        self.rights = rights
        self.target = target
        self.module = module
        self.is_popup = is_popup
        if self.target not in ("one", "many", None):
            raise AttributeError("target must be one, many or None")

    def is_available(self, user, session=None, obj=None):
        """True when the relevant module is active and ``user`` holds one
        of the required rights (no configured rights = unrestricted)."""
        if self.module and not getattr(get_current_profile(), self.module):
            return False
        if not self.rights:  # no restriction
            return True
        ishtaruser = getattr(user, "ishtaruser", None) if user else None
        if not ishtaruser:
            return False
        return any(
            ishtaruser.has_perm(right, session=session, obj=obj)
            for right in self.rights
        )

    @property
    def rendered_icon(self):
        """Icon markup for the action ("" when no icon class is set)."""
        if not self.icon_class:
            return ""
        return "".format(self.icon_class)

    @property
    def base_url(self):
        """Base URL of the action, with the trailing "0/" pk placeholder
        stripped when the action targets selected items."""
        if self.target is None:
            return reverse(self.url)
        # put arbitrary pk for the target
        url = reverse(self.url, args=[0])
        # all quick action url have to finish with the
        # pk of the selected item and a "/"
        return url[:-2]
class DynamicRequest:
    """Describe per-type dynamic form fields and their search mapping."""

    def __init__(
        self,
        label,
        app_name,
        model_name,
        form_key,
        search_key,
        type_query,
        search_query,
    ):
        self.label = label
        self.form_key = form_key
        self.search_key = search_key
        self.app_name = app_name
        self.model_name = model_name
        self.type_query = type_query
        self.search_query = search_query

    def get_all_types(self):
        """Queryset of available types for the configured model."""
        model = apps.get_app_config(self.app_name).get_model(self.model_name)
        return model.objects.filter(available=True)

    def get_form_fields(self):
        """One optional CharField per available type, keyed on
        "<form_key>-<txt_idx>"."""
        return {
            self.form_key + "-" + item.txt_idx: forms.CharField(
                label=str(self.label) + " " + str(item), required=False
            )
            for item in self.get_all_types().all()
        }

    def get_extra_query(self, slug):
        """Extra filter restricting the search to one type slug."""
        return {self.type_query: slug}

    def get_alt_names(self):
        """One SearchAltName per available type, keyed like
        get_form_fields()."""
        return {
            self.form_key + "-" + item.txt_idx: SearchAltName(
                self.search_key + "-" + item.txt_idx,
                self.search_query,
                self.get_extra_query(item.txt_idx),
                distinct_query=True,
            )
            for item in self.get_all_types().all()
        }
class GeoItem(GeographicItem):
    """Abstract mixin adding legacy point/multi-polygon geometry storage
    and GeoJSON serialization to an item."""

    # numeric lookup exposed for table sorting/export
    NUMBER_FIELDS = ["geodata__cached_z"]
    # gis - to be removed
    GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon")))
    x = models.FloatField(_("X"), blank=True, null=True)
    y = models.FloatField(_("Y"), blank=True, null=True)
    z = models.FloatField(_("Z"), blank=True, null=True)
    estimated_error_x = models.FloatField(
        _("Estimated error for X"), blank=True, null=True
    )
    estimated_error_y = models.FloatField(
        _("Estimated error for Y"), blank=True, null=True
    )
    estimated_error_z = models.FloatField(
        _("Estimated error for Z"), blank=True, null=True
    )
    spatial_reference_system = models.ForeignKey(
        SpatialReferenceSystem,
        verbose_name=_("Spatial Reference System"),
        blank=True,
        null=True,
        on_delete=models.PROTECT,
    )
    point = models.PointField(_("Point"), blank=True, null=True, dim=3)
    point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
    point_source = models.CharField(
        _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True
    )
    point_source_item = models.CharField(
        _("Point source item"), max_length=100, blank=True, null=True
    )
    multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True)
    multi_polygon_source = models.CharField(
        _("Multi-polygon source"),
        choices=GEO_SOURCE,
        max_length=1,
        blank=True,
        null=True,
    )
    multi_polygon_source_item = models.CharField(
        _("Multi polygon source item"), max_length=100, blank=True, null=True
    )
    # alternative cached-label attribute name used for the GeoJSON "name"
    GEO_LABEL = ""

    class Meta:
        abstract = True

    def get_town_centroid(self):
        # must be provided by concrete models
        raise NotImplementedError

    def get_town_polygons(self):
        # must be provided by concrete models
        raise NotImplementedError

    @property
    def X(self):
        """x coordinates using the default SRS"""
        coord = self.display_coordinates
        if not coord:
            return
        return coord[0]

    @property
    def Y(self):
        """y coordinates using the default SRS"""
        coord = self.display_coordinates
        if not coord:
            return
        return coord[1]

    @property
    def display_coordinates(self, rounded=True):
        # NOTE(review): as a property this can never receive ``rounded``
        # explicitly — the default True is always used
        if not self.main_geodata:
            return ""
        return self.main_geodata.display_coordinates(rounded=rounded)

    @property
    def display_spatial_reference_system(self):
        """SRS used for display: the profile's display SRS when set,
        otherwise the item's own."""
        profile = get_current_profile()
        if not profile.display_srs or not profile.display_srs.srid:
            return self.spatial_reference_system
        return profile.display_srs

    def get_precise_points(self):
        # only when the point was entered precisely ("P" source)
        if self.point_source == "P" and self.point_2d:
            return self.point_2d, self.point, self.point_source_item

    def get_precise_polygons(self):
        # only when the polygon was entered precisely ("P" source)
        if self.multi_polygon_source == "P" and self.multi_polygon:
            return self.multi_polygon, self.multi_polygon_source_item

    def get_geo_items(self, rounded=5):
        # delegated to the main geodata record (empty dict when missing)
        if not self.main_geodata:
            return {}
        return self.main_geodata.get_geo_items(rounded)

    def convert_coordinates(self, point_2d, rounded):
        """Convert a 2D point to the profile display SRS.

        Returns [x, y], rounded to 5 decimals when ``rounded`` is true.
        No transformation when no display SRS is configured or the point
        is already expressed in it.
        """
        profile = get_current_profile()
        if (
            not profile.display_srs
            or not profile.display_srs.srid
            or (
                profile.display_srs == self.spatial_reference_system
                and point_2d.x
                and point_2d.y
            )
        ):
            x, y = point_2d.x, point_2d.y
        else:
            point = point_2d.transform(profile.display_srs.srid, clone=True)
            x, y = point.x, point.y
        if rounded:
            return [round(x, 5), round(y, 5)]
        return [x, y]

    def _geojson_serialize(self, geom_attr):
        """Serialize ``geom_attr`` of this item as a GeoJSON string with
        "name"/"id" properties and a client-side link template."""
        if not hasattr(self, geom_attr):
            return ""
        # pick the most specific cached label attribute available
        cached_label_key = "cached_label"
        if self.GEO_LABEL:
            cached_label_key = self.GEO_LABEL
        if getattr(self, "CACHED_LABELS", None):
            cached_label_key = self.CACHED_LABELS[-1]
        geojson = serialize(
            "geojson",
            self.__class__.objects.filter(pk=self.pk),
            geometry_field=geom_attr,
            fields=(cached_label_key,),
        )
        geojson_dct = json.loads(geojson)
        profile = get_current_profile()
        precision = profile.point_precision
        features = geojson_dct.pop("features")
        for idx in range(len(features)):
            feature = features[idx]
            # expose the cached label under the generic "name" key
            lbl = feature["properties"].pop(cached_label_key)
            feature["properties"]["name"] = lbl
            feature["properties"]["id"] = self.pk
            if precision is not None:
                geom_type = feature["geometry"].get("type", None)
                if geom_type == "Point":
                    feature["geometry"]["coordinates"] = [
                        round(coord, precision)
                        for coord in feature["geometry"]["coordinates"]
                    ]
        geojson_dct["features"] = features
        # "999999" pk placeholder stripped so the client can append a
        # real pk to the template
        geojson_dct["link_template"] = simple_link_to_window(self).replace(
            "999999", ""
        )
        geojson = json.dumps(geojson_dct)
        return geojson

    @property
    def point_2d_geojson(self):
        # GeoJSON rendering of the 2D point geometry
        return self._geojson_serialize("point_2d")

    @property
    def multi_polygon_geojson(self):
        # GeoJSON rendering of the multi-polygon geometry
        return self._geojson_serialize("multi_polygon")
class ImageContainerModel:
    """Mixin providing date-based upload paths for attached images."""

    def _get_image_path(self, filename):
        """Full upload path for ``filename`` under the daily directory."""
        return "/".join((self._get_base_image_path(), filename))

    def _get_base_image_path(self):
        """Upload directory of the day: upload/YYYY/MM/DD."""
        return datetime.datetime.now().strftime("upload/%Y/%m/%d")
class CompleteIdentifierItem(models.Model, ImageContainerModel):
    """Abstract base providing cached label, generated complete
    identifier, custom index and QR code management."""

    HAS_QR_CODE = True
    cached_label = models.TextField(
        _("Cached name"),
        blank=True,
        default="",
        db_index=True,
        help_text=_("Generated automatically - do not edit"),
    )
    complete_identifier = models.TextField(
        _("Complete identifier"), blank=True, default=""
    )
    custom_index = models.IntegerField("Custom index", blank=True, null=True)
    qrcode = models.ImageField(
        upload_to=get_image_path, blank=True, null=True, max_length=255
    )

    class Meta:
        abstract = True

    @property
    def qrcode_path(self):
        """Filesystem path of the QR code image, generated lazily."""
        if not self.qrcode:
            self.generate_qrcode()
        if not self.qrcode:  # error on qrcode generation
            return ""
        return self.qrcode.path

    def _profile_generate_cached_label(self):
        # profile-level label formula keyed on "<SLUG>_cached_label"
        slug = getattr(self, "SLUG", None)
        if not slug:
            return
        return get_generated_id(slug + "_cached_label", self)

    def _generate_cached_label(self):
        """Cached label: profile formula when available, else str(self)."""
        label = self._profile_generate_cached_label()
        if not label:
            # to be eventually overloaded by parent class
            return str(self)
        return label

    def generate_qrcode(self, request=None, secure=True, tmpdir=None):
        """Generate and attach a QR code pointing to a tiny URL of the
        item sheet.

        :param request: used for the URL scheme when available
        :param secure: force https when no request is given
        :param tmpdir: existing working directory (a temporary one is
            created and removed when not provided)
        """
        if not hasattr(self, "get_absolute_url"):
            return
        url = self.get_absolute_url()
        site = Site.objects.get_current()
        if request:
            scheme = request.scheme
        else:
            scheme = "https" if secure else "http"
        url = scheme + "://" + site.domain + url
        # shorten the target URL so the QR code stays small
        TinyUrl = apps.get_model("ishtar_common", "TinyUrl")
        tiny_url = TinyUrl()
        tiny_url.link = url
        tiny_url.save()
        short_url = (
            scheme
            + "://"
            + site.domain
            + reverse("tiny-redirect", args=[tiny_url.get_short_id()])
        )
        qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
        tmpdir_created = False
        if not tmpdir:
            tmpdir = tempfile.mkdtemp("-qrcode")
            tmpdir_created = True
        filename = tmpdir + os.sep + "qrcode.png"
        qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
        # technical save: skip history and geo side effects
        self.skip_history_when_saving = True
        self._no_move = True
        self._no_geo_check = True
        with open(filename, "rb") as qrfile:
            self.qrcode.save("qrcode.png", File(qrfile))
            self.save()
        if tmpdir_created:
            shutil.rmtree(tmpdir)

    def generate_complete_identifier(self):
        """Complete identifier: profile formula first, then fallback to
        the most relevant cached label attribute."""
        SLUG = getattr(self, "SLUG", None)
        if not SLUG:
            return ""
        complete_identifier = get_generated_id(SLUG + "_complete_identifier", self)
        if complete_identifier:
            return complete_identifier
        cached_label_key = "cached_label"
        if getattr(self, "GEO_LABEL", None):
            cached_label_key = getattr(self, "GEO_LABEL", None)
        if hasattr(self, "CACHED_COMPLETE_ID"):
            cached_label_key = self.CACHED_COMPLETE_ID
        if not cached_label_key:
            return
        complete_identifier = getattr(self, cached_label_key)
        return complete_identifier

    def get_index_whole_db(self):
        """Next custom index over the whole table (max + 1)."""
        q = self.__class__.objects.exclude(custom_index__isnull=True)
        q = q.order_by("-custom_index")
        if q.count():
            current_index = q.values_list("custom_index", flat=True).all()[0]
            return current_index + 1
        return 1

    def generate_custom_index(self, force=False):
        """Compute the custom index from the profile configuration.

        Returns None when not configured or not computable; with
        ``force`` an existing index is recomputed.
        """
        if not self.pk:
            return
        if self.custom_index and not force:
            return self.custom_index
        SLUG = getattr(self, "SLUG", None)
        if not SLUG:
            return
        k = SLUG + "_custom_index"
        profile = get_current_profile()
        if not hasattr(profile, k):
            return
        key = getattr(profile, k)
        if not key or not key.strip():
            return
        keys = key.strip().split(";")
        if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]):
            # custom index generation
            # fix: use the stripped key (keys[0]) — the raw profile value
            # may carry whitespace and the getattr previously used it,
            # although hasattr was checked with keys[0]
            return getattr(self, "get_index_" + keys[0])()
        model = self.__class__
        try:
            self_keys = set(list(model.objects.filter(pk=self.pk).values_list(*keys)))
        except Exception:  # bad settings - not managed here
            print("Bad settings for custom_index {}".format(";".join(keys)))
            return
        if len(self_keys) != 1:  # key is not distinct
            return
        self_key = self_keys.pop()
        return self._get_index(keys, self_key)

    def _get_index(self, keys: list, self_keys: tuple):
        """Next index (max + 1) among items sharing the same key values.

        ``self_keys`` is the values_list() tuple matching ``keys``
        (annotation fixed: it is not a list).
        """
        model = self.__class__
        q = model.objects
        if self.pk:
            q = model.objects.exclude(pk=self.pk)
        for idx, key in enumerate(keys):
            q = q.filter(**{key: self_keys[idx]})
        try:
            r = q.aggregate(max_index=Max("custom_index"))
        except Exception:  # bad settings
            return
        if not r["max_index"]:
            return 1
        return r["max_index"] + 1

    def save(self, *args, **kwargs):
        super(CompleteIdentifierItem, self).save(*args, **kwargs)
        self.regenerate_all_ids()

    def regenerate_all_ids(self, save=True):
        """Recompute custom index and complete identifier; persist the
        values that changed. Returns the dict of updated fields."""
        if getattr(self, "_prevent_loop", False):
            return {}
        updated = {}
        custom_index = self.generate_custom_index()
        if custom_index != self.custom_index:
            self.custom_index = custom_index
            updated["custom_index"] = custom_index
        complete_id = self.generate_complete_identifier()
        if complete_id and complete_id != self.complete_identifier:
            self.complete_identifier = complete_id
            updated["complete_identifier"] = complete_id
        if updated:
            self._prevent_loop = True
            self.skip_history_when_saving = True
            if save:
                if self.pk:
                    # targeted UPDATE avoids re-entering save()
                    self.__class__.objects.filter(pk=self.pk).update(**updated)
                else:
                    self.save()
        return updated
class SearchVectorConfig:
    """Configuration of one field feeding a search vector.

    ``language`` selects the text-search configuration ("local" maps to
    the ISHTAR_SEARCH_LANGUAGE setting; default is "simple").  ``func``
    optionally transforms the raw value before indexing.
    """

    def __init__(self, key, language=None, func=None):
        self.key = key
        if not language:
            self.language = "simple"
        elif language == "local":
            self.language = settings.ISHTAR_SEARCH_LANGUAGE
        else:
            self.language = language
        self.func = func

    def format(self, value):
        """Normalize ``value`` into a list of values to index.

        The literal string "None" is treated as empty; results of
        ``func`` are wrapped in a list when necessary.
        """
        if value == "None":
            value = ""
        if self.func is None:
            return [value]
        result = self.func(value)
        return result if isinstance(result, list) else [result]