#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Generic models and tools for models """ import copy from collections import OrderedDict import datetime import fiona from importlib import import_module import json import logging import os import pyqrcode import re import shutil import sys import tempfile import time from unidecode import unidecode from django import forms from django.apps import apps from django.conf import settings from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.gis.db import models from django.contrib.gis.geos import GEOSGeometry, Point from django.contrib.gis.gdal.error import GDALException from django.contrib.postgres.search import SearchVectorField, SearchVector from django.contrib.sites.models import Site from django.core.cache import cache as django_cache from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned from django.core.files import File from django.core.serializers import serialize from django.urls import reverse, NoReverseMatch from django.core.validators import validate_slug from django.db import connection, transaction, OperationalError, IntegrityError from django.db.models import JSONField, Q, Count, Max from django.db.models.signals import post_save, post_delete, m2m_changed from django.template import loader from django.template.defaultfilters import slugify from django.utils import timezone from django.utils.safestring import SafeText, mark_safe from django.utils.translation import activate, deactivate from ishtar_common.utils import ( BColors, gettext_lazy as _, pgettext_lazy, get_image_path, get_columns_from_class, human_date, HistoryError, SearchAltName, SheetItem, get_progress ) from simple_history.models import HistoricalRecords as BaseHistoricalRecords from simple_history.signals import ( post_create_historical_record, pre_create_historical_record, ) from ishtar_common.data_importer import post_importer_action, ImporterError from ishtar_common.model_managers import TypeManager from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import Import from ishtar_common.templatetags.link_to_window import simple_link_to_window from ishtar_common.utils import ( cached_label_changed, disable_for_loaddata, duplicate_item, external_id_changed, GENERAL_TYPE_PREFIX, get_all_field_names, get_cache, get_current_profile, get_generated_id, merge_tsvectors, OwnPerms, post_save_geo, post_save_geodata, task, ) logger = logging.getLogger(__name__) """ from ishtar_common.models import GeneralType, get_external_id, \ LightHistorizedItem, OwnPerms, Address, post_save_cache, \ DashboardFormItem, document_attached_changed, SearchAltName, \ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ QuickAction, MainItem, Merge """ class CachedGen(object): @classmethod def refresh_cache(cls): raise NotImplementedError() @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) django_cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) class Cached(CachedGen): slug_field = "txt_idx" @classmethod def refresh_cache(cls): cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if not current_keys: return for keys in current_keys: if len(keys) == 2 and keys[0] == "__slug": cls.get_cache(keys[1], force=True) elif keys[0] == "__get_types": default = None empty_first = True exclude = [] if len(keys) >= 2: default = keys.pop() if len(keys) > 1: empty_first = bool(keys.pop()) exclude = keys[1:] cls.get_types( exclude=exclude, empty_first=empty_first, default=default, force=True, ) elif keys[0] == "__get_help": cls.get_help(force=True) @classmethod def _add_cache_key_to_refresh(cls, keys): cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if type(current_keys) != list: current_keys = [] if keys not in current_keys: current_keys.append(keys) django_cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) @classmethod def get_cache(cls, slug, force=False): cache_key, value = get_cache(cls, ["__slug", slug]) if not force and value: return value try: k = {cls.slug_field: slug} obj = cls.objects.get(**k) django_cache.set(cache_key, obj, settings.CACHE_TIMEOUT) return obj except cls.DoesNotExist: django_cache.set(cache_key, None, settings.CACHE_TIMEOUT) return None @disable_for_loaddata def post_save_cache(sender, **kwargs): sender.refresh_cache() class GeneralType(Cached, models.Model): """ Abstract class for "types" """ label = models.TextField(_("Label")) txt_idx = models.TextField( _("Textual ID"), validators=[validate_slug], unique=True, help_text=_( "The slug is the standardized version of the name. It contains " "only lowercase letters, numbers and hyphens. Each slug must " "be unique." ), ) comment = models.TextField(_("Comment"), blank=True, default="") available = models.BooleanField(_("Available"), default=True) HELP_TEXT = "" objects = TypeManager() class Meta: abstract = True def __str__(self): return self.label def natural_key(self): return (self.txt_idx,) def history_compress(self): return self.txt_idx @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ s = "**label** {}, **txt_idx** {}".format(str(_("Label")), str(_("Textual ID"))) if hasattr(cls, "extra_documentation_string"): s += cls.extra_documentation_string() return s @classmethod def admin_url(cls): return str( reverse( "admin:{}_{}_changelist".format( cls._meta.app_label, cls._meta.model_name ) ) ) @classmethod def history_decompress(cls, value, create=False): if not value: return [] res = [] for txt_idx in value: try: res.append(cls.objects.get(txt_idx=txt_idx)) except cls.DoesNotExist: continue return res @property def explicit_label(self): return "{} ({})".format(self.label, self._meta.verbose_name) @classmethod def create_default_for_test(cls): return [cls.objects.create(label="Test %d" % i) for i in range(5)] @property def short_label(self): return self.label @property def name(self): return self.label @classmethod def get_or_create(cls, slug, label=""): """ Get or create a new item. :param slug: textual id :param label: label for initialization if the item doesn't exist (not mandatory) :return: instancied item of the base class """ item = cls.get_cache(slug) if item: return item item, created = cls.objects.get_or_create( txt_idx=slug, defaults={"label": label} ) return item @classmethod def get_or_create_pk(cls, slug): """ Get an id from a slug. Create the associated item if needed. :param slug: textual id :return: id of the item (string) """ return str(cls.get_or_create(slug).pk) @classmethod def get_or_create_pks(cls, slugs): """ Get and merge a list of ids from a slug list. Create the associated items if needed. :param slugs: textual ids :return: string with ids separated by "_" """ items = [] for slug in slugs: items.append(str(cls.get_or_create(slug).pk)) return "_".join(items) @classmethod def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): if not dct: dct = {} if not exclude: exclude = [] keys = ["__get_help"] keys += ["{}".format(ex) for ex in exclude] keys += ["{}-{}".format(str(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return mark_safe(value) help_text = cls.HELP_TEXT c_rank = -1 help_items = "\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, "__iter__"): pk = item[0] item = cls.objects.get(pk=pk) item.rank = c_rank + 1 if hasattr(item, "parent"): c_item = item parents = [] while c_item.parent: parents.append(c_item.parent.label) c_item = c_item.parent parents.reverse() parents.append(item.label) item.label = " / ".join(parents) if not item.comment: continue if c_rank > item.rank: help_items += "\n" elif c_rank < item.rank: help_items += "
\n" c_rank = item.rank help_items += "
%s
%s
" % ( item.label, "
".join(item.comment.split("\n")), ) c_rank += 1 if c_rank: help_items += c_rank * "
" if help_text or help_items != "\n": help_text = help_text + help_items else: help_text = "" django_cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) return mark_safe(help_text) @classmethod def _get_initial_types(cls, initial, type_pks, instance=False): new_vals = [] if not initial: return [] if not isinstance(initial, (list, tuple)): initial = [initial] for value in initial: try: pk = int(value) except (ValueError, TypeError): continue if pk in type_pks: continue try: extra_type = cls.objects.get(pk=pk) if instance: new_vals.append(extra_type) else: new_vals.append((extra_type.pk, str(extra_type))) except cls.DoesNotExist: continue return new_vals @classmethod def get_types( cls, dct=None, instances=False, exclude=None, empty_first=True, default=None, initial=None, force=False, full_hierarchy=False, ): if not dct: dct = {} if not exclude: exclude = [] types = [] if not instances and empty_first and not default: types = [("", "--")] types += cls._pre_get_types( dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy ) if not initial: return types new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) types += new_vals return types @classmethod def _pre_get_types( cls, dct=None, instances=False, exclude=None, default=None, force=False, get_full_hierarchy=False, ): if not dct: dct = {} if not exclude: exclude = [] # cache cache_key = None if not instances: keys = ["__get_types"] keys += ["{}".format(ex) for ex in exclude] + ["{}".format(default)] keys += ["{}-{}".format(str(k), dct[k]) for k in dct] cache_key, value = get_cache(cls, keys) if value and not force: return value base_dct = dct.copy() if hasattr(cls, "parent"): if not cache_key: return cls._get_parent_types( base_dct, instances, exclude=exclude, default=default, get_full_hierarchy=get_full_hierarchy, ) vals = [ v for v in cls._get_parent_types( base_dct, instances, exclude=exclude, default=default, get_full_hierarchy=get_full_hierarchy, ) ] django_cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals if not cache_key: return cls._get_types(base_dct, instances, exclude=exclude, default=default) vals = [ v for v in cls._get_types( base_dct, instances, exclude=exclude, default=default ) ] django_cache.set(cache_key, vals, settings.CACHE_TIMEOUT) return vals @classmethod def _get_types(cls, dct=None, instances=False, exclude=None, default=None): if not dct: dct = {} if not exclude: exclude = [] dct["available"] = True if default: try: default = cls.objects.get(txt_idx=default) yield (default.pk, str(default)) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and default != "None": if hasattr(default, "txt_idx"): exclude.append(default.txt_idx) else: exclude.append(default) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield item.pk, str(item) if item and str(item) else "" @classmethod def _get_childs_list(cls, dct=None, exclude=None, instances=False): if not dct: dct = {} if not exclude: exclude = [] if "parent" in dct: dct.pop("parent") childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) ordering = cls._meta.ordering if not ordering and hasattr(cls, "order"): childs = childs.order_by("order") else: childs = childs.order_by(*ordering) res = {} if instances: for item in childs.all(): parent_id = item.parent_id or 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append(item) else: for item in childs.values("id", "parent_id", "label").all(): parent_id = item["parent_id"] or 0 if item["id"] == item["parent_id"]: parent_id = 0 if parent_id not in res: res[parent_id] = [] res[parent_id].append((item["id"], item["label"])) return res @classmethod def _get_childs( cls, item, child_list, prefix=0, instances=False, is_last=False, last_of=None, get_full_hierarchy=False, ): if not last_of: last_of = [] prefix += 1 current_child_lst = [] if item in child_list: current_child_lst = child_list[item] lst = [] total = len(current_child_lst) full_hierarchy_initial = get_full_hierarchy for idx, child in enumerate(current_child_lst): mylast_of = last_of[:] p = "" if instances: child.rank = prefix lst.append(child) else: if full_hierarchy_initial: if isinstance(full_hierarchy_initial, str): p = full_hierarchy_initial + " > " else: p = "" else: cprefix = prefix while cprefix: cprefix -= 1 if not cprefix: if (idx + 1) == total: p += GENERAL_TYPE_PREFIX["prefix_last"] else: p += GENERAL_TYPE_PREFIX["prefix_medium"] elif is_last: if mylast_of: clast = mylast_of.pop(0) if clast: p += GENERAL_TYPE_PREFIX["prefix_empty"] else: p += GENERAL_TYPE_PREFIX["prefix"] else: p += GENERAL_TYPE_PREFIX["prefix_empty"] else: p += GENERAL_TYPE_PREFIX["prefix"] lst.append((child[0], SafeText(p + child[1]))) clast_of = last_of[:] clast_of.append(idx + 1 == total) if instances: child_id = child.id else: child_id = child[0] if get_full_hierarchy: if p: if not p.endswith(" > "): p += " > " get_full_hierarchy = p + child[1] else: get_full_hierarchy = child[1] for sub_child in cls._get_childs( child_id, child_list, prefix, instances, is_last=((idx + 1) == total), last_of=clast_of, get_full_hierarchy=get_full_hierarchy, ): lst.append(sub_child) return lst @classmethod def _get_parent_types( cls, dct=None, instances=False, exclude=None, default=None, get_full_hierarchy=False, ): if not dct: dct = {} if not exclude: exclude = [] dct["available"] = True child_list = cls._get_childs_list(dct, exclude, instances) if 0 in child_list: for item in child_list[0]: if instances: item.rank = 0 item_id = item.pk yield item else: item_id = item[0] yield item if get_full_hierarchy: get_full_hierarchy = item[1] for child in cls._get_childs( item_id, child_list, instances=instances, get_full_hierarchy=get_full_hierarchy, ): yield child def set_txt_idx(self): base_q = self.__class__.objects if self.pk: base_q = base_q.exclude(pk=self.pk) count, txt_idx = True, None idx = 0 while count: if not txt_idx: txt_idx = slugify(self.label)[:100] else: txt_idx = txt_idx[:-5] + f"{idx:05d}" q = base_q.filter(txt_idx=txt_idx) count = q.count() idx += 1 self.txt_idx = txt_idx def save(self, *args, **kwargs): ItemKey = apps.get_model("ishtar_common", "ItemKey") if not self.id and not self.label: txt_idx = self.txt_idx if isinstance(txt_idx, list): txt_idx = txt_idx[0] self.txt_idx = txt_idx self.label = " ".join(" ".join(self.txt_idx.split("-")).split("_")).title() if not self.txt_idx: self.set_txt_idx() # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter( object_id=self.pk, key=slugify(old.label), content_type=content_type ).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter( object_id=self.pk, key=old.txt_idx, content_type=content_type ).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False, importer_type=None, ishtar_import=None, group=None, user=None): ItemKey = apps.get_model("ishtar_common", "ItemKey") content_type = ContentType.objects.get_for_model(self.__class__) if ( not ishtar_import and not force and ItemKey.objects.filter(key=key, content_type=content_type).count() ): return filtr = {"key": key, "content_type": content_type} if importer_type: filtr["importer_type"] = importer_type elif group: filtr["group"] = group filtr["importer_type"] = ishtar_import.importer_type if ishtar_import else None elif user: filtr["user"] = user filtr["importer_type"] = ishtar_import.importer_type if ishtar_import else None else: filtr["ishtar_import"] = ishtar_import filtr["importer_type"] = ishtar_import.importer_type if ishtar_import else None if force: ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() filtr["object_id"] = self.pk ItemKey.objects.get_or_create(**filtr) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self, current_import): importer_type = current_import.importer_type ItemKey = apps.get_model("ishtar_common", "ItemKey") keys = [self.txt_idx] content_type = ContentType.objects.get_for_model(self.__class__) base_q = Q(content_type=content_type, object_id=self.pk) subquery = Q(importer_type__isnull=True, user__isnull=True, group__isnull=True) subquery |= Q(user__isnull=True, group__isnull=True, importer_type=importer_type) if current_import.user: subquery |= Q(user=current_import.user, group__isnull=True, importer_type=importer_type) if current_import.associated_group: subquery |= Q( user__isnull=True, group=current_import.associated_group, importer_type=importer_type ) q = ItemKey.objects.filter(base_q & subquery) for ik in q.exclude(key=self.txt_idx).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): # content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() class OrderedModel(models.Model): order = models.IntegerField(_("Order"), default=10) class Meta: abstract = True class OrderedType(OrderedModel, GeneralType): class Meta: abstract = True class HierarchicalType(GeneralType): parent = models.ForeignKey( "self", blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Parent"), ) class Meta: abstract = True has_full_label = True def full_label(self): lbls = [self.label] item = self parents = [self.pk] # prevent loop while item.parent and item.parent_id not in parents: parents.append(item.parent_id) item = item.parent lbls.append(item.label) return " > ".join(reversed(lbls)) @property def first_parent(self): parent = self.parent parents = [] while parent: if parent in parents: # prevent circular return parent parents.append(parent) if not parent.parent: return parent parent = parent.parent class OrderedHierarchicalType(OrderedModel, HierarchicalType): class Meta: abstract = True class StatisticItem: STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: # OrderedDict([('year', _("Year")), # ("operation_type__label", _("Operation type"))]) STATISTIC_SUM_VARIABLE = OrderedDict( (("pk", (_("Number"), 1)),) ) # example: "Price", "Volume" - the number is a multiplier class TemplateItem: @classmethod def _label_templates_q(cls): model_name = "{}.{}".format(cls.__module__, cls.__name__) q = Q(associated_model__klass=model_name, for_labels=True, available=True) alt_model_name = model_name.replace("models_finds", "models").replace( "models_treatments", "models" ) if alt_model_name != model_name: q |= Q(associated_model__klass=model_name, for_labels=True, available=True) DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") return DocumentTemplate.objects.filter(q) @classmethod def has_label_templates(cls): return cls._label_templates_q().count() @classmethod def label_templates(cls): return cls._label_templates_q() def get_extra_templates(self, request): cls = self.__class__ templates = [] name = str(cls.__name__) module = str(cls.__module__) if "archaeological_finds" in module: if "models_finds" in name or "models_treatments" in name: names = [ name, name.replace("models_finds", "models").replace( "models_treatments", "models" ), ] else: names = [ name, name.replace("models", "models_finds"), name.replace("models", "models_treatments"), ] else: names = [name] model_names = ["{}.{}".format(module, name) for name in names] DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") q = DocumentTemplate.objects.filter( associated_model__klass__in=model_names, for_labels=False, available=True ) for template in q.all(): urlname = "generate-document" templates.append( (template.name, reverse(urlname, args=[template.slug, self.pk])) ) return templates class BaseSheetFilter(models.Model): key = models.TextField(_("Key"), default="-") class Meta: abstract = True def get_template(self): raise NotImplemented() def get_keys(self): attrs = re.compile(r"item\.([_a-zA-Z]+)") includes = re.compile(r"""\{\% *include *["'](/?(?:[^/]+/?)+)["'] *\%\}""") main_template = self.get_template() templates = [main_template] with open(main_template, "r") as fle: content = fle.read() keys = attrs.findall(content) for line in content.split("\n"): for tpl_name in includes.findall(line): if tpl_name in templates: continue templates.append(tpl_name) tpl = loader.get_template(tpl_name) with open(tpl.template.origin.name, "r") as fle: sub_content = fle.read() keys += attrs.findall(sub_content) return sorted(set([k.replace("_not_available", "") for k in keys])) class SheetFilter(BaseSheetFilter): content_type = models.ForeignKey( ContentType, related_name="content_type_sheetfilter", on_delete=models.CASCADE ) exclude_or_include = models.CharField( _("Exclude or include"), default="E", max_length=1, choices=(("E", _("exclude")), ("I", _("include"))) ) class Meta: verbose_name = _("Filtered sheet - Filter") verbose_name_plural = _("Filtered sheet - Filters") ADMIN_SECTION = _("Account") def __str__(self): exc = _("exclude") if self.exclude_or_include == "I": exc = _("include") return f"{self.content_type.model_class()._meta.verbose_name} | {exc} | {self.key}" def get_template(self): ct = self.content_type model = apps.get_model(ct.app_label, ct.model) tpl = loader.get_template(f"ishtar/sheet_{model.SLUG}.html") return tpl.template.origin.name class FilteredSheet(models.Model): name = models.CharField(_("Name"), max_length=200) content_type = models.ForeignKey( ContentType, related_name="content_type_filteredsheet", on_delete=models.CASCADE ) filters = models.ManyToManyField( SheetFilter, blank=True, verbose_name=_("Filters"), related_name="filtered_sheet" ) class Meta: verbose_name = _("Filtered sheet - Sheet") verbose_name_plural = _("Filtered sheet - Sheets") ADMIN_SECTION = _("Account") def __str__(self): return f"{self.content_type.model_class()._meta.verbose_name} | {self.name}" @property def exclude_or_include(self): q = self.filters.filter(exclude_or_include="E") return "E" if q.count() else "I" class FullSearch(models.Model): search_vector = SearchVectorField( _("Search vector"), blank=True, null=True, help_text=_("Auto filled at save") ) EXTRA_REQUEST_KEYS = {} DYNAMIC_REQUESTS = {} ALT_NAMES = {} BOOL_FIELDS = [] NUMBER_FIELDS = [] REVERSED_BOOL_FIELDS = [] CALLABLE_BOOL_FIELDS = [] BASE_SEARCH_VECTORS = [] PROPERTY_SEARCH_VECTORS = [] INT_SEARCH_VECTORS = [] M2M_SEARCH_VECTORS = [] PARENT_SEARCH_VECTORS = [] # prevent circular dependency PARENT_ONLY_SEARCH_VECTORS = [] # tuple (module, class) in text for dynamic import DEFAULT_SEARCH_FORM = tuple() class Meta: abstract = True @classmethod def get_default_search_form(cls): # DEFAULT_SEARCH_FORM is used to get the form when exporting tables if not cls.DEFAULT_SEARCH_FORM: return form = getattr(import_module(cls.DEFAULT_SEARCH_FORM[0]), cls.DEFAULT_SEARCH_FORM[1]) return form @classmethod def general_types(cls): for k in get_all_field_names(cls): field = cls._meta.get_field(k) if not hasattr(field, "remote_field") or not field.remote_field: continue rel_model = field.remote_field.model if issubclass(rel_model, (GeneralType, HierarchicalType)): yield k @classmethod def get_alt_names(cls): alt_names = cls.ALT_NAMES.copy() for dr_k in cls.DYNAMIC_REQUESTS: alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) return alt_names @classmethod def get_query_parameters(cls): query_parameters = {} for v in cls.get_alt_names().values(): for language_code, language_lbl in settings.LANGUAGES: activate(language_code) query_parameters[str(v.search_key)] = v deactivate() return query_parameters @classmethod def _update_raw_search_field(cls, value): result = [] if not value: value = "" for val in value.split("'"): result.append(f"'{val.lower()}':1") SEPS = [" ", "-", "/"] values = [] # split ID terms for idx, sep in enumerate(SEPS): if not idx: values = value.split(sep) continue new_values = [] for val in values: new_values += val.split(sep) values = new_values for val in values: if len(val) < 2: continue val = val.replace("'", "").lower() result.append(f"'{val}':1") return result def _update_search_field(self, search_vector_conf, search_vectors, data): for value in search_vector_conf.format(data): if search_vector_conf.language == "raw": search_vectors += self._update_raw_search_field(value) continue with connection.cursor() as cursor: cursor.execute( "SELECT to_tsvector(%s, %s)", [search_vector_conf.language, value] ) row = cursor.fetchone() search_vectors.append(row[0]) def _update_search_number_field(self, search_vectors, val): try: search_vectors.append("'{}':1".format(int(val))) except (ValueError, TypeError): pass def update_search_vector(self, save=True, exclude_parent=False): """ Update the search vector :param save: True if you want to save the object immediately :return: True if modified """ if getattr(self, "_search_vector_updated", None): return self._search_vector_updated = True if not hasattr(self, "search_vector"): return if not self.pk: # logger.warning("Cannot update search vector before save or " # "after deletion.") return if ( not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS and not self.INT_SEARCH_VECTORS and not self.PROPERTY_SEARCH_VECTORS and not self.PARENT_SEARCH_VECTORS ): logger.warning("No search_vectors defined for {}".format(self.__class__)) return if getattr(self, "_search_updated", None): if not save: return self.search_vector return JsonDataField = apps.get_model("ishtar_common", "JsonDataField") self._search_updated = True old_search = "" if self.search_vector: old_search = self.search_vector[:] search_vectors = [] base_q = self.__class__.objects.filter(pk=self.pk) # many to many have to be queried one by one otherwise only one is fetch for m2m_search_vector in self.M2M_SEARCH_VECTORS: key_splitted = m2m_search_vector.key.split("__") key = key_splitted[0] rel_key = getattr(self, key) if len(key_splitted) == 2: attr = key_splitted[1] if m2m_search_vector.language == "raw": values = list(rel_key.values_list(attr, flat=True)) else: values = list( rel_key.annotate( search=SearchVector( attr, config=m2m_search_vector.language )).values_list("search", flat=True) ) for value in values: search_vectors += self._update_raw_search_field(value) else: for item in rel_key.values("pk").all(): query_dct = {key + "__pk": item["pk"]} q = copy.copy(base_q).filter(**query_dct) if m2m_search_vector.language == "raw": q = q.values_list(m2m_search_vector.key, flat=True) search_vectors += self._update_raw_search_field(q.all()[0]) continue query_dct = {key + "__pk": item["pk"]} q = copy.copy(base_q).filter(**query_dct) q = q.annotate( search=SearchVector( m2m_search_vector.key, config=m2m_search_vector.language ) ).values("search") search_vectors.append(q.all()[0]["search"]) # int/float are not well managed by the SearchVector for int_search_vector in self.INT_SEARCH_VECTORS: q = base_q.values(int_search_vector.key) for val in int_search_vector.format(q.all()[0][int_search_vector.key]): self._update_search_number_field(search_vectors, val) if not exclude_parent: # copy parent vector fields for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: parent = getattr(self, PARENT_SEARCH_VECTOR) if hasattr(parent, "all"): # m2m for p in parent.all(): search_vectors.append(p.search_vector) elif parent: search_vectors.append(parent.search_vector) for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) if hasattr(parent, "all"): # m2m for p in parent.all(): search_vectors.append( p.update_search_vector(save=False, exclude_parent=True) ) elif parent: search_vectors.append( parent.update_search_vector(save=False, exclude_parent=True) ) if self.BASE_SEARCH_VECTORS: # query "simple" fields q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) res = q.all()[0] for base_search_vector in self.BASE_SEARCH_VECTORS: data = res[base_search_vector.key] data = unidecode(str(data)) self._update_search_field(base_search_vector, search_vectors, data) if self.PROPERTY_SEARCH_VECTORS: for property_search_vector in self.PROPERTY_SEARCH_VECTORS: data = getattr(self, property_search_vector.key) if callable(data): data = data() if not data: continue data = str(data) self._update_search_field(property_search_vector, search_vectors, data) if hasattr(self, "data") and self.data: content_type = ContentType.objects.get_for_model(self) for json_field in JsonDataField.objects.filter( content_type=content_type, search_index=True ).all(): data = copy.deepcopy(self.data) no_data = False for key in json_field.key.split("__"): if key not in data: no_data = True break data = data[key] if no_data or not data: continue if json_field.value_type == "B": if data is True: data = json_field.name else: continue elif json_field.value_type in ("I", "F"): self._update_search_number_field(search_vectors, data) continue elif json_field.value_type == "D": # only index year if hasattr(data, "year"): self._update_search_number_field(search_vectors, data.year) else: y = None for d in data.split("-"): if len(d) == 4: # should be the year try: y = int(d) except ValueError: y = None if y: self._update_search_number_field(search_vectors, y) continue datas = [data] if json_field.value_type == "MC": datas = data for d in datas: for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): with connection.cursor() as cursor: cursor.execute("SELECT to_tsvector(%s, %s)", [lang, d]) row = cursor.fetchone() search_vectors.append(row[0]) # TODO - performance: could be very slow -> cf. DGM CD17 new_search_vector = merge_tsvectors(search_vectors) changed = old_search != new_search_vector self.search_vector = new_search_vector if save and changed: self.__class__.objects.filter(pk=self.pk).update( search_vector=new_search_vector ) elif not save: return new_search_vector return changed class Imported(models.Model): imports = models.ManyToManyField( Import, blank=True, related_name="imported_%(app_label)s_%(class)s", verbose_name=_("Created by imports") ) timestamp_geo = models.IntegerField(_("Timestamp geo"), null=True, blank=True) timestamp_label = models.IntegerField(_("Timestamp label"), null=True, blank=True) imports_updated = models.ManyToManyField( Import, blank=True, related_name="import_updated_%(app_label)s_%(class)s", verbose_name=_("Updated by imports") ) ALT_NAMES = { "imports": SearchAltName( pgettext_lazy("key for text search", "imports"), "imports__name__iexact", ), } class Meta: abstract = True def _get_imports(self, user, key, limit): ishtaruser = getattr(user, "ishtaruser", None) if not ishtaruser: return [] q = getattr(self, key) if ishtaruser.has_permission("ishtaradmin") or \ ishtaruser.has_permission("ishtar_common.view_import"): q = q.all() elif ishtaruser.has_permission("ishtar_common.view_own_import"): q = q.filter(Q(user=user.ishtaruser) | Q( importer_type__users__pk=user.ishtaruser.pk) ) else: return [] q = q.order_by("-id") has_limit = False if q.count() > limit: has_limit = True q = q[:limit] lst = list(q.all()) new_lst = [] for imprt in lst: if imprt.group: new_lst.append(imprt.group) else: new_lst.append(imprt) if has_limit: new_lst.append("...") return new_lst def get_imports(self, user, limit=None): return self._get_imports(user, "imports", limit) def get_imports_updated(self, user, limit=None): return self._get_imports(user, "imports_updated", limit) class JsonData(models.Model, CachedGen): data = JSONField(default=dict, blank=True) class Meta: abstract = True def pre_save(self): if not self.data: self.data = {} @property def json_sections(self): sections = [] try: content_type = ContentType.objects.get_for_model(self) except ContentType.DoesNotExist: return sections JsonDataField = apps.get_model("ishtar_common", "JsonDataField") fields = list( JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=True ).all() ) # no section fields fields += list( JsonDataField.objects.filter( content_type=content_type, display=True, section__isnull=False ) .order_by("section__order", "order") .all() ) for field in fields: value = None data = self.data.copy() for key in field.key.split("__"): if key in data: value = copy.copy(data[key]) data = data[key] else: value = None break if value is None: continue if type(value) in (list, tuple): value = " ; ".join([field.format_value(v) for v in value if v]) if not value: continue else: value = field.format_value(value) section_name = field.section.name if field.section else None if not sections or section_name != sections[-1][0]: # if section name is identical it is the same sections.append((section_name, [])) sections[-1][1].append((field.name, value)) return sections @classmethod def refresh_cache(cls): __, refreshed = get_cache(cls, ["cache_refreshed"]) if refreshed and time.time() - refreshed < 1: return cache_ckey, current_keys = get_cache(cls, ["_current_keys"]) if not current_keys: return for keys in current_keys: if isinstance(keys, (list, tuple)) and keys[0] == "__get_dynamic_choices": cls._get_dynamic_choices(keys[1], force=True) @classmethod def _get_dynamic_choices(cls, key, force=False): """ Get choice from existing values :param key: data key :param force: if set to True do not use cache :return: tuple of choices (id, value) """ cache_key, value = get_cache(cls, ["__get_dynamic_choices", key]) if not force and value: return value choices = set() splitted_key = key[len("data__"):].split("__") q = cls.objects.filter(data__has_key=key[len("data__"):]).values_list( "id", "data" ) multi = False for pk, value in q.all(): for k in splitted_key: value = value[k] if isinstance(value, list): # fix bad recording if len(value) == 1 and isinstance(value[0], str) \ and value[0].startswith("['") \ and value[0].endswith("']"): value = value[0][2:-2] value = value.split("', '") obj = cls.objects.get(pk=pk) data = copy.deepcopy(obj.data) if len(splitted_key) == 1: data[splitted_key[0]] = value elif len(splitted_key) == 2: data[splitted_key[0]][splitted_key[1]] = value elif len(splitted_key) == 3: data[splitted_key[0]][splitted_key[1]][splitted_key[2]] = value else: print("To many level in json field - fix not managed") obj.data = data obj.no_post_process() obj.save() multi = True for v in value: if v: choices.add(v) else: choices.add(value) c = [] if not multi: c = [("", "")] c += [(v, v) for v in sorted(list(choices)) if v] django_cache.set(cache_key, c, settings.CACHE_SMALLTIMEOUT) return c class FixAssociated: ASSOCIATED = {} def fix_associated(self): for key in self.ASSOCIATED: item = getattr(self, key) if not item: continue dct = self.ASSOCIATED[key] for dct_key in dct: subkey, ctype = dct_key expected_values = dct[dct_key] if not isinstance(expected_values, (list, tuple)): expected_values = [expected_values] if hasattr(ctype, "txt_idx"): try: expected_values = [ ctype.objects.get(txt_idx=v) for v in expected_values ] except ctype.DoesNotExist: # type not yet initialized return current_vals = getattr(item, subkey) is_many = False if hasattr(current_vals, "all"): is_many = True current_vals = current_vals.all() else: current_vals = [current_vals] is_ok = False for current_val in current_vals: if current_val in expected_values: is_ok = True break if is_ok: continue # the first value is used new_value = expected_values[0] if is_many: getattr(item, subkey).add(new_value) else: setattr(item, subkey, new_value) class HistoricalRecords(BaseHistoricalRecords): def get_extra_fields(self, model, fields): def get_history_m2m(attr): def _get_history_m2m(self): q = model.objects.filter(pk=getattr(self, model._meta.pk.attname)) if q.count(): item = q.all()[0] if attr in item.history_m2m: return item.history_m2m[attr] return _get_history_m2m def get_serialize_call(attr): def _get_serialize_call(self): q = model.objects.filter(pk=getattr(self, model._meta.pk.attname)) if q.count(): return getattr(q.all()[0], attr)() return _get_serialize_call def get_serialize_properties(attr): def _get_serialize_properties(self): q = model.objects.filter(pk=getattr(self, model._meta.pk.attname)) if q.count(): return getattr(q.all()[0], attr) return _get_serialize_properties extra_fields = super().get_extra_fields(model, fields) # initialize default empty fields fields = [f.name for f in model._meta.fields] lst = ["documents"] for key in lst: extra_fields[key] = "" for k in getattr(model, "SERIALIZE_PROPERTIES", []): if k not in fields: extra_fields[k] = get_serialize_properties(k) for k in getattr(model, "SERIALIZE_CALL", []): if k not in fields: extra_fields[k] = get_serialize_call(k) for k in getattr(model, "HISTORICAL_M2M", []): if k not in fields: extra_fields[k] = get_history_m2m(k) return extra_fields def _save_historic( self, manager, instance, history_date, history_type, history_user, history_change_reason, using, attrs, ): history_instance = manager.model( history_date=history_date, history_type=history_type, history_user=history_user, history_change_reason=history_change_reason, **attrs, ) pre_create_historical_record.send( sender=manager.model, instance=instance, history_date=history_date, history_user=history_user, history_change_reason=history_change_reason, history_instance=history_instance, using=using, ) history_instance.save(using=using) post_create_historical_record.send( sender=manager.model, instance=instance, history_instance=history_instance, history_date=history_date, history_user=history_user, history_change_reason=history_change_reason, using=using, ) def create_historical_record(self, instance, history_type, using=None): try: history_modifier = getattr(instance, "history_modifier", None) except User.DoesNotExist: # on batch removing of users, user could have disappeared return if not history_modifier: return history_date = getattr(instance, "_history_date", timezone.now()) history_change_reason = getattr(instance, "changeReason", None) force = getattr(instance, "_force_history", False) manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history.filter( history_modifier_id=history_modifier.pk ).order_by("-history_date", "-history_id") # instance.skip_history_when_saving = True if not q_history.count(): if force: delattr(instance, "_force_history") self._save_historic( manager, instance, history_date, history_type, history_modifier, history_change_reason, using, attrs, ) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = timezone.now() - datetime.timedelta(seconds=5) q = q_history.filter( history_date__isnull=False, history_date__gt=min_history_date ).order_by("-history_date", "-history_id") if not force and q.count(): return if force: delattr(instance, "_force_history") # record a new version only if data have been changed for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: self._save_historic( manager, instance, history_date, history_type, history_modifier, history_change_reason, using, attrs, ) return class BaseHistorizedItem( StatisticItem, TemplateItem, FullSearch, Imported, JsonData, FixAssociated ): """ Historized item with external ID management. All historized items are searchable and have a data json field. Historized items can be "locked" for edition. Historized items can have associated ishtar user for permission management """ IS_BASKET = False EXTERNAL_ID_KEY = "" EXTERNAL_ID_DEPENDENCIES = [] HISTORICAL_M2M = [] history_modifier = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Last editor"), blank=True, null=True, ) history_creator = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Creator"), blank=True, null=True, ) last_modified = models.DateTimeField(blank=True, default=timezone.now) created = models.DateTimeField(blank=True, default=timezone.now) history_m2m = JSONField(default=dict, blank=True) need_update = models.BooleanField(verbose_name=_("Need update"), default=False) locked = models.BooleanField( verbose_name=_("Item locked for edition"), default=False ) lock_user = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Locked by"), blank=True, null=True, ) ishtar_users = models.ManyToManyField( "ishtar_common.IshtarUser", blank=True, related_name='%(class)s_associated' ) class Meta: abstract = True DATED_FIELDS = [ "created", "last_modified", ] BOOL_FIELDS = ["locked"] ALT_NAMES = { "history_creator": SearchAltName( pgettext_lazy("key for text search", "created-by"), "history_creator__ishtaruser__person__cached_label__iexact", ), "history_modifier": SearchAltName( pgettext_lazy("key for text search", "modified-by"), "history_modifier__ishtaruser__person__cached_label__iexact", ), "created": SearchAltName( pgettext_lazy("key for text search", "created"), "created", ), "modified": SearchAltName( pgettext_lazy("key for text search", "modified"), "last_modified", ), "locked": SearchAltName( pgettext_lazy("key for text search", "locked"), "locked" ), "ishtar_users": SearchAltName( pgettext_lazy("key for text search", "user-attached"), "ishtar_users__person__cached_label__iexact" ) } @classmethod def get_verbose_name(cls): return cls._meta.verbose_name def is_locked(self, user=None): if not user: return self.locked return self.locked and (not self.lock_user or self.lock_user != user) def merge(self, item, keep_old=False): merge_model_objects(self, item, keep_old=keep_old) def public_representation(self): return {} def duplicate(self, user=None, data=None): return duplicate_item(self, user, data) def update_external_id(self, save=False, no_set=False): if not hasattr(self, "external_id"): return if self.external_id and not getattr(self, "auto_external_id", False): return external_id_key = self.EXTERNAL_ID_KEY or ( hasattr(self, "SLUG") and (self.SLUG + "_external_id") ) if not external_id_key: return external_id = get_generated_id(external_id_key, self) if external_id == self.external_id: return if no_set: return external_id try: self.auto_external_id = True except AttributeError: pass try: self.external_id = external_id except AttributeError: return self._cached_label_checked = False if save: if self.pk: self.__class__.objects.filter(pk=self.pk).update( auto_external_id=True, external_id=external_id ) else: self.skip_history_when_saving = True self.save() return external_id def get_last_history_date(self): q = self.history.values("history_date").order_by("-history_date") if not q.count(): return return q.all()[0]["history_date"] def get_previous(self, step=None, date=None, strict=False): """ Get a "step" previous state of the item """ if not step and not date: raise AttributeError("Need to provide step or date") historized = self.history.all() item = None if step: if len(historized) <= step: # silently return the last step if too far in the history item = historized[len(historized) - 1] else: item = historized[step] else: date_match = False for step, item in enumerate(historized): if item.history_date == timezone.make_aware(date, item.history_date.tzinfo): date_match = True break # ended with no match if not date_match: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in get_all_field_names(model): field = model._meta.get_field(k) if hasattr(field, "rel") and field.rel: if not hasattr(item, k + "_id"): setattr(item, k, getattr(self, k)) continue val = getattr(item, k + "_id") if not val: setattr(item, k, None) continue try: val = field.remote_field.model.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError( "The class %s has no pk %d" % (str(field.remote_field.model), val) ) setattr(item, k, None) item.pk = self.pk return item @property def last_edition_date(self): try: return self.history.order_by("-history_date").all()[0].history_date except (AttributeError, IndexError): return @property def history_creation_date(self): try: return self.history.order_by("history_date").all()[0].history_date except (AttributeError, IndexError): return def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): if item.history_date == date: new_item = item break to_del.append(item) if not new_item: raise HistoryError("The date to rollback to doesn't exist.") try: field_keys = [f.name for f in self._meta.fields] for k in field_keys: if k != "id" and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) try: self.history_modifier = User.objects.get( pk=new_item.history_modifier_id ) except User.ObjectDoesNotExist: pass self.save() saved_m2m = new_item.history_m2m.copy() for hist_key in self.HISTORICAL_M2M: # after each association m2m is rewrite - force the original # to be reset new_item.history_m2m = saved_m2m values = new_item.m2m_listing(hist_key, create=True) or [] hist_field = getattr(self, hist_key) hist_field.clear() for val in values: hist_field.add(val) # force label regeneration self._cached_label_checked = False self.save() except ObjectDoesNotExist: raise HistoryError("The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def m2m_listing(self, key): return getattr(self, key).all() def values(self): values = {} for f in self._meta.fields: k = f.name if k != "id": values[k] = getattr(self, k) return values @property def associated_filename(self): if [ True for attr in ( "get_town_label", "get_department", "reference", "short_class_name", ) if not hasattr(self, attr) ]: return "" items = [ slugify(self.get_department()), slugify(self.get_town_label()).upper(), slugify(self.short_class_name), slugify(self.reference), slugify(self.name or "").replace("-", "_").capitalize(), ] last_edition_date = self.last_edition_date if last_edition_date: items.append(last_edition_date.strftime("%Y%m%d")) else: items.append("00000000") return "-".join([str(item) for item in items]) def save(self, *args, **kwargs): created = not self.pk if (not getattr(self, "skip_history_when_saving", False) and not getattr(self, "_no_last_modified_update", False)) \ or not self.last_modified: self.last_modified = timezone.now() if not getattr(self, "skip_history_when_saving", False): if not hasattr(self, "history_modifier"): raise NotImplementedError("Should have a history_modifier field.") if created: self.history_creator = self.history_modifier # external ID can have related item not available before save external_id_updated = ( kwargs.pop("external_id_updated") if "external_id_updated" in kwargs else False ) if not created and not external_id_updated: self.update_external_id() super(BaseHistorizedItem, self).save(*args, **kwargs) if created and self.update_external_id(): # force resave for external ID creation self.skip_history_when_saving = True self._updated_id = True return self.save(external_id_updated=True) for dep in self.EXTERNAL_ID_DEPENDENCIES: for obj in getattr(self, dep).all(): obj.update_external_id(save=True) self.fix_associated() return True class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=timezone.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return self class DocumentItem: ALT_NAMES = { "documents__image__isnull": SearchAltName( pgettext_lazy("key for text search", "has-image"), "documents__image__isnull", ), "documents__associated_url__isnull": SearchAltName( pgettext_lazy("key for text search", "has-url"), "documents__associated_url__isnull", ), "documents__associated_file__isnull": SearchAltName( pgettext_lazy("key for text search", "has-attached-file"), "documents__associated_file__isnull", ), "documents__source_type": SearchAltName( pgettext_lazy("key for text search", "document-type"), "documents__source_type__label__iexact", ), } @classmethod def get_label_for_model_plural(cls): return cls._meta.verbose_name_plural def documents_list(self) -> list: Document = apps.get_model("ishtar_common", "Document") return self.get_associated_main_item_list("documents", Document) def public_representation(self): images = [] if getattr(self, "main_image", None): images.append(self.main_image.public_representation()) images += [ image.public_representation() for image in self.images_without_main_image.all() ] return {"images": images} @property def images(self): if not hasattr(self, "documents"): Document = apps.get_model("ishtar_common", "Document") return Document.objects.none() return ( self.documents.filter(image__isnull=False).exclude(image="").order_by("pk") ) @property def images_number(self): return self.images.count() @property def images_without_main_image(self): if not hasattr(self, "main_image") or not hasattr(self, "documents"): return self.images if not self.main_image: return ( self.documents.filter(image__isnull=False) .exclude(image="") .order_by("pk") ) return ( self.documents.filter(image__isnull=False) .exclude(image="") .exclude(pk=self.main_image.pk) .order_by("pk") ) @property def pdf_attached(self): for document in self.documents.filter( Q(associated_file__isnull=False) | Q(source__associated_file__isnull=False) ).all(): return document.pdf_attached def get_extra_actions(self, request): """ For sheet template: return "Add document / image" action """ # url, base_text, icon, extra_text, extra css class, is a quick action try: actions = super(DocumentItem, self).get_extra_actions(request) except AttributeError: actions = [] if not hasattr(self, "SLUG") or not hasattr(self, "can_do"): return actions if not hasattr(self, "can_do"): print(f"**WARNING** can_do not implemented for {self.__class__}") return actions can_add_doc = self.can_do(request, "ishtar_common.add_document") if can_add_doc and ( not hasattr(self, "is_locked") or not self.is_locked(request.user) ): actions += [ ( reverse("create-document") + "?{}={}".format(self.SLUG, self.pk), _("Add document/image"), "fa fa-plus", _("doc./image"), "", False, ) ] return actions def clean_duplicate_association(document, related_item, action): profile = get_current_profile() if not profile.clean_redundant_document_association or action != "post_add": return class_name = related_item.__class__.__name__ if class_name not in ("Find", "ContextRecord", "Operation"): return if class_name == "Find": for cr in document.context_records.filter( base_finds__find__pk=related_item.pk ).all(): document.context_records.remove(cr) for ope in document.operations.filter( context_record__base_finds__find__pk=related_item.pk ).all(): document.operations.remove(ope) return if class_name == "ContextRecord": for ope in document.operations.filter(context_record__pk=related_item.pk).all(): document.operations.remove(ope) if document.finds.filter(base_finds__context_record=related_item.pk).count(): document.context_records.remove(related_item) return if class_name == "Operation": if document.context_records.filter(operation=related_item.pk).count(): document.operations.remove(related_item) return if document.finds.filter( base_finds__context_record__operation=related_item.pk ).count(): document.operations.remove(related_item) return def document_attached_changed(sender, **kwargs): # associate a default main image instance = kwargs.get("instance", None) model = kwargs.get("model", None) pk_set = kwargs.get("pk_set", None) if not instance or not model: return if hasattr(instance, "documents"): items = [instance] else: if not pk_set: return try: items = [model.objects.get(pk=pk) for pk in pk_set] except model.DoesNotExist: return for item in items: clean_duplicate_association(instance, item, kwargs.get("action", None)) for doc in item.documents.all(): doc.regenerate_all_ids() q = item.documents.filter(image__isnull=False).exclude(image="") if item.main_image: if q.filter(pk=item.main_image.pk).count(): return # the association has disappear not the main image anymore item.main_image = None item.skip_history_when_saving = True item.save() if not q.count(): return # by default get the lowest pk item.main_image = q.order_by("pk").all()[0] item.skip_history_when_saving = True item.save() class NumberManager(models.Manager): def get_by_natural_key(self, number): return self.get(number=number) class State(models.Model): label = models.CharField(_("Label"), max_length=30) number = models.CharField(_("Number"), unique=True, max_length=10) objects = NumberManager() class Meta: verbose_name = _("State") ordering = ["number"] def __str__(self): return self.label def natural_key(self): return (self.number,) class Department(models.Model): label = models.CharField(_("Label"), max_length=30) number = models.CharField(_("Number"), unique=True, max_length=3) state = models.ForeignKey( "State", verbose_name=_("State"), blank=True, null=True, on_delete=models.SET_NULL, ) objects = NumberManager() class Meta: verbose_name = _("Department") verbose_name_plural = _("Departments") ordering = ["number"] ADMIN_SECTION = _("Geography") def __str__(self): return self.label def natural_key(self): return (self.number,) def history_compress(self): return self.number @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append(cls.objects.get(number=value)) except cls.DoesNotExist: continue return res class Arrondissement(models.Model): name = models.CharField("Nom", max_length=30) department = models.ForeignKey( Department, verbose_name="Département", on_delete=models.CASCADE ) def __str__(self): return settings.JOINT.join((self.name, str(self.department))) class Canton(models.Model): name = models.CharField("Nom", max_length=30) arrondissement = models.ForeignKey( Arrondissement, verbose_name="Arrondissement", on_delete=models.CASCADE ) def __str__(self): return settings.JOINT.join((self.name, str(self.arrondissement))) class SpatialReferenceSystem(GeneralType): order = models.IntegerField(_("Order"), default=10) auth_name = models.CharField(_("Authority name"), default="EPSG", max_length=256) srid = models.IntegerField(_("Authority SRID")) round = models.IntegerField(_("Number of decimal places"), default=5) round_z = models.IntegerField(_("Number of decimal places for Z"), default=3) class Meta: verbose_name = _("Geographic - Spatial reference system") verbose_name_plural = _("Geographic - Spatial reference systems") ordering = ( "order", "label", ) ADMIN_SECTION = _("Geography") @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ doc = super(SpatialReferenceSystem, cls).get_documentation_string() doc += ", **srid** {}, **auth_name** {}".format( _("Authority SRID"), _("Authority name") ) return doc post_save.connect(post_save_cache, sender=SpatialReferenceSystem) post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) class GeoOriginType(HierarchicalType): """ ex: topographical surveys, georeferencing, ... """ order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Geographic - Origin type") verbose_name_plural = _("Geographic - Origin types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Geography") class GeoDataType(HierarchicalType): """ ex: outline, z-sup, ... """ order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Geographic - Data type") verbose_name_plural = _("Geographic - Data types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Geography") class GeoProviderType(HierarchicalType): """ ex: GeoNames, IGN, ... """ order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Geographic - Provider type") verbose_name_plural = _("Geographic - Provider types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Geography") class GeoBufferType(GeneralType): order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Geographic - Buffer type") verbose_name_plural = _("Geographic - Buffer types") ordering = ( "order", "label", ) ADMIN_SECTION = _("Geography") GEOJSON_POINT_TPL = { "type": "FeatureCollection", "features": [ { "type": "Feature", "geometry": { "type": "Point", "coordinates": [] }, "properties": { } } ] } GEOMETRY_TYPE_LBL = { "POINT": _("Point"), "MULTILINE": _("Line(s)"), "MULTIPOINTS": _("Point(s)"), "MULTIPOLYGON": _("Polygon(s)"), } GEOTYPE_TO_GEOVECTOR = { # key: (geom attr, need list convert) "Point": ("point_2d", False), "LineString": ("multi_line", True), "Polygon": ("multi_polygon", True), "MultiPoint": ("multi_points", False), "MultiLineString": ("multi_line", False), "MultiPolygon": ("multi_polygon", False), } class GeoVectorData(Imported, OwnPerms): SLUG = "geovectordata" RELATED_MODELS = [ "related_items_ishtar_common_town", "related_items_archaeological_operations_operation", "related_items_archaeological_operations_archaeologicalsite", "related_items_archaeological_context_records_contextrecord", "related_items_archaeological_finds_basefind", "related_items_archaeological_warehouse_warehouse", "related_items_archaeological_warehouse_container", ] UPPER_PERMISSIONS = [ (("archaeological_operations", "operation"), "related_items_archaeological_operations_operation__pk"), (("archaeological_operations", "archaeologicalsite"), "related_items_archaeological_operations_archaeologicalsite__pk"), (("archaeological_context_records", "contextrecord"), "related_items_archaeological_context_records_contextrecord__pk"), (("archaeological_finds", "find"), "related_items_archaeological_finds_basefind__find__pk"), (("archaeological_warehouse", "warehouse"), "related_items_archaeological_warehouse_warehouse__pk"), (("archaeological_warehouse", "container"), "related_items_archaeological_warehouse_container__pk"), ] buffer = models.FloatField( _("Buffer"), blank=True, null=True ) buffer_type = models.ForeignKey(GeoBufferType, blank=True, null=True, on_delete=models.CASCADE) need_update = models.BooleanField(_("Need update"), default=False) name = models.TextField(_("Name"), default="-") source_content_type = models.ForeignKey( ContentType, related_name="content_type_geovectordata", on_delete=models.CASCADE, blank=True, null=True ) source_id = models.PositiveIntegerField(blank=True, null=True) source = GenericForeignKey("source_content_type", "source_id") import_key = models.TextField(_("Import key"), blank=True, null=True, help_text=_("Use this for update imports")) origin = models.ForeignKey( GeoOriginType, blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("Origin"), help_text=_("For instance: topographical survey, georeferencing, ..."), ) data_type = models.ForeignKey( GeoDataType, blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("Data type"), help_text=_("For instance: outline, z-sup, ..."), ) provider = models.ForeignKey( GeoProviderType, blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("Provider"), help_text=_("Data provider"), ) acquisition_date = models.DateField(blank=True, null=True) comment = models.TextField(_("Comment"), default="", blank=True) x = models.FloatField(_("X"), blank=True, null=True, help_text=_("User input")) y = models.FloatField(_("Y"), blank=True, null=True, help_text=_("User input")) z = models.FloatField(_("Z"), blank=True, null=True, help_text=_("User input")) # x == cached_x if user input else get it from other sources # cached is converted to the display SRID cached_x = models.FloatField(_("X (cached)"), blank=True, null=True) cached_y = models.FloatField(_("Y (cached)"), blank=True, null=True) cached_z = models.FloatField(_("Z (cached)"), blank=True, null=True) estimated_error_x = models.FloatField( _("Estimated error for X"), blank=True, null=True ) estimated_error_y = models.FloatField( _("Estimated error for Y"), blank=True, null=True ) estimated_error_z = models.FloatField( _("Estimated error for Z"), blank=True, null=True ) spatial_reference_system = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), blank=True, null=True, on_delete=models.PROTECT, ) point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) point_3d = models.PointField(_("Point (3D)"), blank=True, null=True, dim=3) multi_points = models.MultiPointField(_("Multi points"), blank=True, null=True) multi_line = models.MultiLineStringField(_("Multi lines"), blank=True, null=True) multi_polygon = models.MultiPolygonField(_("Multi polygons"), blank=True, null=True) class Meta: verbose_name = _("Geographic - Vector data") verbose_name_plural = _("Geographic - Vector data") unique_together = ("source_content_type", "source_id", "import_key") permissions = ( ("view_own_geovectordata", "Can view own Geographic - Vector data"), ("change_own_geovectordata", "Can change own Geographic - Vector data"), ("delete_own_geovectordata", "Can delete own Geographic - Vector data"), ) ADMIN_SECTION = _("Geography") def __str__(self): name = self.name if not name or name == "-": for related_model in self.RELATED_MODELS: q = getattr(self, related_model) cached_label_key = "cached_label" if getattr(q.model, "GEO_LABEL", None): cached_label_key = q.model.GEO_LABEL if q.count(): # arbitrary return the first item name = str(q.values_list(cached_label_key, flat=True).all()[0]) break if self.data_type: name += f" ({str(self.data_type).lower()})" return name def is_own(self, ishtaruser, alt_query_own=None): ct = self.source_content_type model = apps.get_model(ct.app_label, ct.model) if not hasattr(model, "_get_query_owns_dicts"): return False sub_q = model.get_query_owns(ishtaruser) if not sub_q: return False return self.source_id in list( model.objects.filter(sub_q).values_list("id", flat=True) ) SERIALIZE_EXCLUDE = [] def full_serialize(self, search_model, recursion=False, request=None): dct = {} fields = [ "id", "buffer", "buffer_type", "name", "origin", "data_type", "provider", "comment", "spatial_reference_system", "source", "estimated_error_x", "estimated_error_y", "estimated_error_z", ] for field_name in fields: value = getattr(self, field_name) dct[field_name] = str(value) if value else "" dct["geojson"] = self.geojson return dct @classmethod def get_query_owns(cls, ishtaruser): q = None for app_label, model_name in ( ("archaeological_operations", "Operation"), ("archaeological_operations", "ArchaeologicalSite"), ("archaeological_context_records", "ContextRecord"), ("archaeological_finds", "BaseFind"), ("archaeological_warehouse", "Warehouse"), ("archaeological_warehouse", "Container"), ): model = apps.get_model(app_label, model_name) sub_q = cls._construct_query_own( model, "", model._get_query_owns_dicts(ishtaruser) ) if not sub_q: continue q2 = Q( source_id__in=list( model.objects.filter(sub_q).values_list("id", flat=True) ), source_content_type__app_label=app_label, source_content_type__model=model_name.lower(), ) if not q: q = q2 else: q |= q2 return q @property def source_label(self): return str(self.source or "-") def display_coordinates_3d(self): return self.display_coordinates(dim=3) def display_coordinates(self, rounded=None, rounded_z=None, dim=2, srid=None, cache=True): spatial_reference_system = None if not srid: if self.spatial_reference_system and self.spatial_reference_system.srid: spatial_reference_system = self.spatial_reference_system else: profile = get_current_profile() if profile.display_srs and profile.display_srs.srid: spatial_reference_system = profile.display_srs if spatial_reference_system: srid = spatial_reference_system.srid if not srid: srid = 4326 if not spatial_reference_system: q = SpatialReferenceSystem.objects.filter(srid=srid) if q.count(): spatial_reference_system = q.all()[0] if not rounded: if spatial_reference_system: rounded = spatial_reference_system.round else: rounded = 5 if not rounded_z: if spatial_reference_system: rounded_z = spatial_reference_system.round_z else: rounded_z = 3 try: return self.get_coordinates(rounded=rounded, rounded_z=rounded_z, srid=srid, dim=dim, cache=cache) except GDALException: # bad conversion return def get_coordinates(self, rounded=5, rounded_z=3, srid: int = None, dim=2, cache=False): if dim not in (2, 3): raise ValueError(_("Only 2 or 3 dimensions")) if cache and srid == 4326: coordinates = [self.cached_x, self.cached_y] if dim == 3: coordinates.append(self.cached_z) else: if self.x or self.y: # user input if not srid or not self.spatial_reference_system or \ srid == self.spatial_reference_system.srid: coordinates = [self.x, self.y] if dim == 3: coordinates.append(self.z) if not self.spatial_reference_system: q = SpatialReferenceSystem.objects.filter(srid=4326) if q.count(): self.spatial_reference_system = q.all()[0] else: args = { "x": self.x, "y": self.y, "srid": self.spatial_reference_system.srid, } if dim == 3: args["z"] = self.z point = Point(**args).transform(srid, clone=True) coordinates = [point.x, point.y] if srid == 4326: coordinates = list(reversed(coordinates)) if dim == 3: coordinates.append(point.z) else: if self.point_2d: geom = self.point_2d elif self.point_3d: geom = self.point_3d elif self.multi_points: geom = self.multi_points.centroid elif self.multi_line: geom = self.multi_line.centroid elif self.multi_polygon: geom = self.multi_polygon.centroid else: if dim == 2: return [None, None] return [None, None, self.z or None] point = geom if not srid or srid != geom.srid: point = geom.transform(srid, clone=True) x, y = point.x, point.y else: x, y = point.x, point.y if dim == 2: coordinates = [x, y] else: coordinates = [x, y, point.z] if rounded is None: return coordinates if coordinates[0]: if rounded <= 0: coordinates[0] = int(coordinates[0]) coordinates[1] = int(coordinates[1]) else: coordinates[0] = round(coordinates[0] or 0, rounded) coordinates[1] = round(coordinates[1] or 0, rounded) if dim == 3 and rounded_z is not None and coordinates[2] is not None: if rounded_z <= 0: coordinates[2] = round(coordinates[2] or 0, rounded_z) else: coordinates[2] = round(coordinates[2] or 0, rounded_z) return coordinates def get_coordinates_from_polygon(self, rounded=5, srid: int = None): if self.multi_polygon: return self.convert_coordinates( self.multi_polygon.centroid, rounded=rounded, srid=srid ) def get_x(self, srid: int = None) -> float: coord = self.get_coordinates(srid) if coord: return coord[0] def get_y(self, srid: int = None) -> float: coord = self.get_coordinates(srid) if coord: return coord[1] def get_z(self, srid: int = None) -> float: coord = self.get_coordinates(srid, dim=3) if coord: return coord[2] @property def display_spatial_reference_system(self): if self.spatial_reference_system: return self.spatial_reference_system profile = get_current_profile() return profile.srs @classmethod def _get_geo_item_list(cls, q, current_geodata, url, precision, rounded): collection_id, items = [], [] q = q.values("main_geodata", "id", "cached_label") for item in q.distinct().all(): geodata_id = item["main_geodata"] if geodata_id not in current_geodata: collection_id.append(geodata_id) items.append((item["id"], item["cached_label"])) current_geodata.append(geodata_id) collection = [] for idx in range(len(collection_id)): geo = json.loads(GeoVectorData.objects.get(pk=collection_id[idx]).geojson) geo_type = geo.get("type", None) current_id, label = items[idx] url_geo = url.format(current_id) if geo_type == "FeatureCollection": for feat in geo["features"]: if "properties" in feat: feat["properties"]["url"] = url_geo feat["properties"]["name"] = label collection += geo["features"] elif geo_type: if "properties" in geo: geo["properties"]["url"] = url_geo geo["properties"]["name"] = label collection.append(geo) if not precision and rounded: precision = 6 # AFAC: r = re.compile(r"(\d+)\.(\d{6})(\d*)") new_collection = [] for feat in collection: geom_type = feat["geometry"].get("type", None) if geom_type == "Point": if precision is not None: feat["geometry"]["coordinates"] = [ round(feat["geometry"]["coordinates"][0], precision), round(feat["geometry"]["coordinates"][1], precision), ] if not (-90 <= feat["geometry"]["coordinates"][1] <= 90) or not ( -180 <= feat["geometry"]["coordinates"][0] <= 180): # probably a bad projection continue new_collection.append(feat) return new_collection def get_geo_items(self, rounded=5): dct = {"type": "Feature", "geometry": {}, "properties": {"label": str(self)}} if self.multi_polygon: list_coords = [] for polygon in self.multi_polygon: list_coords.append([]) for linear_ring in range(len(polygon)): list_coords[-1].append([]) for coords in polygon[linear_ring].coords: point_2d = Point( coords[0], coords[1], srid=self.multi_polygon.srid ) list_coords[-1][linear_ring].append( self.convert_coordinates(point_2d, rounded) ) dct["geometry"]["type"] = "MultiPolygon" dct["geometry"]["coordinates"] = list_coords elif self.multi_points: list_coords = [] for coords in self.multi_points: point_2d = Point( coords.x, coords.y, srid=self.multi_points.srid ) list_coords.append( self.convert_coordinates(point_2d, rounded) ) dct["geometry"]["type"] = "MultiPoint" dct["geometry"]["coordinates"] = list_coords elif self.multi_line: list_coords = [] for idx, line in enumerate(self.multi_line): if not idx: list_coords.append([]) for coords in line: point_2d = Point( coords.x, coords.y, srid=self.multi_line.srid ) list_coords[-1].append( self.convert_coordinates(point_2d, rounded) ) dct["geometry"]["type"] = "MultiLine" dct["geometry"]["coordinates"] = list_coords else: dct["geometry"]["type"] = "Point" coords = self.display_coordinates(srid=4326) if coords: dct["geometry"]["coordinates"] = coords else: return {} return dct def convert_coordinates(self, point_2d, rounded=5, srid=None): if not srid: profile = get_current_profile() if profile.display_srs and profile.display_srs.srid: srid = profile.display_srs.srid if not srid: srid = 4326 point = point_2d.transform(srid, clone=True) x, y = point.x, point.y if rounded: return [round(x, rounded), round(y, rounded)] return [x, y] def _geojson_serialize(self, geom_attr): if not hasattr(self, geom_attr): return "{}" geojson = serialize( "geojson", self.__class__.objects.filter(pk=self.pk), geometry_field=geom_attr, fields=("name",), ) geojson_dct = json.loads(geojson) return self._geojson_base_serialize(geojson_dct) def _geojson_base_serialize(self, geojson_dct): profile = get_current_profile() precision = profile.point_precision features = geojson_dct.pop("features") for idx in range(len(features)): feature = features[idx] lbl = self.name or "" feature["properties"]["name"] = lbl feature["properties"]["id"] = self.pk if precision is not None: geom_type = feature["geometry"].get("type", None) if geom_type == "Point": feature["geometry"]["coordinates"] = [ round(coord, precision) for coord in feature["geometry"]["coordinates"] ] if self.buffer: feature["properties"]["buffer"] = self.buffer geojson_dct["features"] = features try: geojson_dct["link_template"] = simple_link_to_window(self).replace( "999999", "" ) except NoReverseMatch: pass geojson = json.dumps(geojson_dct) return geojson @property def point_2d_geojson(self): return self._geojson_serialize("point_2d") @property def multi_polygon_geojson(self): return self._geojson_serialize("multi_polygon") @property def geometry_type(self): if self.x or self.y or self.z or self.point_2d or self.point_3d: return "POINT" if self.multi_line: return "MULTILINE" if self.multi_points: return "MULTIPOINTS" if self.multi_polygon: return "MULTIPOLYGON" return "" @property def geometry_type_label(self): return GEOMETRY_TYPE_LBL.get(self.geometry_type, "") @property def geojson(self): if self.x or self.y or self.z: geo = GEOJSON_POINT_TPL.copy() geo["features"][0]["geometry"]["coordinates"] = [ self.cached_x, self.cached_y ] return self._geojson_base_serialize(geo) if self.point_2d: return self._geojson_serialize("point_2d") if self.point_3d: return self._geojson_serialize("point_3d") if self.multi_line: return self._geojson_serialize("multi_line") if self.multi_points: return self._geojson_serialize("multi_points") if self.multi_polygon: return self._geojson_serialize("multi_polygon") return "{}" def update_from_geojson(self, geojson: str, save=False) -> bool: """ Update geometry from a geojson string :param geojson: geojson :param save: save the object if set to True :return: True if update """ if not geojson: # no data provided return False with fiona.open(geojson) as src: if len(src) != 1: # ambiguous -> need only one feature return False feature = src[0] geom = feature["geometry"] if geom["type"] not in GEOTYPE_TO_GEOVECTOR: # unknown geometry type return False feature_attr, need_list_convert = GEOTYPE_TO_GEOVECTOR[geom["type"]] if need_list_convert: geom["coordinates"] = [geom["coordinates"]] geom = json.dumps(geom) setattr(self, feature_attr, GEOSGeometry(geom)) if save: self.save() geo_attrs = ["point_2d", "point_3d", "x", "y", "z", "multi_line", "multi_points", "multi_polygon"] geo_attrs.pop(geo_attrs.index(feature_attr)) for geo_attr in geo_attrs: setattr(self, geo_attr, None) if save: self.save() return True @classmethod def migrate_srid(cls, new_srid): fields = ( "point_2d", "point_3d", "multi_points", "multi_line", "multi_polygon", ) with connection.cursor() as cursor: for name in fields: cursor.execute( "UPDATE ishtar_common_geovectordata SET %s=ST_SetSRID(%s, %s);", [name, name, new_srid] ) items = cls.objects.all() for item in items: for name in fields: getattr(item, name).transform(new_srid) item._no_geo_check = True item.save() post_save.connect(post_save_geodata, sender=GeoVectorData) def geodata_attached_post_add(model, instance, pk_set): item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True)) if not item_pks: return q = model.objects.filter(pk__in=pk_set).filter(source_id__isnull=True) if q.count(): content_type = ContentType.objects.get( app_label=instance.__class__._meta.app_label, model=instance.__class__.__name__.lower() ) q.update(source_id=instance.pk, source_content_type=content_type) # use a cache to manage during geodata attach if not hasattr(instance, "_geodata"): instance._geodata = [] if not instance.main_geodata_id: instance.main_geodata_id = item_pks[0] instance.skip_history_when_saving = True instance._no_move = True if not hasattr(instance, "_geodata"): instance._geodata = [] instance._geodata += [pk for pk in item_pks if pk not in instance._geodata] instance.save() # for all sub item verify that the geo items are present for query in instance.geodata_child_item_queries(): child_model = query.model m2m_model = child_model.geodata.through m2m_key = f"{child_model._meta.model_name}_id" geoitems = {} for child_id in query.values_list("id", flat=True): for pk in item_pks: q = m2m_model.objects.filter(**{m2m_key: child_id, "geovectordata_id": pk}) if q.count(): continue if pk not in geoitems: geoitems[pk] = GeoVectorData.objects.get(pk=pk) child = child_model.objects.get(pk=child_id) child.geodata.add(geoitems[pk]) child._manage_main_geo() def geodata_attached_remove(model, instance, pk_set=None, clear=False): if clear: item_pks = getattr(instance, "_geodata_clear_item_pks", []) else: item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True)) if not item_pks: return # use a cache to manage during geodata attach if instance.main_geodata_id in item_pks: instance.main_geodata_id = None instance.skip_history_when_saving = True instance._no_move = True instance.save() # for all sub item verify that the geo items are present for query in instance.geodata_child_item_queries(): child_model = query.model m2m_model = child_model.geodata.through m2m_key = f"{child_model._meta.model_name}_id" geoitems = {} for child_id in query.values_list("id", flat=True): for pk in item_pks: q = child_model.objects.filter(pk=child_id, main_geodata_id=pk) if q.count(): q.update(main_geodata_id=None) q = m2m_model.objects.filter(**{m2m_key: child_id, "geovectordata_id": pk}) if not q.count(): continue if pk not in geoitems: geoitems[pk] = GeoVectorData.objects.get(pk=pk) child = child_model.objects.get(pk=child_id) child.geodata.remove(geoitems[pk]) child._manage_main_geo() def geodata_attached_changed(sender, **kwargs): if getattr(settings, "ISHTAR_MIGRATE_V4", False): # disable on first migration return # manage main geoitem and cascade association profile = get_current_profile() if not profile.mapping: return instance = kwargs.get("instance", None) model = kwargs.get("model", None) pk_set = kwargs.get("pk_set", None) action = kwargs.get("action", None) if not instance or not model: return if model != GeoVectorData: # reverse post attributes instance_pk = instance.pk instance = model.objects.get(pk=list(pk_set)[0]) model = GeoVectorData pk_set = {instance_pk} if action == "post_add": geodata_attached_post_add(model, instance, pk_set) elif action == "post_remove": geodata_attached_remove(model, instance, pk_set) elif action == "pre_clear": instance._geodata_clear_item_pks = list( instance.geodata.values_list("id", flat=True) ) elif action == "post_clear": geodata_attached_remove(model, instance, clear=True) class GeographicItem(models.Model): main_geodata = models.ForeignKey( GeoVectorData, on_delete=models.SET_NULL, blank=True, null=True, related_name="main_related_items_%(app_label)s_%(class)s", verbose_name=_("Main geodata") ) geodata = models.ManyToManyField( GeoVectorData, blank=True, related_name="related_items_%(app_label)s_%(class)s", verbose_name=_("Geodata") ) ALT_NAMES = { "geodata__name": SearchAltName( pgettext_lazy("key for text search", "geo-name"), "geodata__name__iexact" ), "geodata__data_type": SearchAltName( pgettext_lazy("key for text search", "geo-type"), "geodata__data_type__label__iexact" ), "geodata__origin": SearchAltName( pgettext_lazy("key for text search", "geo-origin"), "geodata__origin__label__iexact" ), "geodata__provider": SearchAltName( pgettext_lazy("key for text search", "geo-provider"), "geodata__provider__label__iexact" ), "geodata__z": SearchAltName( pgettext_lazy("key for text search", "z"), "geodata__cached_z" ), "geodata__comment": SearchAltName( pgettext_lazy("key for text search", "geo-comment"), "geodata__comment__iexact" ), } @classmethod def ALT_NAMES_FOR_FIND(cls): dct = {} for k in cls.ALT_NAMES: sa = cls.ALT_NAMES[k] dct[k] = SearchAltName(sa.search_key, "base_finds__" + sa.search_query) return dct class Meta: abstract = True @classmethod def get_label_for_model_plural(cls): return cls._meta.verbose_name_plural def get_add_geo_action(self): return ( reverse("create-pre-geo", args=[ self.__class__._meta.app_label, self.__class__._meta.model_name, self.pk ]), _("Add geographic item"), "fa fa-plus", _("geo."), "", False ) def geodata_child_item_queries(self): """ :return: list of queries associated geographically with this item. When geographic data is add to this item all sub items get the geographic data. For instance an operation return the list of context records associated, so when you add the syrvey limit, it is associated to all context records of the operation. """ return [] def _manage_main_geo(self): if self.main_geodata and not self.geodata.filter( pk=self.main_geodata.pk).count(): try: with transaction.atomic(): self.geodata.add(self.main_geodata) except (OperationalError, IntegrityError): pass if self.main_geodata or not self.geodata.count(): return try: with transaction.atomic(): # arbitrary associate the first to geodata data_id = self.geodata.order_by("pk").values_list("pk", flat=True)[0] self.__class__.objects.filter(pk=self.pk).update(main_geodata_id=data_id) except (OperationalError, IntegrityError, IndexError): pass def save(self, *args, **kwargs): super().save(*args, **kwargs) self._manage_main_geo() @property def geodata_list(self): if getattr(self, "_geodata_list", None): return self._geodata_list lst = [] if self.main_geodata: lst.append(self.main_geodata) for geo in self.geodata.all(): if geo != self.main_geodata: lst.append(geo) self._geodata_list = lst return lst class PermissionQuery(models.Model): model = models.ForeignKey(ContentType, related_name="permissions", verbose_name=_("Model"), on_delete=models.CASCADE) name = models.CharField(_("Name"), max_length=200) slug = models.SlugField(_("Slug"), unique=True) request = models.TextField( _("Request"), blank=True, null=False, default="", help_text=_("Use 'text' request used in Ishtar search input") ) active = models.BooleanField(_("Active"), default=True) include_associated_items = models.BooleanField( _("Include associated items"), default=True, help_text=_("All items associated items match the request") ) include_upstream_items = models.BooleanField( _("Include upstream items"), default=False, help_text=_( "All items associated by upstream link math the request. " "For instance, match is done for all finds associated with own " "context records") ) limit_to_attached_areas = models.BooleanField( _("Limit request to attached areas"), default=False, help_text=_("Request is limited to areas attached to the ishtar user") ) ADMIN_SECTION = _("Account") class Meta: verbose_name = _("Permission query") verbose_name_plural = _("Permissions queries") def __str__(self): return f"{self.model} - {self.name}" def post_save_permission_query(sender, **kwargs): permission_query = kwargs["instance"] IshtarUser = apps.get_model("ishtar_common", "IshtarUser") IshtarUser.objects.filter( person__profiles__profile_type__permission_queries__pk=permission_query.pk ).update(need_permission_update=True) post_save.connect(post_save_permission_query, sender=PermissionQuery) class SerializeItem: SERIALIZE_EXCLUDE = ["search_vector"] SERIALIZE_PROPERTIES = [ "external_id", "multi_polygon_geojson", "point_2d_geojson", "images_number", "json_sections", ] SERIALIZE_CALL = {} SERIALIZE_DATES = [] SERIALIZATION_FILES = [] SERIALIZE_STRING = [] def full_serialize(self, search_model=None, recursion=False, request=None) -> dict: """ API serialization :return: data dict """ full_result = {} serialize_fields = [] exclude = [] if search_model: exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()] no_geodata = False for prop in ("main_geodata", "geodata", "geodata_list"): if prop in self.SERIALIZE_EXCLUDE or prop in exclude: no_geodata = True break for field in self._meta.get_fields(): field_name = field.name if field_name in self.SERIALIZE_EXCLUDE: continue elif field_name in exclude: full_result[field_name] = "" elif field.many_to_one or field.one_to_one: try: value = getattr(self, field_name) except (MultipleObjectsReturned, ObjectDoesNotExist): value = None if value: if ( field_name not in self.SERIALIZE_STRING and hasattr(value, "full_serialize") and not recursion ): # print(field.name, self.__class__, self) if field_name == "main_geodata" and no_geodata: continue value = value.full_serialize(search_model, recursion=True, request=request) elif field_name in self.SERIALIZATION_FILES: try: value = {"url": request.build_absolute_uri(value.url)} except ValueError: value = None else: value = str(value) else: value = None full_result[field_name] = value if field_name == "main_geodata": full_result["geodata_list"] = [value] elif field.many_to_many: values = getattr(self, field_name) if values.count(): first_value = values.all()[0] if ( field_name not in self.SERIALIZE_STRING and hasattr(first_value, "full_serialize") and not recursion ): # print(field.name, self.__class__, self) values = [ v.full_serialize(search_model, recursion=True, request=request) for v in values.all() ] else: if first_value in self.SERIALIZATION_FILES: values = [] for v in values: try: values.append({"url": request.build_absolute_uri(v.url)}) except ValueError: pass else: values = [str(v) for v in values.all()] else: values = [] full_result[field_name] = values else: if field_name in self.SERIALIZATION_FILES: value = getattr(self, field_name) try: value = {"url": request.build_absolute_uri(value.url)} except ValueError: value = None full_result[field.name] = value else: serialize_fields.append(field_name) result = json.loads(serialize("json", [self], fields=serialize_fields)) full_result.update(result[0]["fields"]) for prop in self.SERIALIZE_PROPERTIES: if prop in self.SERIALIZE_EXCLUDE or prop in exclude: continue if hasattr(self, prop) and prop not in full_result: full_result[prop] = getattr(self, prop) if "point_2d_geojson" in full_result: full_result["point_2d"] = True if "multi_polygon_geojson" in full_result: full_result["multi_polygon"] = True for prop in self.SERIALIZE_DATES: if prop in self.SERIALIZE_EXCLUDE or prop in exclude: continue dt = getattr(self, prop) or "" if dt: dt = human_date(dt) full_result[prop] = dt for k in self.SERIALIZE_CALL: if k in self.SERIALIZE_EXCLUDE or k in exclude: continue full_result[k] = getattr(self, self.SERIALIZE_CALL[k])() full_result["SLUG"] = self.SLUG full_result["pk"] = f"external_{self.pk}" full_result["id"] = f"external_{self.id}" return full_result def get_associated_main_item_list(self, attr, model) -> list: items = getattr(self, attr) if not items.count(): return [] lst = [] table_cols = model.TABLE_COLS if callable(table_cols): table_cols = table_cols() for colname in table_cols: if colname in model.COL_LABELS: lst.append(str(model.COL_LABELS[colname])) else: lst.append(model._meta.get_field(colname).verbose_name) lst = [lst] for values in items.values_list(*table_cols): lst.append(["-" if v is None else v for v in values]) return lst class ShortMenuItem: """ Item available in the short menu """ UP_MODEL_QUERY = {} @classmethod def get_short_menu_class(cls, pk): return "" @property def short_class_name(self): return "" class MainItem(ShortMenuItem, SerializeItem, SheetItem): """ Item with quick actions available from tables Extra actions are available from sheets Manage cascade updated, has_changed and no_post_process """ QUICK_ACTIONS = [] SLUG = "" SHOW_URL = None DOWN_MODEL_UPDATE = [] INITIAL_VALUES = [] # list of field checkable if changed on save OLD_SHEET_EXPORT = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._initial_values = {} for field_name in self.INITIAL_VALUES: self._initial_values[field_name] = getattr(self, field_name) def has_changed(self): """ Check which field have a changed value between INITIAL_VALUES :return: list of changed fields """ changed = [] for field_name in self._initial_values: value = getattr(self, field_name) if self._initial_values[field_name] != value: changed.append(field_name) self._initial_values[field_name] = value return changed def cascade_update(self, changed=True): if not changed: return if getattr(self, "_no_down_model_update", False): return queue = getattr(self, "_queue", settings.CELERY_DEFAULT_QUEUE) for down_model in self.DOWN_MODEL_UPDATE: if not settings.USE_BACKGROUND_TASK: rel = getattr(self, down_model) if hasattr(rel.model, "need_update"): rel.update(need_update=True) continue for item in getattr(self, down_model).all(): if hasattr(self, "_timestamp"): item._timestamp = self._timestamp item._queue = queue if hasattr(item, "cached_label_changed"): item.cached_label_changed() if hasattr(item, "main_geodata"): item.post_save_geo() def no_post_process(self): self.skip_history_when_saving = True self._cached_label_checked = True self._post_saved_geo = True self._external_id_checked = True self._search_updated = True self._no_move = True self._no_down_model_update = True @classmethod def app_label(cls): return cls._meta.app_label @classmethod def model_name(cls): return cls._meta.model_name @classmethod def class_verbose_name(cls): return cls._meta.verbose_name def get_absolute_url(self): try: return reverse("display-item", args=[self.SLUG, self.pk]) except NoReverseMatch: return @classmethod def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True): """ :param table_cols_attr: "TABLE_COLS" if not specified :param dict_col_labels: (default: True) if set to False return list matching with table_cols list :return: (table_cols, table_col_labels) """ return get_columns_from_class(cls, table_cols_attr=table_cols_attr, dict_col_labels=dict_col_labels) def get_search_url(self): if self.SLUG: try: return reverse(self.SLUG + "_search") except NoReverseMatch: pass @classmethod def get_quick_actions(cls, user): """ Get a list of (url, title, icon, target) actions for an user """ qas = [] for action in cls.QUICK_ACTIONS: if not action.is_available(user): continue qas.append( [ action.base_url, mark_safe(action.text), action.btn_class, mark_safe(action.rendered_icon), action.target or "", action.is_popup, ] ) return qas @classmethod def get_quick_action_by_url(cls, url): for action in cls.QUICK_ACTIONS: if action.url == url: return action def regenerate_external_id(self): if not hasattr(self, "external_id"): return try: self.external_id = "" self.auto_external_id = True except AttributeError: return self.skip_history_when_saving = True self._no_move = True self.save() def cached_label_changed(self): self.no_post_process() self._cached_label_checked = False cached_label_changed(self.__class__, instance=self, created=False) def post_save_geo(self, save=True): if getattr(self, "_post_saved_geo", False): return self.no_post_process() post_save_geo(self.__class__, instance=self, created=False) return False def external_id_changed(self): self.no_post_process() self._external_id_checked = False external_id_changed(self.__class__, instance=self, created=False) def can_do(self, request, action_name): """ Check permission availability for the current object. :param request: request object :param action_name: action name eg: "archaeological_finds.change_find" :return: boolean """ # overload with OwnPerm when _own_ is relevant if not getattr(request.user, "ishtaruser", None): return False user = request.user return user.ishtaruser.has_permission(action_name) def get_extra_actions(self, request): if not hasattr(self, "SLUG"): return [] actions = [] if request.user.ishtaruser.is_ishtaradmin and hasattr(self, "auto_external_id"): actions += [ ( reverse("regenerate-external-id") + "?{}={}".format(self.SLUG, self.pk), _("Regenerate ID"), "fa fa-key", _("regen."), "btn-warning", True, 200, ) ] return actions class TownManager(models.Manager): def get_by_natural_key(self, numero_insee, year): return self.get(numero_insee=numero_insee, year=year) class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model): SLUG = "town" name = models.CharField(_("Name"), max_length=100) surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) center = models.PointField( _("Localisation"), srid=settings.SRID, blank=True, null=True ) limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120) notice = models.TextField(_("Notice"), blank=True, default="") departement = models.ForeignKey( Department, verbose_name=_("Department"), on_delete=models.SET_NULL, null=True, blank=True, ) year = models.IntegerField( _("Year of creation"), null=True, blank=True, help_text=_( "Filling this field is relevant to distinguish old towns " "from new towns." ), ) children = models.ManyToManyField( "Town", verbose_name=_("Town children"), blank=True, related_name="parents" ) cached_label = models.CharField( _("Cached name"), max_length=500, null=True, blank=True, db_index=True ) documents = models.ManyToManyField( "Document", related_name="towns", verbose_name=_("Documents"), blank=True ) main_image = models.ForeignKey( "Document", related_name="main_image_towns", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) objects = TownManager() class Meta: verbose_name = _("Town") verbose_name_plural = _("Towns") if settings.COUNTRY == "fr": ordering = ["numero_insee"] unique_together = (("numero_insee", "year"),) ADMIN_SECTION = _("Geography") def natural_key(self): return (self.numero_insee, self.year) def history_compress(self): return {"numero_insee": self.numero_insee, "year": self.year or ""} def can_edit(self, request): print("TODO: define specific permissions for towns") return False @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ return "**name** {}, **numero_insee** {}, **cached_label** {}".format( _("Name"), "Code commune (numéro INSEE)", _("Cached name") ) @property def surface_ha(self): if not self.surface: return 0 return round(self.surface / 10000.0, 5) @classmethod def _geodata_import_update_town(cls, fields, geo_datas, verbose=False, limit=None): """ specific geodata_import method - import town """ values = fields.copy() geos = [] missing_lbl = _("INSEE code is missing") missing_geo_lbl = _("Geo INSEE is missing") for geo in values.pop("geodata"): geo_id = geo[2] if geo_id not in geo_datas: if verbose: sys.stdout.write(f"\n{BColors.FAIL}geodata: {missing_lbl} ") sys.stdout.write(f"{geo_id}{BColors.ENDC}\n") else: geos.append(geo_datas[geo_id]) main_geo = values["main_geodata"][2] if main_geo not in geo_datas: if verbose: sys.stdout.write(f"\n{BColors.FAIL}main_geodata: {missing_geo_lbl}") sys.stdout.write(f" {main_geo}{BColors.ENDC}\n") values.pop("main_geodata") else: values["main_geodata"] = geo_datas[main_geo] q = Town.objects.filter(numero_insee=values["numero_insee"]) created = False if q.count(): q.update(**values) town = q.all()[0] else: created = True cls.__town_created += 1 town = Town.objects.create(**values) for geo in geos: town.geodata.add(geo) return town, created @classmethod def _geodata_import_update_geodata(cls, fields, geo_datas, limit=None): """ specific geodata_import method - import geodata """ if not getattr(cls, "_town_content_type", None): cls._town_content_type = ContentType.objects.get( app_label="ishtar_common", model="town" ) if not getattr(cls, "_town_town_data_type", None): cls._town_data_type, __ = GeoDataType.objects.get_or_create( txt_idx="town-limit", defaults={"label": _("Town limit")} ) numero_insee = fields.pop('source') if limit and not numero_insee.startswith(limit): return q = Town.objects.filter(numero_insee=numero_insee) values = { "comment": fields["comment"], "multi_polygon": fields["multi_polygon"] } if fields.get("provider", None): values["provider"], __ = GeoProviderType.objects.get_or_create( txt_idx=fields["provider"], defaults={"label": fields["provider"]} ) if q.count(): source_id = q.all()[0].pk q2 = GeoVectorData.objects.filter( source_id=source_id, source_content_type=cls._town_content_type, data_type=cls._town_data_type ) if q2.count(): geo = q2.all()[0] changed = False for k in values: if k == "multi_polygon": if geo.multi_polygon.wkt != values[k]: setattr(geo, k, values[k]) changed = True elif getattr(geo, k) != values[k]: setattr(geo, k, values[k]) changed = True if changed: cls.__geo_updated += 1 geo.save() geo_datas[numero_insee] = geo return values.update({ "source_content_type": cls._town_content_type, "data_type": cls._town_data_type }) cls.__geo_created += 1 geo = GeoVectorData.objects.create(**values) geo_datas[numero_insee] = geo @classmethod def _geodata_import_get_children(cls, insee, towns, children, verbose=False): """ specific geodata_import method - make link between towns """ missing_lbl = _("INSEE code is missing") if insee not in towns: if verbose: sys.stdout.write(f"\n{BColors.FAIL}children: {missing_lbl}") sys.stdout.write(f" {insee}{BColors.ENDC}\n") return town = towns[insee] current_children = list(town.children.values_list("id", flat=True)) for child in children[insee]: if child not in towns: q = Town.objects.filter(numero_insee=child) if not q.count(): sys.stdout.write(f"\n{BColors.FAIL}children-child: {missing_lbl}") sys.stdout.write(f" {child}{BColors.ENDC}\n") continue towns[child] = q.all()[0] if towns[child].id in current_children: continue cls.__nb_rels += 1 town.children.add(towns[child]) @classmethod def geodata_import(cls, src, limit=None, log_path=None, verbose=False): """ Import custom geodata format - src: geodata dict - limit: if provided, only import town with code starting with this limit - verbose: verbose output """ nb_lines = len(src) started = datetime.datetime.now() cls.__geo_updated, cls.__geo_created = 0, 0 cls.__town_created = 0 towns, geo_datas, children = {}, {}, {} for idx, values in enumerate(src): if verbose: sys.stdout.write(get_progress("processing", idx, nb_lines, started)) sys.stdout.flush() fields = values["fields"] if values["model"] == "ishtar_common.town": if limit and not values["numero_insee"].startswith(limit): continue c_children = fields.pop("children") if c_children: children[fields["numero_insee"]] = c_children towns[fields["numero_insee"]], created = cls._geodata_import_update_town( fields, geo_datas, verbose, limit ) if values["model"] == "ishtar_common.geovectordata": cls._geodata_import_update_geodata(fields, geo_datas, limit) # manage geo sources missing_lbl = _("INSEE code is missing") for insee in geo_datas: if insee not in towns: if verbose: sys.stdout.write( f"\n{BColors.FAIL}geodata source: {missing_lbl} " ) sys.stdout.write( f" {insee}{BColors.ENDC}\n" ) else: g = geo_datas[insee] if g.source_id != towns[insee].pk: g.source_id = towns[insee].pk g.save() nb_lines = len(children) started = datetime.datetime.now() cls.__nb_rels = 0 if verbose: print() # management childrens for idx, insee in enumerate(children): if verbose: sys.stdout.write(get_progress("update children", idx, nb_lines, started)) sys.stdout.flush() cls._geodata_import_get_children(insee, towns, children, verbose=verbose) return cls.__town_created, cls.__geo_created, cls.__geo_updated, cls.__nb_rels @property def geodata_export(self): """ Custom geodata export format to manage easily parent and main geodata """ main_geodata, geodata = None, [] if self.main_geodata: main_geodata = ["ishtar_common", "town", self.numero_insee] geodata = [main_geodata] geo = self.main_geodata result = [ { "model": "ishtar_common.town", "fields": { "name": self.name, "surface": self.surface, "numero_insee": self.numero_insee, "notice": self.notice, "year": self.year, "cached_label": self.cached_label, "main_geodata": main_geodata, "geodata": geodata, "children": [ t.numero_insee for t in self.children.filter(numero_insee__isnull=False).all() ] } } ] if not self.main_geodata: return result # put the geodata before the town result = [ { "model": "ishtar_common.geovectordata", "fields": { "name": geo.name if geo.name and geo.name != "-" else self.cached_label, "source_content_type": [ "ishtar_common", "town" ], "source": self.numero_insee, "data_type": [ "town-limit" ], "provider": geo.provider.txt_idx if geo.provider else None, "comment": geo.comment, "cached_x": geo.cached_x, "cached_y": geo.cached_y, "spatial_reference_system": None, "multi_polygon": geo.multi_polygon.wkt } } ] + result return result def get_filename(self): if self.numero_insee: return f"{self.numero_insee} - {slugify(self.name)}" return slugify(self.name) def associated_filename(self): return self.get_filename() def get_values(self, prefix="", **kwargs): return { prefix or "label": str(self), prefix + "name": self.name, prefix + "numero_insee": self.numero_insee, } @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] res = [] for value in full_value: try: res.append( cls.objects.get( numero_insee=value["numero_insee"], year=value["year"] or None ) ) except cls.DoesNotExist: continue return res def __str__(self): return self.cached_label or "" def geodata_child_item_queries(self): return [self.sites, self.operations] @property def label_with_areas(self): label = [self.name] if self.numero_insee: label.append("({})".format(self.numero_insee)) for area in self.areas.all(): label.append(" - ") label.append(area.full_label) label = " ".join(label) if self.children.count(): label += str(_(", old town of ")) + " ; ".join([ "{label} ({code})".format(label=p.name, code=p.numero_insee) if p.numero_insee else p.name for p in self.children.all() ]) return label @property def detail_label(self): return self.label_with_areas def generate_geo(self, force=False): force = self.generate_limit(force=force) self.generate_center(force=force) self.generate_area(force=force) def generate_limit(self, force=False): if not force and self.limit: return parents = None if not self.parents.count(): return for parent in self.parents.all(): if not parent.limit: return if not parents: parents = parent.limit else: parents = parents.union(parent.limit) # if union is a simple polygon make it a multi if "MULTI" not in parents.wkt: parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")" if not parents: return self.limit = parents self.save() return True def generate_center(self, force=False): if not force and (self.center or not self.limit): return self.center = self.limit.centroid if not self.center: return False self.save() return True def generate_area(self, force=False): if not force and (self.surface or not self.limit): return try: surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area except GDALException: return False if surface > 214748364 or not surface: return False self.surface = surface self.save() return True def update_town_code(self): if not self.numero_insee or not self.children.count() or not self.year: return old_num = self.numero_insee[:] numero = old_num.split("-")[0] base_insee = "{}-{}".format(numero, self.year) self.numero_insee = base_insee idx = 0 while Town.objects.filter( year=self.year, numero_insee=self.numero_insee).exclude( pk=self.pk).count(): idx += 1 self.numero_insee = base_insee + "-" + str(idx) if self.numero_insee != old_num: return True def _get_base_image_path(self): if self.numero_insee and len(self.numero_insee) == 5: prefix = self.numero_insee[:2] return f"{self.SLUG}/{prefix}" return self.SLUG def _generate_cached_label(self): cached_label = self.name if settings.COUNTRY == "fr" and self.numero_insee \ and "X" not in self.numero_insee: dpt_len = 2 if ( self.numero_insee.startswith("97") or self.numero_insee.startswith("98") or self.numero_insee[0] not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") ): dpt_len = 3 cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) if self.year and self.children.count(): cached_label += " ({})".format(self.year) return cached_label def get_extra_actions(self, request): """ For sheet template """ # url, base_text, icon, extra_text, extra css class, is a quick action actions = super().get_extra_actions(request) profile = get_current_profile() can_add_geo = profile.mapping and self.can_do( request, "ishtar_common.add_geovectordata" ) if can_add_geo: actions.append(self.get_add_geo_action()) return actions def post_save_town(sender, **kwargs): cached_label_changed(sender, **kwargs) town = kwargs["instance"] town.generate_geo() if town.update_town_code(): town.save() post_save.connect(post_save_town, sender=Town) m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through) m2m_changed.connect(document_attached_changed, sender=Town.documents.through) def town_child_changed(sender, **kwargs): town = kwargs["instance"] if town.update_town_code(): town.save() m2m_changed.connect(town_child_changed, sender=Town.children.through) def geotown_attached_changed(sender, **kwargs): # manage associated geoitem profile = get_current_profile() if not profile.mapping: return instance = kwargs.get("instance", None) model = kwargs.get("model", None) pk_set = kwargs.get("pk_set", None) action = kwargs.get("action", None) if not instance or not model or not hasattr(instance, "post_save_geo"): return instance._post_save_geo_ok = False if action in ("post_add", "post_remove", "post_clear"): instance.post_save_geo(save=True) class Address(BaseHistorizedItem): FIELDS = ( "address", "address_complement", "postal_code", "town", "precise_town_id", "country", "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", "alt_country", "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", "raw_phone", "mobile_phone", "email", "alt_address_is_prefered", ) address = models.TextField(_("Address"), blank=True, default="") address_complement = models.TextField( _("Address complement"), blank=True, default="" ) postal_code = models.CharField( _("Postal code"), max_length=10, null=True, blank=True ) town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True) precise_town_id = models.PositiveIntegerField( verbose_name=_("Town (precise)"), null=True, blank=True, ) country = models.CharField(_("Country"), max_length=30, null=True, blank=True) alt_address = models.TextField(_("Other address: address"), blank=True, default="") alt_address_complement = models.TextField( _("Other address: address complement"), blank=True, default="" ) alt_postal_code = models.CharField( _("Other address: postal code"), max_length=10, null=True, blank=True ) alt_town = models.CharField( _("Other address: town"), max_length=70, null=True, blank=True ) alt_country = models.CharField( _("Other address: country"), max_length=30, null=True, blank=True ) phone = models.CharField(_("Phone"), max_length=32, null=True, blank=True) phone_desc = models.CharField( _("Phone description"), max_length=300, null=True, blank=True ) phone2 = models.CharField( _("Phone description 2"), max_length=32, null=True, blank=True ) phone_desc2 = models.CharField( _("Phone description 2"), max_length=300, null=True, blank=True ) phone3 = models.CharField(_("Phone 3"), max_length=32, null=True, blank=True) phone_desc3 = models.CharField( _("Phone description 3"), max_length=300, null=True, blank=True ) raw_phone = models.TextField(_("Raw phone"), blank=True, default="") mobile_phone = models.CharField( _("Mobile phone"), max_length=32, null=True, blank=True ) email = models.EmailField(_("Email"), max_length=300, blank=True, null=True) alt_address_is_prefered = models.BooleanField( _("Alternative address is prefered"), default=False ) history = HistoricalRecords(inherit=True) SUB_ADDRESSES = [] ALT_NAMES = { "precise_town_id": SearchAltName( pgettext_lazy("key for text search", "town"), "precise_town_id" ), "area": SearchAltName( pgettext_lazy("key for text search", "area"), "precise_town__areas__label__iexact" ), } class Meta: abstract = True @property def precise_town(self): if hasattr(self, "_precise_town"): return self._precise_town if not self.precise_town_id: self._precise_town = None else: try: self._precise_town = Town.objects.get(id=self.precise_town_id) except Town.DoesNotExist: self._precise_town = None return self._precise_town @property def precise_town_name(self): if not self.precise_town: return "" return self.precise_town.name def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super().get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs ) values[f"{prefix}precise_town_name"] = self.precise_town_name return values @post_importer_action def set_town_by_code(self, context, value): try: town = Town.objects.get(numero_insee=value) except Town.DoesNotExist: raise ImporterError( str(_("Town with code: {} does not exists")).format(value) ) self.precise_town_id = town.pk self.skip_history_when_saving = True self.save() return self set_town_by_code.post_save = True def get_short_html_items(self): items = [] if self.address: items.append("""{}""".format(self.address)) if self.address_complement: items.append( """{}""".format( self.address_complement ) ) if self.postal_code: items.append( """{}""".format(self.postal_code) ) if self.precise_town: items.append( """{}""".format(self.precise_town.name) ) elif self.town: items.append("""{}""".format(self.town)) if self.country: items.append("""{}""".format(self.country)) return items def get_short_html_detail(self): html = """
""" items = self.get_short_html_items() if not items: items = [ "{}".format(_("No associated address")) ] html += "".join(items) html += """
""" return html def get_town_centroid(self): if self.precise_town: return self.precise_town.center, self._meta.verbose_name for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if sub_item and sub_item.precise_town: return sub_item.precise_town.center, sub_item._meta.verbose_name def get_town_polygons(self): if self.precise_town: return self.precise_town.limit, self._meta.verbose_name for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if sub_item and sub_item.precise_town: return sub_item.precise_town.limit, sub_item._meta.verbose_name def get_attribute(self, attr): if self.town or self.precise_town: return getattr(self, attr) for sub_address in self.SUB_ADDRESSES: sub_item = getattr(self, sub_address) if not sub_item: continue if sub_item.town or sub_item.precise_town: return getattr(sub_item, attr) return getattr(self, attr) def get_address(self): return self.get_attribute("address") def get_address_complement(self): return self.get_attribute("address_complement") def get_postal_code(self): return self.get_attribute("postal_code") def get_town(self): return self.get_attribute("town") def get_precise_town(self): return self.get_attribute("precise_town") def get_country(self): return self.get_attribute("country") def simple_lbl(self): return str(self) def full_address(self): lbl = self.simple_lbl() if lbl: lbl += "\n" lbl += self.address_lbl() return lbl def address_lbl(self, list=False): lbls = [] prefix = "" if self.alt_address_is_prefered: prefix = "alt_" if getattr(self, prefix + "address"): lbls.append(( getattr(self, prefix + "address"), _("Address") )) if getattr(self, prefix + "address_complement"): lbls.append(( getattr(self, prefix + "address_complement"), _("Address complement") )) postal_code = getattr(self, prefix + "postal_code") town = getattr(self, prefix + "town") if postal_code or town: lbls.append(( " ".join([postal_code, town]), _("Postal code - Town") )) if self.phone: lbls.append(( self.phone, _("Phone") )) if self.mobile_phone: lbls.append(( self.mobile_phone, _("Mobile") )) if self.email: lbls.append(( self.email, _("Email") )) if list: return lbls return "\n".join([ value for value, lbl in lbls ]) def address_lbl_list(self): return self.address_lbl(list=True) class Merge(models.Model): merge_key = models.TextField(_("Merge key"), blank=True, null=True) merge_candidate = models.ManyToManyField("self", blank=True) merge_exclusion = models.ManyToManyField("self", blank=True) archived = models.BooleanField(default=False, blank=True, null=True) # 1 for one word similarity, 2 for two word similarity, etc. MERGE_CLEMENCY = None EMPTY_MERGE_KEY = "--" MERGE_ATTRIBUTE = "name" class Meta: abstract = True def generate_merge_key(self): if self.archived: return merge_attr = getattr(self, self.MERGE_ATTRIBUTE) self.merge_key = slugify(merge_attr if merge_attr else "") if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY self.merge_key = self.merge_key def generate_merge_candidate(self): if self.archived: return if not self.merge_key: self.generate_merge_key() self.save(merge_key_generated=True) if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: return q = ( self.__class__.objects.exclude(pk=self.pk) .exclude(merge_exclusion=self) .exclude(merge_candidate=self) .exclude(archived=True) ) if not self.MERGE_CLEMENCY: q = q.filter(merge_key=self.merge_key) else: subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY]) subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :]) q = q.filter( Q(merge_key__istartswith=subkeys_front) | Q(merge_key__iendswith=subkeys_back) ) for item in q.all(): self.merge_candidate.add(item) def save(self, *args, **kwargs): # prevent circular save merge_key_generated = False if "merge_key_generated" in kwargs: merge_key_generated = kwargs.pop("merge_key_generated") self.generate_merge_key() item = super(Merge, self).save(*args, **kwargs) if not merge_key_generated: self.merge_candidate.clear() self.generate_merge_candidate() return item def archive(self): self.archived = True self.save() self.merge_candidate.clear() self.merge_exclusion.clear() def merge(self, item, keep_old=False, exclude_fields=None): merge_model_objects( self, item, keep_old=keep_old, exclude_fields=exclude_fields ) self.generate_merge_candidate() def __get_stats_cache_values(model_name, model_pk): StatsCache = apps.get_model("ishtar_common", "StatsCache") q = StatsCache.objects.filter(model=model_name, model_pk=model_pk) nb = q.count() if nb >= 1: sc = q.all()[0] for extra in q.order_by("-id").all()[1:]: extra.delete() else: sc = StatsCache.objects.create(model=model_name, model_pk=model_pk) values = sc.values if not values: values = {} return sc, values @task() def _update_stats(app, model, model_pk, funcname): model_name = app + "." + model model = apps.get_model(app, model) try: item = model.objects.get(pk=model_pk) except model.DoesNotExist: return value = getattr(item, funcname)() sc, current_values = __get_stats_cache_values(model_name, model_pk) current_values[funcname] = value sc.values = current_values sc.update_requested = None sc.updated = timezone.now() sc.save() def update_stats(statscache, item, funcname): if not settings.USE_BACKGROUND_TASK: current_values = statscache.values if not current_values: current_values = {} value = getattr(item, funcname)() current_values[funcname] = value statscache.values = current_values statscache.updated = timezone.now() statscache.save() return current_values now = timezone.now() app_name = item._meta.app_label model_name = item._meta.model_name statscache.update_requested = now.isoformat() statscache.save() _update_stats.delay(app_name, model_name, item.pk, funcname) return statscache.values class DashboardFormItem: """ Provide methods to manage statistics """ def last_stats_update(self): model_name = self._meta.app_label + "." + self._meta.model_name StatsCache = apps.get_model("ishtar_common", "StatsCache") q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by( "-updated" ) if not q.count(): return return q.all()[0].updated def _get_or_set_stats(self, funcname, update=False, expected_type=None): model_name = self._meta.app_label + "." + self._meta.model_name StatsCache = apps.get_model("ishtar_common", "StatsCache") sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk) if not update: values = sc.values if funcname not in values: if expected_type is not None: return expected_type() return 0 else: values = update_stats(sc, self, funcname) if funcname in values: values = values[funcname] else: values = 0 if expected_type is not None and not isinstance(values, expected_type): return expected_type() return values @classmethod def get_periods(cls, slice="month", fltr={}, date_source="creation"): date_var = date_source + "_date" q = cls.objects.filter(**{date_var + "__isnull": False}) if fltr: q = q.filter(**fltr) if slice == "year": return [ res[date_var].year for res in list(q.values(date_var).annotate(Count("id")).order_by()) ] elif slice == "month": return [ (res[date_var].year, res[date_var].month) for res in list(q.values(date_var).annotate(Count("id")).order_by()) ] return [] @classmethod def get_by_year(cls, year, fltr={}, date_source="creation"): date_var = date_source + "_date" q = cls.objects.filter(**{date_var + "__isnull": False}) if fltr: q = q.filter(**fltr) return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk") @classmethod def get_by_month(cls, year, month, fltr={}, date_source="creation"): date_var = date_source + "_date" q = cls.objects.filter(**{date_var + "__isnull": False}) if fltr: q = q.filter(**fltr) q = q.filter(**{date_var + "__year": year, date_var + "__month": month}) return q.order_by("pk").distinct("pk") @classmethod def get_total_number(cls, fltr=None): q = cls.objects if fltr: q = q.filter(**fltr) return q.order_by("pk").distinct("pk").count() class QuickAction: """ Quick action available from tables """ def __init__( self, url, icon_class="", text="", target=None, rights=None, module=None, is_popup=True, btn_class="btn-success" ): self.url = url self.icon_class = icon_class self.text = text self.rights = rights self.target = target self.module = module self.is_popup = is_popup self.btn_class = btn_class if self.target not in ("one", "many", None): raise AttributeError("target must be one, many or None") def is_available(self, user, obj=None): if self.module and not getattr(get_current_profile(), self.module): return False if not self.rights: # no restriction return True if not user or not hasattr(user, "ishtaruser") or not user.ishtaruser: return False user = user.ishtaruser for right in self.rights: if user.has_permission(right, obj=obj): return True return False @property def rendered_icon(self): if not self.icon_class: return "" return "".format(self.icon_class) @property def base_url(self): if self.target is None: url = reverse(self.url) else: # put arbitrary pk for the target url = reverse(self.url, args=[0]) url = url[:-2] # all quick action url have to finish with the # pk of the selected item and a "/" return url class DynamicRequest: def __init__( self, label, app_name, model_name, form_key, search_key, type_query, search_query, ): self.label = label self.form_key = form_key self.search_key = search_key self.app_name = app_name self.model_name = model_name self.type_query = type_query self.search_query = search_query def get_all_types(self): model = apps.get_app_config(self.app_name).get_model(self.model_name) return model.objects.filter(available=True) def get_form_fields(self): fields = {} for item in self.get_all_types().all(): fields[self.form_key + "-" + item.txt_idx] = forms.CharField( label=str(self.label) + " " + str(item), required=False ) return fields def get_extra_query(self, slug): return {self.type_query: slug} def get_alt_names(self): alt_names = {} for item in self.get_all_types().all(): alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( self.search_key + "-" + item.txt_idx, self.search_query, self.get_extra_query(item.txt_idx), distinct_query=True, ) return alt_names class GeoItem(GeographicItem): NUMBER_FIELDS = ["geodata__cached_z"] # gis - to be removed GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon"))) x = models.FloatField(_("X"), blank=True, null=True) y = models.FloatField(_("Y"), blank=True, null=True) z = models.FloatField(_("Z"), blank=True, null=True) estimated_error_x = models.FloatField( _("Estimated error for X"), blank=True, null=True ) estimated_error_y = models.FloatField( _("Estimated error for Y"), blank=True, null=True ) estimated_error_z = models.FloatField( _("Estimated error for Z"), blank=True, null=True ) spatial_reference_system = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), blank=True, null=True, on_delete=models.PROTECT, ) point = models.PointField(_("Point"), blank=True, null=True, dim=3) point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) point_source = models.CharField( _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True ) point_source_item = models.CharField( _("Point source item"), max_length=100, blank=True, null=True ) multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True) multi_polygon_source = models.CharField( _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True, ) multi_polygon_source_item = models.CharField( _("Multi polygon source item"), max_length=100, blank=True, null=True ) GEO_LABEL = "" class Meta: abstract = True def get_town_centroid(self): raise NotImplementedError def get_town_polygons(self): raise NotImplementedError @property def X(self): """x coordinates using the default SRS""" coord = self.display_coordinates if not coord: return return coord[0] @property def Y(self): """y coordinates using the default SRS""" coord = self.display_coordinates if not coord: return return coord[1] @property def display_coordinates(self, rounded=True): if not self.main_geodata: return "" return self.main_geodata.display_coordinates(rounded=rounded) @property def display_spatial_reference_system(self): profile = get_current_profile() if not profile.display_srs or not profile.display_srs.srid: return self.spatial_reference_system return profile.display_srs def get_precise_points(self): if self.point_source == "P" and self.point_2d: return self.point_2d, self.point, self.point_source_item def get_precise_polygons(self): if self.multi_polygon_source == "P" and self.multi_polygon: return self.multi_polygon, self.multi_polygon_source_item def get_geo_items(self, rounded=5): if not self.main_geodata: return {} return self.main_geodata.get_geo_items(rounded) def convert_coordinates(self, point_2d, rounded): profile = get_current_profile() if ( not profile.display_srs or not profile.display_srs.srid or ( profile.display_srs == self.spatial_reference_system and point_2d.x and point_2d.y ) ): x, y = point_2d.x, point_2d.y else: point = point_2d.transform(profile.display_srs.srid, clone=True) x, y = point.x, point.y if rounded: return [round(x, 5), round(y, 5)] return [x, y] def _geojson_serialize(self, geom_attr): if not hasattr(self, geom_attr): return "" cached_label_key = "cached_label" if self.GEO_LABEL: cached_label_key = self.GEO_LABEL if getattr(self, "CACHED_LABELS", None): cached_label_key = self.CACHED_LABELS[-1] geojson = serialize( "geojson", self.__class__.objects.filter(pk=self.pk), geometry_field=geom_attr, fields=(cached_label_key,), ) geojson_dct = json.loads(geojson) profile = get_current_profile() precision = profile.point_precision features = geojson_dct.pop("features") for idx in range(len(features)): feature = features[idx] lbl = feature["properties"].pop(cached_label_key) feature["properties"]["name"] = lbl feature["properties"]["id"] = self.pk if precision is not None: geom_type = feature["geometry"].get("type", None) if geom_type == "Point": feature["geometry"]["coordinates"] = [ round(coord, precision) for coord in feature["geometry"]["coordinates"] ] geojson_dct["features"] = features geojson_dct["link_template"] = simple_link_to_window(self).replace( "999999", "" ) geojson = json.dumps(geojson_dct) return geojson @property def point_2d_geojson(self): return self._geojson_serialize("point_2d") @property def multi_polygon_geojson(self): return self._geojson_serialize("multi_polygon") class ImageContainerModel: def _get_image_path(self, filename): return "{}/{}".format(self._get_base_image_path(), filename) def _get_base_image_path(self): n = timezone.now() return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) class CompleteIdentifierItem(models.Model, ImageContainerModel): HAS_QR_CODE = True cached_label = models.TextField( _("Cached name"), blank=True, default="", db_index=True, help_text=_("Generated automatically - do not edit"), ) complete_identifier = models.TextField( _("Complete identifier"), blank=True, default="" ) custom_index = models.IntegerField(_("Custom index"), blank=True, null=True) qrcode = models.ImageField( upload_to=get_image_path, blank=True, null=True, max_length=255 ) class Meta: abstract = True @property def qrcode_path(self): if not self.qrcode or ( self.qrcode.path and not os.path.exists(self.qrcode.path)): self.generate_qrcode() if not self.qrcode: # error on qrcode generation return "" return self.qrcode.path def _profile_generate_cached_label(self): slug = getattr(self, "SLUG", None) if not slug: return return get_generated_id(slug + "_cached_label", self) def _generate_cached_label(self): label = self._profile_generate_cached_label() if not label: # to be eventually overloaded by parent class return str(self) return label def generate_qrcode(self, request=None, secure=True, tmpdir=None): if not hasattr(self, "get_absolute_url"): return url = self.get_absolute_url() site = Site.objects.get_current() if request: scheme = request.scheme else: if secure: scheme = "https" else: scheme = "http" url = scheme + "://" + site.domain + url TinyUrl = apps.get_model("ishtar_common", "TinyUrl") tiny_url = TinyUrl() tiny_url.link = url tiny_url.save() short_url = ( scheme + "://" + site.domain + reverse("tiny-redirect", args=[tiny_url.get_short_id()]) ) qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) tmpdir_created = False if not tmpdir: tmpdir = tempfile.mkdtemp("-qrcode") tmpdir_created = True filename = tmpdir + os.sep + "qrcode.png" qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) self.skip_history_when_saving = True self._no_move = True self._no_geo_check = True with open(filename, "rb") as qrfile: self.qrcode.save("qrcode.png", File(qrfile)) self.save() if tmpdir_created: shutil.rmtree(tmpdir) def generate_complete_identifier(self): SLUG = getattr(self, "SLUG", None) if not SLUG: return "" complete_identifier = get_generated_id(SLUG + "_complete_identifier", self) if complete_identifier: return complete_identifier cached_label_key = "cached_label" if getattr(self, "GEO_LABEL", None): cached_label_key = getattr(self, "GEO_LABEL", None) if hasattr(self, "CACHED_COMPLETE_ID"): cached_label_key = self.CACHED_COMPLETE_ID if not cached_label_key: return complete_identifier = getattr(self, cached_label_key) return complete_identifier def get_index_whole_db(self): q = self.__class__.objects.exclude(custom_index__isnull=True) q = q.order_by("-custom_index") if q.count(): current_index = q.values_list("custom_index", flat=True).all()[0] return current_index + 1 return 1 def generate_custom_index(self, force=False): if not self.pk: return if self.custom_index and not force: return self.custom_index SLUG = getattr(self, "SLUG", None) if not SLUG: return k = SLUG + "_custom_index" profile = get_current_profile() if not hasattr(profile, k): return key = getattr(profile, k) if not key or not key.strip(): return keys = key.strip().split(";") if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]): # custom index generation return getattr(self, "get_index_" + key)() model = self.__class__ try: self_keys = set(list(model.objects.filter(pk=self.pk).values_list(*keys))) except Exception: # bad settings - not managed here print("Bad settings for custom_index {}".format(";".join(keys))) return if len(self_keys) != 1: # key is not distinct return self_key = self_keys.pop() return self._get_index(keys, self_key) def _get_index(self, keys: list, self_keys: list): model = self.__class__ q = model.objects if self.pk: q = model.objects.exclude(pk=self.pk) for idx, key in enumerate(keys): q = q.filter(**{key: self_keys[idx]}) try: r = q.aggregate(max_index=Max("custom_index")) except Exception: # bad settings return if not r["max_index"]: return 1 return r["max_index"] + 1 def save(self, *args, **kwargs): super(CompleteIdentifierItem, self).save(*args, **kwargs) self.regenerate_all_ids() def regenerate_all_ids(self, save=True): if getattr(self, "_prevent_loop", False): return {} updated = {} custom_index = self.generate_custom_index() if custom_index != self.custom_index: self.custom_index = custom_index updated["custom_index"] = custom_index complete_id = self.generate_complete_identifier() if complete_id and complete_id != self.complete_identifier: self.complete_identifier = complete_id updated["complete_identifier"] = complete_id if updated: self._prevent_loop = True self.skip_history_when_saving = True if save: if self.pk: self.__class__.objects.filter(pk=self.pk).update(**updated) else: self.save() return updated class SearchVectorConfig: def __init__(self, key, language=None, func=None): self.key = key if language: self.language = language if language == "local": self.language = settings.ISHTAR_SEARCH_LANGUAGE else: self.language = "simple" self.func = func def format(self, value): if value == "None": value = "" if not self.func: return [value] value = self.func(value) if not isinstance(value, list): return [value] return value