summaryrefslogtreecommitdiff
path: root/ishtar_common/models.py
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2020-10-07 19:09:30 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2021-02-28 12:15:21 +0100
commit1e3da04336b9095e4497d098ea19c3178bc74cf6 (patch)
tree9cd21bf7e51d271b958a9a4b2b85367adbb97992 /ishtar_common/models.py
parentcf6139abc5a64a2ff52eccdcf0424870cff6d57c (diff)
downloadIshtar-1e3da04336b9095e4497d098ea19c3178bc74cf6.tar.bz2
Ishtar-1e3da04336b9095e4497d098ea19c3178bc74cf6.zip
Refactoring of models. Document container - declare only id
Diffstat (limited to 'ishtar_common/models.py')
-rw-r--r--ishtar_common/models.py2890
1 files changed, 54 insertions, 2836 deletions
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index 001c1894f..f23cab926 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -21,7 +21,6 @@
Models description
"""
import copy
-from collections import OrderedDict
import datetime
import inspect
from importlib import import_module
@@ -29,9 +28,6 @@ from jinja2 import TemplateSyntaxError, UndefinedError
import json
import logging
import os
-import pyqrcode
-import re
-import shutil
import string
import tempfile
import time
@@ -46,7 +42,6 @@ import zipfile
from urllib.parse import urlencode
from xml.etree import ElementTree as ET
-from django import forms
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User, Group
@@ -54,32 +49,22 @@ from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import JSONField
-from django.contrib.postgres.search import SearchVectorField, SearchVector
from django.contrib.postgres.indexes import GinIndex
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist, ValidationError, \
MultipleObjectsReturned
from django.core.files.uploadedfile import SimpleUploadedFile
-from django.core.files import File
-from django.core.serializers import serialize
from django.core.urlresolvers import reverse, NoReverseMatch
-from django.core.validators import validate_slug
-from django.db import connection
from django.db.models import Q, Max, Count, F
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.db.utils import DatabaseError
from django.template.defaultfilters import slugify
from django.utils.functional import lazy
-from django.utils.safestring import SafeText, mark_safe
-from django.utils.translation import activate, deactivate
from ishtar_common.utils import ugettext_lazy as _, ugettext, \
- pgettext_lazy
+ pgettext_lazy, get_external_id, get_current_profile, duplicate_item, \
+ get_image_path
from ishtar_common.utils_secretary import IshtarSecretaryRenderer
-from simple_history.models import HistoricalRecords as BaseHistoricalRecords
-from simple_history.signals import post_create_historical_record, \
- pre_create_historical_record
-from unidecode import unidecode
from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \
ALTERNATE_CONFIGS_CHOICES
@@ -87,16 +72,24 @@ from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \
from ishtar_common.data_importer import pre_importer_action
from ishtar_common.model_managers import SlugModelManager, ExternalIdManager, \
- TypeManager, UUIDModelManager
+ UUIDModelManager
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import ImporterModel, ImporterType, \
ImporterDefault, ImporterDefaultValues, ImporterColumn, \
ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \
Import, TargetKeyGroup, ValueFormater
-from ishtar_common.templatetags.link_to_window import simple_link_to_window
-from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \
- get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \
- generate_relation_graph, max_size_help, task
+from ishtar_common.utils import get_cache, create_slug, \
+ get_all_field_names, cached_label_changed, \
+ generate_relation_graph, max_size_help
+
+from ishtar_common.models_common import GeneralType, HierarchicalType, \
+ BaseHistorizedItem, LightHistorizedItem, FullSearch, Imported, \
+ FixAssociated, SearchAltName, HistoryError, OwnPerms, Cached, \
+ Address, post_save_cache, TemplateItem, SpatialReferenceSystem, \
+ DashboardFormItem, document_attached_changed, SearchAltName, \
+ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
+ QuickAction, MainItem, Merge, ShortMenuItem, Town, ImageContainerModel, \
+ StatisticItem, CachedGen, CascasdeUpdate, Department, State
__all__ = [
'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues',
@@ -107,7 +100,8 @@ __all__ = [
'LightHistorizedItem', 'OwnPerms', 'Address', 'post_save_cache',
'DashboardFormItem', 'ShortMenuItem', 'document_attached_changed',
'SearchAltName', 'DynamicRequest', 'GeoItem', 'QRCodeItem',
- 'SearchVectorConfig', 'DocumentItem'
+ 'SearchVectorConfig', 'DocumentItem', 'CachedGen', 'StatisticItem',
+ 'CascasdeUpdate', 'Department', 'State'
]
logger = logging.getLogger(__name__)
@@ -262,88 +256,6 @@ class HistoryModel(models.Model):
create=create)
-class HistoricalRecords(BaseHistoricalRecords):
- def _save_historic(self, manager, instance, history_date, history_type,
- history_user, history_change_reason, using, attrs):
- history_instance = manager.model(
- history_date=history_date,
- history_type=history_type,
- history_user=history_user,
- history_change_reason=history_change_reason,
- **attrs
- )
-
- pre_create_historical_record.send(
- sender=manager.model,
- instance=instance,
- history_date=history_date,
- history_user=history_user,
- history_change_reason=history_change_reason,
- history_instance=history_instance,
- using=using,
- )
-
- history_instance.save(using=using)
-
- post_create_historical_record.send(
- sender=manager.model,
- instance=instance,
- history_instance=history_instance,
- history_date=history_date,
- history_user=history_user,
- history_change_reason=history_change_reason,
- using=using,
- )
-
- def create_historical_record(self, instance, history_type, using=None):
- try:
- history_modifier = getattr(instance, 'history_modifier', None)
- assert history_modifier
- except (User.DoesNotExist, AssertionError):
- # on batch removing of users, user could have disappeared
- return
- history_date = getattr(instance, "_history_date",
- datetime.datetime.now())
- history_change_reason = getattr(instance, "changeReason", None)
- force = getattr(instance, "_force_history", False)
- manager = getattr(instance, self.manager_name)
- attrs = {}
- for field in instance._meta.fields:
- attrs[field.attname] = getattr(instance, field.attname)
- q_history = instance.history \
- .filter(history_modifier_id=history_modifier.pk) \
- .order_by('-history_date', '-history_id')
- # instance.skip_history_when_saving = True
- if not q_history.count():
- if force:
- delattr(instance, '_force_history')
- self._save_historic(
- manager, instance, history_date, history_type, history_modifier,
- history_change_reason, using, attrs)
- return
- old_instance = q_history.all()[0]
- # multiple saving by the same user in a very short time are generaly
- # caused by post_save signals it is not relevant to keep them
- min_history_date = datetime.datetime.now() \
- - datetime.timedelta(seconds=5)
- q = q_history.filter(history_date__isnull=False,
- history_date__gt=min_history_date) \
- .order_by('-history_date', '-history_id')
- if not force and q.count():
- return
-
- if force:
- delattr(instance, '_force_history')
-
- # record a new version only if data have been changed
- for field in instance._meta.fields:
- if getattr(old_instance, field.attname) != attrs[field.attname]:
- self._save_historic(manager, instance, history_date,
- history_type, history_modifier,
- history_change_reason, using, attrs)
- return
-
-
def valid_id(cls):
# valid ID validator for models
def func(value):
@@ -383,741 +295,6 @@ def is_unique(cls, field):
return func
-class OwnPerms(object):
- """
- Manage special permissions for object's owner
- """
-
- @classmethod
- def get_query_owns(cls, ishtaruser):
- """
- Query object to get own items
- """
- return None # implement for each object
-
- def can_view(self, request):
- if hasattr(self, "LONG_SLUG"):
- perm = "view_" + self.LONG_SLUG
- else:
- perm = "view_" + self.SLUG
- return self.can_do(request, perm)
-
- def can_do(self, request, action_name):
- """
- Check permission availability for the current object.
- :param request: request object
- :param action_name: action name eg: "change_find" - "own" variation is
- checked
- :return: boolean
- """
- if not getattr(request.user, 'ishtaruser', None):
- return False
- splited = action_name.split('_')
- action_own_name = splited[0] + '_own_' + '_'.join(splited[1:])
- user = request.user
- if action_own_name == "view_own_findbasket":
- action_own_name = "view_own_find"
- return user.ishtaruser.has_right(action_name, request.session) or \
- (user.ishtaruser.has_right(action_own_name, request.session)
- and self.is_own(user.ishtaruser))
-
- def is_own(self, user, alt_query_own=None):
- """
- Check if the current object is owned by the user
- """
- if isinstance(user, IshtarUser):
- ishtaruser = user
- elif hasattr(user, 'ishtaruser'):
- ishtaruser = user.ishtaruser
- else:
- return False
- if not alt_query_own:
- query = self.get_query_owns(ishtaruser)
- else:
- query = getattr(self, alt_query_own)(ishtaruser)
- if not query:
- return False
- query &= Q(pk=self.pk)
- return self.__class__.objects.filter(query).count()
-
- @classmethod
- def has_item_of(cls, user):
- """
- Check if the user own some items
- """
- if isinstance(user, IshtarUser):
- ishtaruser = user
- elif hasattr(user, 'ishtaruser'):
- ishtaruser = user.ishtaruser
- else:
- return False
- query = cls.get_query_owns(ishtaruser)
- if not query:
- return False
- return cls.objects.filter(query).count()
-
- @classmethod
- def _return_get_owns(cls, owns, values, get_short_menu_class,
- label_key='cached_label'):
- if not owns:
- return []
- sorted_values = []
- if hasattr(cls, 'BASKET_MODEL'):
- owns_len = len(owns)
- for idx, item in enumerate(reversed(owns)):
- if get_short_menu_class:
- item = item[0]
- if type(item) == cls.BASKET_MODEL:
- basket = owns.pop(owns_len - idx - 1)
- sorted_values.append(basket)
- sorted_values = list(reversed(sorted_values))
- if not values:
- if not get_short_menu_class:
- return sorted_values + list(
- sorted(owns, key=lambda x: getattr(x, label_key) or ""))
- return sorted_values + list(
- sorted(owns, key=lambda x: getattr(x[0], label_key) or ""))
- if not get_short_menu_class:
- return sorted_values + list(
- sorted(owns, key=lambda x: x[label_key] or ""))
- return sorted_values + list(
- sorted(owns, key=lambda x: x[0][label_key] or ""))
-
- @classmethod
- def get_owns(cls, user, replace_query=None, limit=None, values=None,
- get_short_menu_class=False, menu_filtr=None):
- """
- Get Own items
- """
- if not replace_query:
- replace_query = {}
- if hasattr(user, 'is_authenticated') and not user.is_authenticated():
- returned = cls.objects.filter(pk__isnull=True)
- if values:
- returned = []
- return returned
- if isinstance(user, User):
- try:
- ishtaruser = IshtarUser.objects.get(user_ptr=user)
- except IshtarUser.DoesNotExist:
- returned = cls.objects.filter(pk__isnull=True)
- if values:
- returned = []
- return returned
- elif isinstance(user, IshtarUser):
- ishtaruser = user
- else:
- if values:
- return []
- return cls.objects.filter(pk__isnull=True)
- items = []
- if hasattr(cls, 'BASKET_MODEL'):
- items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
- query = cls.get_query_owns(ishtaruser)
- if not query and not replace_query:
- returned = cls.objects.filter(pk__isnull=True)
- if values:
- returned = []
- return returned
- if query:
- q = cls.objects.filter(query)
- else: # replace_query
- q = cls.objects.filter(replace_query)
- if values:
- q = q.values(*values)
- if limit:
- items += list(q.order_by('-pk')[:limit])
- else:
- items += list(q.order_by(*cls._meta.ordering).all())
- if get_short_menu_class:
- if values:
- if 'id' not in values:
- raise NotImplementedError(
- "Call of get_owns with get_short_menu_class option and"
- " no 'id' in values is not implemented")
- my_items = []
- for i in items:
- if hasattr(cls, 'BASKET_MODEL') and \
- type(i) == cls.BASKET_MODEL:
- dct = dict([(k, getattr(i, k)) for k in values])
- my_items.append(
- (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)))
- else:
- my_items.append((i, cls.get_short_menu_class(i['id'])))
- items = my_items
- else:
- items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
- return items
-
- @classmethod
- def _get_query_owns_dicts(cls, ishtaruser):
- """
- List of query own dict to construct the query.
- Each dict are join with an AND operator, each dict key, values are
- joined with OR operator
- """
- return []
-
- @classmethod
- def _construct_query_own(cls, prefix, dct_list):
- q = None
- for subquery_dict in dct_list:
- subquery = None
- for k in subquery_dict:
- subsubquery = Q(**{prefix + k: subquery_dict[k]})
- if subquery:
- subquery |= subsubquery
- else:
- subquery = subsubquery
- if not subquery:
- continue
- if q:
- q &= subquery
- else:
- q = subquery
- return q
-
-
-class CachedGen(object):
- @classmethod
- def refresh_cache(cls):
- raise NotImplementedError()
-
- @classmethod
- def _add_cache_key_to_refresh(cls, keys):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if type(current_keys) != list:
- current_keys = []
- if keys not in current_keys:
- current_keys.append(keys)
- cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
-
-
-class Cached(CachedGen):
- slug_field = 'txt_idx'
-
- @classmethod
- def refresh_cache(cls):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if not current_keys:
- return
- for keys in current_keys:
- if len(keys) == 2 and keys[0] == '__slug':
- cls.get_cache(keys[1], force=True)
- elif keys[0] == '__get_types':
- default = None
- empty_first = True
- exclude = []
- if len(keys) >= 2:
- default = keys.pop()
- if len(keys) > 1:
- empty_first = bool(keys.pop())
- exclude = keys[1:]
- cls.get_types(
- exclude=exclude, empty_first=empty_first, default=default,
- force=True)
- elif keys[0] == '__get_help':
- cls.get_help(force=True)
-
- @classmethod
- def _add_cache_key_to_refresh(cls, keys):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if type(current_keys) != list:
- current_keys = []
- if keys not in current_keys:
- current_keys.append(keys)
- cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
-
- @classmethod
- def get_cache(cls, slug, force=False):
- cache_key, value = get_cache(cls, ['__slug', slug])
- if not force and value:
- return value
- try:
- k = {cls.slug_field: slug}
- obj = cls.objects.get(**k)
- cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
- return obj
- except cls.DoesNotExist:
- cache.set(cache_key, None, settings.CACHE_TIMEOUT)
- return None
-
-
-@disable_for_loaddata
-def post_save_cache(sender, **kwargs):
- sender.refresh_cache()
-
-
-class GeneralType(Cached, models.Model):
- """
- Abstract class for "types"
- """
- label = models.TextField(_("Label"))
- txt_idx = models.TextField(
- _("Textual ID"), validators=[validate_slug],
- unique=True,
- help_text=_(
- "The slug is the standardized version of the name. It contains "
- "only lowercase letters, numbers and hyphens. Each slug must "
- "be unique."))
- comment = models.TextField(_("Comment"), blank=True, null=True)
- available = models.BooleanField(_("Available"), default=True)
- HELP_TEXT = ""
- objects = TypeManager()
-
- class Meta:
- abstract = True
-
- def __str__(self):
- return self.label
-
- def natural_key(self):
- return (self.txt_idx,)
-
- def history_compress(self):
- return self.txt_idx
-
- @classmethod
- def history_decompress(cls, value, create=False):
- if not value:
- return []
- res = []
- for txt_idx in value:
- try:
- res.append(cls.objects.get(txt_idx=txt_idx))
- except cls.DoesNotExist:
- continue
- return res
-
- @property
- def explicit_label(self):
- return "{} ({})".format(self.label, self._meta.verbose_name)
-
- @classmethod
- def create_default_for_test(cls):
- return [cls.objects.create(label='Test %d' % i) for i in range(5)]
-
- @property
- def short_label(self):
- return self.label
-
- @property
- def name(self):
- return self.label
-
- @classmethod
- def get_or_create(cls, slug, label=''):
- """
- Get or create a new item.
-
- :param slug: textual id
- :param label: label for initialization if the item doesn't exist (not
- mandatory)
-
- :return: instancied item of the base class
- """
-
- item = cls.get_cache(slug)
- if item:
- return item
- if isinstance(slug, list):
- slug = slug[0]
- item, __ = cls.objects.get_or_create(
- txt_idx=slug, defaults={'label': label})
- return item
-
- @classmethod
- def get_or_create_pk(cls, slug):
- """
- Get an id from a slug. Create the associated item if needed.
-
- :param slug: textual id
-
- :return: id of the item (string)
- """
- return str(cls.get_or_create(slug).pk)
-
- @classmethod
- def get_or_create_pks(cls, slugs):
- """
- Get and merge a list of ids from a slug list. Create the associated
- items if needed.
-
- :param slugs: textual ids
-
- :return: string with ids separated by "_"
- """
- items = []
- for slug in slugs:
- items.append(str(cls.get_or_create(slug).pk))
- return "_".join(items)
-
- @classmethod
- def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- keys = ['__get_help']
- keys += ["{}".format(ex) for ex in exclude]
- keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
- cache_key, value = get_cache(cls, keys)
- if value and not force:
- return mark_safe(value)
- help_text = cls.HELP_TEXT
- c_rank = -1
- help_items = "\n"
- for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
- if hasattr(item, '__iter__'):
- pk = item[0]
- item = cls.objects.get(pk=pk)
- item.rank = c_rank + 1
- if hasattr(item, 'parent'):
- c_item = item
- parents = []
- while c_item.parent:
- parents.append(c_item.parent.label)
- c_item = c_item.parent
- parents.reverse()
- parents.append(item.label)
- item.label = " / ".join(parents)
- if not item.comment:
- continue
- if c_rank > item.rank:
- help_items += "</dl>\n"
- elif c_rank < item.rank:
- help_items += "<dl>\n"
- c_rank = item.rank
- help_items += "<dt>%s</dt><dd>%s</dd>" % (
- item.label, "<br/>".join(item.comment.split('\n')))
- c_rank += 1
- if c_rank:
- help_items += c_rank * "</dl>"
- if help_text or help_items != u'\n':
- help_text = help_text + help_items
- else:
- help_text = ""
- cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
- return mark_safe(help_text)
-
- @classmethod
- def _get_initial_types(cls, initial, type_pks, instance=False):
- new_vals = []
- if not initial:
- return []
- if type(initial) not in (list, tuple):
- initial = [initial]
- for value in initial:
- try:
- pk = int(value)
- except (ValueError, TypeError):
- continue
- if pk in type_pks:
- continue
- try:
- extra_type = cls.objects.get(pk=pk)
- if instance:
- new_vals.append(extra_type)
- else:
- new_vals.append((extra_type.pk, str(extra_type)))
- except cls.DoesNotExist:
- continue
- return new_vals
-
- @classmethod
- def get_types(cls, dct=None, instances=False, exclude=None,
- empty_first=True, default=None, initial=None, force=False,
- full_hierarchy=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- types = []
- if not instances and empty_first and not default:
- types = [('', '--')]
- types += cls._pre_get_types(dct, instances, exclude,
- default, force,
- get_full_hierarchy=full_hierarchy)
- if not initial:
- return types
- new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types])
- types += new_vals
- return types
-
- @classmethod
- def _pre_get_types(cls, dct=None, instances=False, exclude=None,
- default=None, force=False, get_full_hierarchy=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- # cache
- cache_key = None
- if not instances:
- keys = ['__get_types']
- keys += ["{}".format(ex) for ex in exclude] + \
- ["{}".format(default)]
- keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
- cache_key, value = get_cache(cls, keys)
- if value and not force:
- return value
- base_dct = dct.copy()
- if hasattr(cls, 'parent'):
- if not cache_key:
- return cls._get_parent_types(
- base_dct, instances, exclude=exclude,
- default=default, get_full_hierarchy=get_full_hierarchy)
- vals = [v for v in cls._get_parent_types(
- base_dct, instances, exclude=exclude,
- default=default, get_full_hierarchy=get_full_hierarchy)]
- cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
- return vals
-
- if not cache_key:
- return cls._get_types(base_dct, instances, exclude=exclude,
- default=default)
- vals = [
- v for v in cls._get_types(base_dct, instances, exclude=exclude,
- default=default)
- ]
- cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
- return vals
-
- @classmethod
- def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- dct['available'] = True
- if default:
- try:
- default = cls.objects.get(txt_idx=default)
- yield (default.pk, _(str(default)))
- except cls.DoesNotExist:
- pass
- items = cls.objects.filter(**dct)
- if default and default != "None":
- if hasattr(default, 'txt_idx'):
- exclude.append(default.txt_idx)
- else:
- exclude.append(default)
- if exclude:
- items = items.exclude(txt_idx__in=exclude)
- for item in items.order_by(*cls._meta.ordering).all():
- if instances:
- item.rank = 0
- yield item
- else:
- yield (item.pk, _(str(item)) if item and str(item) else '')
-
- @classmethod
- def _get_childs_list(cls, dct=None, exclude=None, instances=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- if 'parent' in dct:
- dct.pop('parent')
- childs = cls.objects.filter(**dct)
- if exclude:
- childs = childs.exclude(txt_idx__in=exclude)
- if hasattr(cls, 'order'):
- childs = childs.order_by('order')
- res = {}
- if instances:
- for item in childs.all():
- parent_id = item.parent_id or 0
- if parent_id not in res:
- res[parent_id] = []
- res[parent_id].append(item)
- else:
- for item in childs.values("id", "parent_id", "label").all():
- parent_id = item["parent_id"] or 0
- if item["id"] == item["parent_id"]:
- parent_id = 0
- if parent_id not in res:
- res[parent_id] = []
- res[parent_id].append((item["id"], item["label"]))
- return res
-
- PREFIX = "&#x2502; "
- PREFIX_EMPTY = "&nbsp; "
- PREFIX_MEDIUM = "&#x251C; "
- PREFIX_LAST = "&#x2514; "
- PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
-
- @classmethod
- def _get_childs(cls, item, child_list, prefix=0, instances=False,
- is_last=False, last_of=None, get_full_hierarchy=False):
- if not last_of:
- last_of = []
-
- prefix += 1
- current_child_lst = []
- if item in child_list:
- current_child_lst = child_list[item]
-
- lst = []
- total = len(current_child_lst)
- full_hierarchy_initial = get_full_hierarchy
- for idx, child in enumerate(current_child_lst):
- mylast_of = last_of[:]
- p = ''
- if instances:
- child.rank = prefix
- lst.append(child)
- else:
- if full_hierarchy_initial:
- if isinstance(full_hierarchy_initial, str):
- p = full_hierarchy_initial + " > "
- else:
- p = ""
- else:
- cprefix = prefix
- while cprefix:
- cprefix -= 1
- if not cprefix:
- if (idx + 1) == total:
- p += cls.PREFIX_LAST
- else:
- p += cls.PREFIX_MEDIUM
- elif is_last:
- if mylast_of:
- clast = mylast_of.pop(0)
- if clast:
- p += cls.PREFIX_EMPTY
- else:
- p += cls.PREFIX
- else:
- p += cls.PREFIX_EMPTY
- else:
- p += cls.PREFIX
- lst.append((
- child[0], SafeText(p + str(_(child[1])))
- ))
- clast_of = last_of[:]
- clast_of.append(idx + 1 == total)
- if instances:
- child_id = child.id
- else:
- child_id = child[0]
- if get_full_hierarchy:
- if p:
- if not p.endswith(" > "):
- p += " > "
- get_full_hierarchy = p + child[1]
- else:
- get_full_hierarchy = child[1]
- for sub_child in cls._get_childs(
- child_id, child_list, prefix, instances,
- is_last=((idx + 1) == total), last_of=clast_of,
- get_full_hierarchy=get_full_hierarchy):
- lst.append(sub_child)
- return lst
-
- @classmethod
- def _get_parent_types(cls, dct=None, instances=False, exclude=None,
- default=None, get_full_hierarchy=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- dct['available'] = True
- child_list = cls._get_childs_list(dct, exclude, instances)
-
- if 0 in child_list:
- for item in child_list[0]:
- if instances:
- item.rank = 0
- item_id = item.pk
- yield item
- else:
- item_id = item[0]
- yield item
- if get_full_hierarchy:
- get_full_hierarchy = item[1]
- for child in cls._get_childs(
- item_id, child_list, instances=instances,
- get_full_hierarchy=get_full_hierarchy):
- yield child
-
- def save(self, *args, **kwargs):
- if not self.id and not self.label:
- txt_idx = self.txt_idx
- if isinstance(txt_idx, list):
- txt_idx = txt_idx[0]
- self.txt_idx = txt_idx
- self.label = " ".join(" ".join(self.txt_idx.split('-'))
- .split('_')).title()
- if not self.txt_idx:
- self.txt_idx = slugify(self.label)[:100]
-
- # clean old keys
- if self.pk:
- old = self.__class__.objects.get(pk=self.pk)
- content_type = ContentType.objects.get_for_model(self.__class__)
- if slugify(self.label) != slugify(old.label):
- ItemKey.objects.filter(
- object_id=self.pk, key=slugify(old.label),
- content_type=content_type).delete()
- if self.txt_idx != old.txt_idx:
- ItemKey.objects.filter(
- object_id=self.pk, key=old.txt_idx,
- content_type=content_type).delete()
-
- obj = super(GeneralType, self).save(*args, **kwargs)
- self.generate_key(force=True)
- return obj
-
- def add_key(self, key, force=False, importer=None, group=None,
- user=None):
- content_type = ContentType.objects.get_for_model(self.__class__)
- if not importer and not force and ItemKey.objects.filter(
- key=key, content_type=content_type).count():
- return
- filtr = {'key': key, 'content_type': content_type}
- if group:
- filtr['group'] = group
- elif user:
- filtr['user'] = user
- else:
- filtr['importer'] = importer
- if force:
- ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
- filtr['object_id'] = self.pk
- ItemKey.objects.get_or_create(**filtr)
-
- def generate_key(self, force=False):
- for key in (slugify(self.label), self.txt_idx):
- self.add_key(key)
-
- def get_keys(self, importer):
- keys = [self.txt_idx]
- content_type = ContentType.objects.get_for_model(self.__class__)
- base_q = Q(content_type=content_type, object_id=self.pk)
- subquery = Q(importer__isnull=True, user__isnull=True,
- group__isnull=True)
- subquery |= Q(user__isnull=True, group__isnull=True,
- importer=importer)
- if importer.user:
- subquery |= Q(user=importer.user, group__isnull=True,
- importer=importer)
- if importer.associated_group:
- subquery |= Q(user__isnull=True, group=importer.associated_group,
- importer=importer)
- q = ItemKey.objects.filter(base_q & subquery)
- for ik in q.exclude(key=self.txt_idx).all():
- keys.append(ik.key)
- return keys
-
- @classmethod
- def generate_keys(cls):
- # content_type = ContentType.objects.get_for_model(cls)
- for item in cls.objects.all():
- item.generate_key()
-
-
def get_general_type_label(model, slug):
obj = model.get_cache(slug)
if not obj:
@@ -1125,23 +302,6 @@ def get_general_type_label(model, slug):
return str(obj)
-class HierarchicalType(GeneralType):
- parent = models.ForeignKey('self', blank=True, null=True,
- on_delete=models.SET_NULL,
- verbose_name=_("Parent"))
-
- class Meta:
- abstract = True
-
- def full_label(self):
- lbls = [self.label]
- item = self
- while item.parent:
- item = item.parent
- lbls.append(item.label)
- return " > ".join(reversed(lbls))
-
-
class TinyUrl(models.Model):
CHAR_MAP = string.ascii_letters + string.digits
CHAR_MAP_LEN = len(CHAR_MAP)
@@ -1183,24 +343,6 @@ class ItemKey(models.Model):
return self.key
-def get_image_path(instance, filename):
- # when using migrations instance is not a real ImageModel instance
- if not hasattr(instance, '_get_image_path'):
- n = datetime.datetime.now()
- return "upload/{}/{:02d}/{:02d}/{}".format(
- n.year, n.month, n.day, filename)
- return instance._get_image_path(filename)
-
-
-class ImageContainerModel(object):
- def _get_image_path(self, filename):
- return "{}/{}".format(self._get_base_image_path(), filename)
-
- def _get_base_image_path(self):
- n = datetime.datetime.now()
- return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day)
-
-
class ImageModel(models.Model, ImageContainerModel):
image = models.ImageField(upload_to=get_image_path, blank=True, null=True,
max_length=255, help_text=max_size_help())
@@ -1286,17 +428,6 @@ class ImageModel(models.Model, ImageContainerModel):
)
-class HistoryError(Exception):
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return repr(self.value)
-
-
-PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid')
-
-
class BulkUpdatedItem(object):
@classmethod
def bulk_recursion(cls, transaction_id, extra_args):
@@ -1456,1071 +587,6 @@ class JsonDataField(models.Model):
_("Content types of the field and of the menu do not match"))
-class JsonData(models.Model, CachedGen):
- data = JSONField(default={}, blank=True)
-
- class Meta:
- abstract = True
-
- def pre_save(self):
- if not self.data:
- self.data = {}
-
- @property
- def json_sections(self):
- sections = []
- try:
- content_type = ContentType.objects.get_for_model(self)
- except ContentType.DoesNotExists:
- return sections
- fields = list(JsonDataField.objects.filter(
- content_type=content_type, display=True, section__isnull=True
- ).all()) # no section fields
-
- fields += list(JsonDataField.objects.filter(
- content_type=content_type, display=True, section__isnull=False
- ).order_by('section__order', 'order').all())
-
- for field in fields:
- value = None
- data = self.data.copy()
- for key in field.key.split('__'):
- if key in data:
- value = copy.copy(data[key])
- data = data[key]
- else:
- value = None
- break
- if value is None:
- continue
- if type(value) in (list, tuple):
- value = " ; ".join([str(v) for v in value])
- section_name = field.section.name if field.section else None
- if not sections or section_name != sections[-1][0]:
- # if section name is identical it is the same
- sections.append((section_name, []))
- sections[-1][1].append((field.name, value))
- return sections
-
- @classmethod
- def refresh_cache(cls):
- __, refreshed = get_cache(cls, ['cache_refreshed'])
- if refreshed and time.time() - refreshed < 1:
- return
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if not current_keys:
- return
- for keys in current_keys:
- if keys[0] == '__get_dynamic_choices':
- cls._get_dynamic_choices(keys[1], force=True)
-
- @classmethod
- def _get_dynamic_choices(cls, key, force=False):
- """
- Get choice from existing values
- :param key: data key
- :param force: if set to True do not use cache
- :return: tuple of choices (id, value)
- """
- cache_key, value = get_cache(cls, ['__get_dynamic_choices', key])
- if not force and value:
- return value
- choices = set()
- splitted_key = key[len('data__'):].split('__')
- q = cls.objects.filter(
- data__has_key=key[len('data__'):]).values_list('data', flat=True)
- for value in q.all():
- for k in splitted_key:
- value = value[k]
- choices.add(value)
- choices = [('', '')] + [(v, v) for v in sorted(list(choices))]
- cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT)
- return choices
-
-
-class Imported(models.Model):
- imports = models.ManyToManyField(
- Import, blank=True,
- related_name="imported_%(app_label)s_%(class)s")
-
- class Meta:
- abstract = True
-
-
-class SearchAltName(object):
- def __init__(self, search_key, search_query, extra_query=None,
- distinct_query=False):
- self.search_key = search_key
- self.search_query = search_query
- self.extra_query = extra_query or {}
- self.distinct_query = distinct_query
-
-
-class DynamicRequest(object):
- def __init__(self, label, app_name, model_name, form_key, search_key,
- type_query, search_query):
- self.label = label
- self.form_key = form_key
- self.search_key = search_key
- self.app_name = app_name
- self.model_name = model_name
- self.type_query = type_query
- self.search_query = search_query
-
- def get_all_types(self):
- model = apps.get_app_config(self.app_name).get_model(self.model_name)
- return model.objects.filter(available=True)
-
- def get_form_fields(self):
- fields = {}
- for item in self.get_all_types().all():
- fields[self.form_key + "-" + item.txt_idx] = forms.CharField(
- label=str(self.label) + " " + str(item),
- required=False
- )
- return fields
-
- def get_extra_query(self, slug):
- return {
- self.type_query: slug
- }
-
- def get_alt_names(self):
- alt_names = {}
- for item in self.get_all_types().all():
- alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName(
- self.search_key + "-" + item.txt_idx, self.search_query,
- self.get_extra_query(item.txt_idx), distinct_query=True
- )
- return alt_names
-
-
-class SearchVectorConfig(object):
- def __init__(self, key, language=None, func=None):
- self.key = key
- if language:
- self.language = language
- if language == "local":
- self.language = settings.ISHTAR_SEARCH_LANGUAGE
- else:
- self.language = "simple"
- self.func = func
-
- def format(self, value):
- if value == 'None':
- value = ''
- if not self.func:
- return [value]
- return self.func(value)
-
-
-class FullSearch(models.Model):
- search_vector = SearchVectorField(_("Search vector"), blank=True, null=True,
- help_text=_("Auto filled at save"))
-
- EXTRA_REQUEST_KEYS = {}
- DYNAMIC_REQUESTS = {}
- ALT_NAMES = {}
-
- BASE_SEARCH_VECTORS = []
- PROPERTY_SEARCH_VECTORS = []
- INT_SEARCH_VECTORS = []
- M2M_SEARCH_VECTORS = []
- PARENT_SEARCH_VECTORS = []
- # prevent circular dependency
- PARENT_ONLY_SEARCH_VECTORS = []
-
- class Meta:
- abstract = True
-
- @classmethod
- def general_types(cls):
- for k in get_all_field_names(cls):
- field = cls._meta.get_field(k)
- if not hasattr(field, 'rel') or not field.rel:
- continue
- rel_model = field.rel.to
- if issubclass(rel_model, (GeneralType, HierarchicalType)):
- yield k
-
- @classmethod
- def get_alt_names(cls):
- alt_names = cls.ALT_NAMES.copy()
- for dr_k in cls.DYNAMIC_REQUESTS:
- alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names())
- return alt_names
-
- @classmethod
- def get_query_parameters(cls):
- query_parameters = {}
- for v in cls.get_alt_names().values():
- for language_code, language_lbl in settings.LANGUAGES:
- activate(language_code)
- query_parameters[str(v.search_key)] = v
- deactivate()
- return query_parameters
-
- def _update_search_field(self, search_vector_conf, search_vectors, data):
- for value in search_vector_conf.format(data):
- with connection.cursor() as cursor:
- cursor.execute("SELECT to_tsvector(%s, %s)", [
- search_vector_conf.language, value])
- row = cursor.fetchone()
- search_vectors.append(row[0])
-
- def _update_search_number_field(self, search_vectors, val):
- search_vectors.append("'{}':1".format(val))
-
- def update_search_vector(self, save=True, exclude_parent=False):
- """
- Update the search vector
- :param save: True if you want to save the object immediately
- :return: True if modified
- """
- if not hasattr(self, 'search_vector'):
- return
- if not self.pk:
- # logger.warning("Cannot update search vector before save or "
- # "after deletion.")
- return
- if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \
- and not self.INT_SEARCH_VECTORS \
- and not self.PROPERTY_SEARCH_VECTORS \
- and not self.PARENT_SEARCH_VECTORS:
- logger.warning("No search_vectors defined for {}".format(
- self.__class__))
- return
- if getattr(self, '_search_updated', None):
- return
- self._search_updated = True
-
- old_search = ""
- if self.search_vector:
- old_search = self.search_vector[:]
- search_vectors = []
- base_q = self.__class__.objects.filter(pk=self.pk)
-
- # many to many have to be queried one by one otherwise only one is fetch
- for m2m_search_vector in self.M2M_SEARCH_VECTORS:
- key = m2m_search_vector.key.split('__')[0]
- rel_key = getattr(self, key)
- for item in rel_key.values('pk').all():
- query_dct = {key + "__pk": item['pk']}
- q = copy.copy(base_q).filter(**query_dct)
- q = q.annotate(
- search=SearchVector(
- m2m_search_vector.key,
- config=m2m_search_vector.language)
- ).values('search')
- search_vectors.append(q.all()[0]['search'])
-
- # int/float are not well managed by the SearchVector
- for int_search_vector in self.INT_SEARCH_VECTORS:
- q = base_q.values(int_search_vector.key)
- for val in int_search_vector.format(
- q.all()[0][int_search_vector.key]):
- self._update_search_number_field(search_vectors, val)
-
- if not exclude_parent:
- # copy parent vector fields
- for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
- parent = getattr(self, PARENT_SEARCH_VECTOR)
- if hasattr(parent, 'all'): # m2m
- for p in parent.all():
- search_vectors.append(p.search_vector)
- elif parent:
- search_vectors.append(parent.search_vector)
-
- for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
- parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
- if hasattr(parent, 'all'): # m2m
- for p in parent.all():
- search_vectors.append(
- p.update_search_vector(save=False, exclude_parent=True)
- )
- elif parent:
- search_vectors.append(
- parent.update_search_vector(save=False, exclude_parent=True)
- )
-
- if self.BASE_SEARCH_VECTORS:
- # query "simple" fields
- q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
- res = q.all()[0]
- for base_search_vector in self.BASE_SEARCH_VECTORS:
- data = res[base_search_vector.key]
- data = unidecode(str(data))
- self._update_search_field(base_search_vector,
- search_vectors, data)
-
- if self.PROPERTY_SEARCH_VECTORS:
- for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
- data = getattr(self, property_search_vector.key)
- if callable(data):
- data = data()
- if not data:
- continue
- data = str(data)
- self._update_search_field(property_search_vector,
- search_vectors, data)
-
- if hasattr(self, 'data') and self.data:
- content_type = ContentType.objects.get_for_model(self)
- for json_field in JsonDataField.objects.filter(
- content_type=content_type,
- search_index=True).all():
- data = copy.deepcopy(self.data)
- no_data = False
- for key in json_field.key.split('__'):
- if key not in data:
- no_data = True
- break
- data = data[key]
- if no_data or not data:
- continue
-
- if json_field.value_type == 'B':
- if data is True:
- data = json_field.name
- else:
- continue
- elif json_field.value_type in ('I', 'F'):
- self._update_search_number_field(search_vectors, data)
- continue
- elif json_field.value_type == 'D':
- # only index year
- self._update_search_number_field(search_vectors, data.year)
- continue
- for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
- with connection.cursor() as cursor:
- cursor.execute("SELECT to_tsvector(%s, %s)",
- [lang, data])
- row = cursor.fetchone()
- search_vectors.append(row[0])
- new_search_vector = merge_tsvectors(search_vectors)
- changed = old_search != new_search_vector
- self.search_vector = new_search_vector
- if save and changed:
- self.__class__.objects.filter(pk=self.pk).update(
- search_vector=new_search_vector)
- elif not save:
- return new_search_vector
- return changed
-
-
-class FixAssociated(object):
- ASSOCIATED = {}
-
- def fix_associated(self):
- for key in self.ASSOCIATED:
- item = getattr(self, key)
- if not item:
- continue
- dct = self.ASSOCIATED[key]
- for dct_key in dct:
- subkey, ctype = dct_key
- expected_values = dct[dct_key]
- if not isinstance(expected_values, (list, tuple)):
- expected_values = [expected_values]
- if hasattr(ctype, "txt_idx"):
- try:
- expected_values = [ctype.objects.get(txt_idx=v)
- for v in expected_values]
- except ctype.DoesNotExist:
- # type not yet initialized
- return
- current_vals = getattr(item, subkey)
- is_many = False
- if hasattr(current_vals, "all"):
- is_many = True
- current_vals = current_vals.all()
- else:
- current_vals = [current_vals]
- is_ok = False
- for current_val in current_vals:
- if current_val in expected_values:
- is_ok = True
- break
- if is_ok:
- continue
- # the first value is used
- new_value = expected_values[0]
- if is_many:
- getattr(item, subkey).add(new_value)
- else:
- setattr(item, subkey, new_value)
-
-
-class QRCodeItem(models.Model, ImageContainerModel):
- HAS_QR_CODE = True
- qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True,
- max_length=255)
-
- class Meta:
- abstract = True
-
- @property
- def qrcode_path(self):
- if not self.qrcode:
- self.generate_qrcode()
- if not self.qrcode: # error on qrcode generation
- return ""
- return self.qrcode.path
-
- def generate_qrcode(self, request=None, secure=True, tmpdir=None):
- url = self.get_absolute_url()
- site = Site.objects.get_current()
- if request:
- scheme = request.scheme
- else:
- if secure:
- scheme = "https"
- else:
- scheme = "http"
- url = scheme + "://" + site.domain + url
- tiny_url = TinyUrl()
- tiny_url.link = url
- tiny_url.save()
- short_url = scheme + "://" + site.domain + reverse(
- 'tiny-redirect', args=[tiny_url.get_short_id()])
- qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
- tmpdir_created = False
- if not tmpdir:
- tmpdir = tempfile.mkdtemp("-qrcode")
- tmpdir_created = True
- filename = tmpdir + os.sep + 'qrcode.png'
- qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
- with open(filename, 'rb') as qrfile:
- self.qrcode.save("qrcode.png", File(qrfile))
- self.skip_history_when_saving = True
- self._no_move = True
- self.save()
- if tmpdir_created:
- shutil.rmtree(tmpdir)
-
-
-class DocumentItem(object):
- ALT_NAMES = {
- 'documents__image__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-image"),
- 'documents__image__isnull'),
- 'documents__associated_url__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-url"),
- 'documents__associated_url__isnull'),
- 'documents__associated_file__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-attached-file"),
- 'documents__associated_file__isnull'),
- }
-
- def public_representation(self):
- images = []
- if getattr(self, "main_image", None):
- images.append(self.main_image.public_representation())
- images += [
- image.public_representation()
- for image in self.images_without_main_image.all()
- ]
- return {"images": images}
-
- @property
- def images(self):
- if not hasattr(self, 'documents'):
- return Document.objects.none()
- return self.documents.filter(
- image__isnull=False).exclude(image="").order_by("pk")
-
- @property
- def images_without_main_image(self):
- if not hasattr(self, 'main_image') or not hasattr(self, 'documents'):
- return self.images
- if not self.main_image:
- return self.documents.filter(
- image__isnull=False).exclude(
- image="").order_by("pk")
- return self.documents.filter(
- image__isnull=False).exclude(
- image="").exclude(pk=self.main_image.pk).order_by("pk")
-
- def get_extra_actions(self, request):
- """
- For sheet template: return "Add document / image" action
- """
- # url, base_text, icon, extra_text, extra css class, is a quick action
- try:
- actions = super(DocumentItem, self).get_extra_actions(request)
- except AttributeError:
- actions = []
-
- if not hasattr(self, 'SLUG'):
- return actions
-
- can_add_doc = self.can_do(request, 'add_document')
- if can_add_doc and (
- not hasattr(self, "is_locked") or
- not self.is_locked(request.user)):
- actions += [
- (
- reverse("create-document") + "?{}={}".format(
- self.SLUG, self.pk),
- _("Add document/image"),
- "fa fa-plus",
- _("doc./image"),
- "",
- False
- )
- ]
- return actions
-
-
-class SpatialReferenceSystem(GeneralType):
- order = models.IntegerField(_("Order"), default=10)
- auth_name = models.CharField(
- _("Authority name"), default=u'EPSG', max_length=256)
- srid = models.IntegerField(_("Authority SRID"))
-
- class Meta:
- verbose_name = _("Spatial reference system")
- verbose_name_plural = _("Spatial reference systems")
- ordering = ('label',)
-
-
-post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
-post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
-
-
-class GeoItem(models.Model):
- GEO_SOURCE = (
- ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon"))
- )
-
- # gis
- x = models.FloatField(_(u'X'), blank=True, null=True)
- y = models.FloatField(_(u'Y'), blank=True, null=True)
- z = models.FloatField(_(u'Z'), blank=True, null=True)
- estimated_error_x = models.FloatField(_(u'Estimated error for X'),
- blank=True, null=True)
- estimated_error_y = models.FloatField(_(u'Estimated error for Y'),
- blank=True, null=True)
- estimated_error_z = models.FloatField(_(u'Estimated error for Z'),
- blank=True, null=True)
- spatial_reference_system = models.ForeignKey(
- SpatialReferenceSystem, verbose_name=_("Spatial Reference System"),
- blank=True, null=True)
- point = models.PointField(_("Point"), blank=True, null=True, dim=3)
- point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
- point_source = models.CharField(
- _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True,
- null=True)
- point_source_item = models.CharField(
- _("Point source item"), max_length=100, blank=True, null=True)
- multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True,
- null=True)
- multi_polygon_source = models.CharField(
- _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1,
- blank=True, null=True)
- multi_polygon_source_item = models.CharField(
- _("Multi polygon source item"), max_length=100, blank=True, null=True)
-
- GEO_LABEL = ""
-
- class Meta:
- abstract = True
-
- def get_town_centroid(self):
- raise NotImplementedError
-
- def get_town_polygons(self):
- raise NotImplementedError
-
- @property
- def display_coordinates(self):
- if not self.point_2d:
- return ""
- profile = get_current_profile()
- if not profile.display_srs or not profile.display_srs.srid:
- return self.x, self.y
- point = self.point_2d.transform(profile.display_srs.srid, clone=True)
- return round(point.x, 5), round(point.y, 5)
-
- @property
- def display_spatial_reference_system(self):
- profile = get_current_profile()
- if not profile.display_srs or not profile.display_srs.srid:
- return self.spatial_reference_system
- return profile.display_srs
-
- def get_precise_points(self):
- if self.point_source == 'P' and self.point_2d:
- return self.point_2d, self.point, self.point_source_item
-
- def get_precise_polygons(self):
- if self.multi_polygon_source == 'P' and self.multi_polygon:
- return self.multi_polygon, self.multi_polygon_source_item
-
- def most_precise_geo(self):
- if self.point_source == 'M':
- return 'multi_polygon'
- current_source = str(self.__class__._meta.verbose_name)
- if self.multi_polygon_source_item == current_source \
- and (self.multi_polygon_source == "P" or
- self.point_source_item != current_source):
- return 'multi_polygon'
- if self.point_source_item == current_source\
- and self.point_source == 'P':
- return 'point'
- if self.multi_polygon_source == 'P':
- return 'multi_polygon'
- if self.point_source == 'P':
- return 'point'
- if self.multi_polygon:
- return 'multi_polygon'
- if self.point_2d:
- return 'point'
-
- def geo_point_source(self):
- if not self.point_source:
- return ""
- src = "{} - {}".format(
- dict(self.GEO_SOURCE)[self.point_source],
- self.point_source_item
- )
- return src
-
- def geo_polygon_source(self):
- if not self.multi_polygon_source:
- return ""
- src = "{} - {}".format(
- dict(self.GEO_SOURCE)[self.multi_polygon_source],
- self.multi_polygon_source_item
- )
- return src
-
- def _geojson_serialize(self, geom_attr):
- if not hasattr(self, geom_attr):
- return ""
- cached_label_key = 'cached_label'
- if self.GEO_LABEL:
- cached_label_key = self.GEO_LABEL
- if getattr(self, "CACHED_LABELS", None):
- cached_label_key = self.CACHED_LABELS[-1]
- geojson = serialize(
- 'geojson',
- self.__class__.objects.filter(pk=self.pk),
- geometry_field=geom_attr, fields=(cached_label_key,))
- geojson_dct = json.loads(geojson)
- profile = get_current_profile()
- precision = profile.point_precision
-
- features = geojson_dct.pop('features')
- for idx in range(len(features)):
- feature = features[idx]
- lbl = feature['properties'].pop(cached_label_key)
- feature['properties']['name'] = lbl
- feature['properties']['id'] = self.pk
- if precision is not None:
- geom_type = feature["geometry"].get("type", None)
- if geom_type == "Point":
- feature["geometry"]["coordinates"] = [
- round(coord, precision)
- for coord in feature["geometry"]["coordinates"]
- ]
- geojson_dct['features'] = features
- geojson_dct['link_template'] = simple_link_to_window(self).replace(
- '999999', '<pk>'
- )
- geojson = json.dumps(geojson_dct)
- return geojson
-
- @property
- def point_2d_geojson(self):
- return self._geojson_serialize('point_2d')
-
- @property
- def multi_polygon_geojson(self):
- return self._geojson_serialize('multi_polygon')
-
-
-class TemplateItem:
- @classmethod
- def _label_templates_q(cls):
- model_name = "{}.{}".format(
- cls.__module__, cls.__name__)
- q = Q(associated_model__klass=model_name,
- for_labels=True, available=True)
- alt_model_name = model_name.replace(
- "models_finds", "models").replace(
- "models_treatments", "models")
- if alt_model_name != model_name:
- q |= Q(associated_model__klass=model_name,
- for_labels=True, available=True)
- return DocumentTemplate.objects.filter(q)
-
- @classmethod
- def has_label_templates(cls):
- return cls._label_templates_q().count()
-
- @classmethod
- def label_templates(cls):
- return cls._label_templates_q()
-
- def get_extra_templates(self, request):
- cls = self.__class__
- templates = []
- name = str(cls.__name__)
- module = str(cls.__module__)
- if "archaeological_finds" in module:
- if "models_finds" in name or "models_treatments" in name:
- names = [
- name,
- name.replace("models_finds", "models"
- ).replace("models_treatments", "models")
- ]
- else:
- names = [name, name.replace("models", "models_finds"),
- name.replace("models", "models_treatments")]
- else:
- names = [name]
- model_names = [
- "{}.{}".format(module, name) for name in names
- ]
- q = DocumentTemplate.objects.filter(
- associated_model__klass__in=model_names,
- for_labels=False, available=True)
- for template in q.all():
- urlname = "generate-document"
- templates.append(
- (template.name, reverse(
- urlname, args=[template.slug, self.pk]))
- )
- return templates
-
-
-class StatisticItem:
- STATISTIC_MODALITIES = [] # example: "year", "operation_type__label"
- STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example:
- # OrderedDict([('year', _("Year")),
- # ("operation_type__label", _("Operation type"))])
- STATISTIC_SUM_VARIABLE = OrderedDict(
- (("pk", (_("Number"), 1)),)
- ) # example: "Price", "Volume" - the number is a multiplier
-
-
-class CascasdeUpdate:
- DOWN_MODEL_UPDATE = []
-
- def cascade_update(self):
- for down_model in self.DOWN_MODEL_UPDATE:
- if not settings.USE_BACKGROUND_TASK:
- rel = getattr(self, down_model)
- if hasattr(rel.model, "need_update"):
- rel.update(need_update=True)
- continue
- for item in getattr(self, down_model).all():
- cached_label_changed(item.__class__, instance=item)
- if hasattr(item, "point_2d"):
- post_save_geo(item.__class__, instance=item)
-
-
-def duplicate_item(item, user=None, data=None):
- model = item.__class__
- new = model.objects.get(pk=item.pk)
-
- for field in model._meta.fields:
- # pk is in PRIVATE_FIELDS so: new.pk = None and a new
- # item will be created on save
- if field.name == "uuid":
- new.uuid = uuid.uuid4()
- elif field.name in PRIVATE_FIELDS:
- setattr(new, field.name, None)
- if user:
- new.history_user = user
- if data:
- for k in data:
- setattr(new, k, data[k])
- new.save()
-
- # m2m fields
- m2m = [field.name for field in model._meta.many_to_many
- if field.name not in PRIVATE_FIELDS]
- for field in m2m:
- for val in getattr(item, field).all():
- if val not in getattr(new, field).all():
- getattr(new, field).add(val)
- return new
-
-
-class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
- JsonData, FixAssociated, CascasdeUpdate):
- """
- Historized item with external ID management.
- All historized items are searchable and have a data json field.
- Historized items can be "locked" for edition.
- """
- IS_BASKET = False
- SHOW_URL = None
- EXTERNAL_ID_KEY = ''
- EXTERNAL_ID_DEPENDENCIES = []
- HISTORICAL_M2M = []
-
- history_modifier = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Last editor"), blank=True, null=True)
- history_creator = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Creator"), blank=True, null=True)
- last_modified = models.DateTimeField(auto_now=True)
- history_m2m = JSONField(default={}, blank=True)
- need_update = models.BooleanField(
- verbose_name=_("Need update"), default=False)
- locked = models.BooleanField(
- verbose_name=_("Item locked for edition"), default=False)
- lock_user = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Locked by"), blank=True, null=True)
-
- ALT_NAMES = {
- 'history_creator': SearchAltName(
- pgettext_lazy("key for text search", u"created-by"),
- 'history_creator__ishtaruser__person__cached_label__iexact'
- ),
- 'history_modifier': SearchAltName(
- pgettext_lazy("key for text search", u"modified-by"),
- 'history_modifier__ishtaruser__person__cached_label__iexact'
- ),
- 'modified_before': SearchAltName(
- pgettext_lazy("key for text search", "modified-before"),
- 'last_modified__lte'
- ),
- 'modified_after': SearchAltName(
- pgettext_lazy("key for text search", "modified-after"),
- 'last_modified__gte'
- ),
- }
-
- class Meta:
- abstract = True
-
- @classmethod
- def get_verbose_name(cls):
- return cls._meta.verbose_name
-
- def is_locked(self, user=None):
- if not user:
- return self.locked
- return self.locked and (not self.lock_user or self.lock_user != user)
-
- def merge(self, item, keep_old=False):
- merge_model_objects(self, item, keep_old=keep_old)
-
- def public_representation(self):
- return {}
-
- def duplicate(self, user=None, data=None):
- return duplicate_item(self, user, data)
-
- def update_external_id(self, save=False):
- if not self.EXTERNAL_ID_KEY or (
- self.external_id and
- not getattr(self, 'auto_external_id', False)):
- return
- external_id = get_external_id(self.EXTERNAL_ID_KEY, self)
- if external_id == self.external_id:
- return
- self.auto_external_id = True
- self.external_id = external_id
- self._cached_label_checked = False
- if save:
- self.skip_history_when_saving = True
- self.save()
- return external_id
-
- def get_last_history_date(self):
- q = self.history.values("history_date").order_by('-history_date')
- if not q.count():
- return
- return q.all()[0]['history_date']
-
- def get_previous(self, step=None, date=None, strict=False):
- """
- Get a "step" previous state of the item
- """
- assert step or date
- historized = self.history.all()
- item = None
- if step:
- if len(historized) <= step:
- # silently return the last step if too far in the history
- item = historized[len(historized) - 1]
- else:
- item = historized[step]
- else:
- for step, item in enumerate(historized):
- if item.history_date == date:
- break
- # ended with no match
- if item.history_date != date:
- return
- item._step = step
- if len(historized) != (step + 1):
- item._previous = historized[step + 1].history_date
- else:
- item._previous = None
- if step > 0:
- item._next = historized[step - 1].history_date
- else:
- item._next = None
- item.history_date = historized[step].history_date
- model = self.__class__
- for k in get_all_field_names(model):
- field = model._meta.get_field(k)
- if hasattr(field, 'rel') and field.rel:
- if not hasattr(item, k + '_id'):
- setattr(item, k, getattr(self, k))
- continue
- val = getattr(item, k + '_id')
- if not val:
- setattr(item, k, None)
- continue
- try:
- val = field.rel.to.objects.get(pk=val)
- setattr(item, k, val)
- except ObjectDoesNotExist:
- if strict:
- raise HistoryError("The class %s has no pk %d" % (
- str(field.rel.to), val))
- setattr(item, k, None)
- item.pk = self.pk
- return item
-
- @property
- def last_edition_date(self):
- try:
- return self.history.order_by('-history_date').all()[0].history_date
- except (AttributeError, IndexError):
- return
-
- @property
- def history_creation_date(self):
- try:
- return self.history.order_by('history_date').all()[0].history_date
- except (AttributeError, IndexError):
- return
-
- def rollback(self, date):
- """
- Rollback to a previous state
- """
- to_del, new_item = [], None
- for item in self.history.all():
- if item.history_date == date:
- new_item = item
- break
- to_del.append(item)
- if not new_item:
- raise HistoryError("The date to rollback to doesn't exist.")
- try:
- field_keys = [f.name for f in self._meta.fields]
- for k in field_keys:
- if k != 'id' and hasattr(self, k):
- if not hasattr(new_item, k):
- k = k + "_id"
- setattr(self, k, getattr(new_item, k))
-
- try:
- self.history_modifier = User.objects.get(
- pk=new_item.history_modifier_id)
- except User.ObjectDoesNotExist:
- pass
- self.save()
- saved_m2m = new_item.history_m2m.copy()
- for hist_key in self.HISTORICAL_M2M:
- # after each association m2m is rewrite - force the original
- # to be reset
- new_item.history_m2m = saved_m2m
- values = new_item.m2m_listing(hist_key, create=True) or []
- hist_field = getattr(self, hist_key)
- hist_field.clear()
- for val in values:
- hist_field.add(val)
- # force label regeneration
- self._cached_label_checked = False
- self.save()
- except ObjectDoesNotExist:
- raise HistoryError("The rollback has failed.")
- # clean the obsolete history
- for historized_item in to_del:
- historized_item.delete()
-
- def m2m_listing(self, key):
- return getattr(self, key).all()
-
- def values(self):
- values = {}
- for f in self._meta.fields:
- k = f.name
- if k != 'id':
- values[k] = getattr(self, k)
- return values
-
- def get_absolute_url(self):
- try:
- return reverse('display-item', args=[self.SLUG, self.pk])
- except NoReverseMatch:
- return
-
- def get_show_url(self):
- show_url = self.SHOW_URL
- if not show_url:
- show_url = 'show-' + self.__class__.__name__.lower()
- try:
- return reverse(show_url, args=[self.pk, ''])
- except NoReverseMatch:
- return
-
- @property
- def associated_filename(self):
- if [True for attr in ('get_town_label', 'get_department', 'reference',
- 'short_class_name') if not hasattr(self, attr)]:
- return ''
- items = [slugify(self.get_department()),
- slugify(self.get_town_label()).upper(),
- slugify(self.short_class_name),
- slugify(self.reference),
- slugify(self.name or '').replace('-', '_').capitalize()]
- last_edition_date = self.last_edition_date
- if last_edition_date:
- items.append(last_edition_date.strftime('%Y%m%d'))
- else:
- items.append('00000000')
- return "-".join([str(item) for item in items])
-
- def save(self, *args, **kwargs):
- created = not self.pk
- if not getattr(self, 'skip_history_when_saving', False):
- assert hasattr(self, 'history_modifier')
- if created:
- self.history_creator = self.history_modifier
- # external ID can have related item not available before save
- external_id_updated = kwargs.pop('external_id_updated') \
- if 'external_id_updated' in kwargs else False
- if not created and not external_id_updated:
- self.update_external_id()
- super(BaseHistorizedItem, self).save(*args, **kwargs)
- if created and self.update_external_id():
- # force resave for external ID creation
- self.skip_history_when_saving = True
- self._updated_id = True
- return self.save(external_id_updated=True)
- for dep in self.EXTERNAL_ID_DEPENDENCIES:
- for obj in getattr(self, dep).all():
- obj.update_external_id(save=True)
- self.fix_associated()
- return True
-
-
LOGICAL_TYPES = (
('above', _("Above")),
('below', _("Below")),
@@ -2623,207 +689,10 @@ class SearchQuery(models.Model):
return str(self.label)
-class ShortMenuItem(object):
- """
- Item available in the short menu
- """
- UP_MODEL_QUERY = {}
-
- @classmethod
- def get_short_menu_class(cls, pk):
- return ''
-
- @property
- def short_class_name(self):
- return ""
-
-
-class QuickAction(object):
- """
- Quick action available from tables
- """
-
- def __init__(self, url, icon_class='', text='', target=None, rights=None,
- module=None):
- self.url = url
- self.icon_class = icon_class
- self.text = text
- self.rights = rights
- self.target = target
- self.module = module
- assert self.target in ('one', 'many', None)
-
- def is_available(self, user, session=None, obj=None):
- if self.module and not getattr(get_current_profile(), self.module):
- return False
- if not self.rights: # no restriction
- return True
- if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser:
- return False
- user = user.ishtaruser
-
- for right in self.rights:
- if user.has_perm(right, session=session, obj=obj):
- return True
- return False
-
- @property
- def rendered_icon(self):
- if not self.icon_class:
- return ""
- return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class)
-
- @property
- def base_url(self):
- if self.target is None:
- url = reverse(self.url)
- else:
- # put arbitrary pk for the target
- url = reverse(self.url, args=[0])
- url = url[:-2] # all quick action url have to finish with the
- # pk of the selected item and a "/"
- return url
-
-
-class MainItem(ShortMenuItem):
- """
- Item with quick actions available from tables
- Extra actions are available from sheets
- """
- QUICK_ACTIONS = []
-
- @classmethod
- def get_quick_actions(cls, user, session=None, obj=None):
- """
- Get a list of (url, title, icon, target) actions for an user
- """
- qas = []
- for action in cls.QUICK_ACTIONS:
- if not action.is_available(user, session=session, obj=obj):
- continue
- qas.append([action.base_url,
- mark_safe(action.text),
- mark_safe(action.rendered_icon),
- action.target or ""])
- return qas
-
- @classmethod
- def get_quick_action_by_url(cls, url):
- for action in cls.QUICK_ACTIONS:
- if action.url == url:
- return action
-
- def regenerate_external_id(self):
- if not hasattr(self, "external_id"):
- return
- self.skip_history_when_saving = True
- self._no_move = True
- if hasattr(self, "auto_external_id"):
- self.external_id = None
- self.save()
-
- def get_extra_actions(self, request):
- if not hasattr(self, 'SLUG'):
- return []
-
- actions = []
- if request.user.is_superuser and hasattr(self, "auto_external_id"):
- actions += [
- (
- reverse("regenerate-external-id") + "?{}={}".format(
- self.SLUG, self.pk),
- _("Regenerate ID"),
- "fa fa-key",
- _("regen."),
- "",
- True
- )
- ]
-
- return actions
-
-
-class LightHistorizedItem(BaseHistorizedItem):
- history_date = models.DateTimeField(default=datetime.datetime.now)
-
- class Meta:
- abstract = True
-
- def save(self, *args, **kwargs):
- super(LightHistorizedItem, self).save(*args, **kwargs)
- return self
-
-
-PARSE_FORMULA = re.compile("{([^}]*)}")
-
-
-def _deduplicate(value):
- new_values = []
- for v in value.split(u'-'):
- if v not in new_values:
- new_values.append(v)
- return u'-'.join(new_values)
-
-
-FORMULA_FILTERS = {
- 'upper': lambda x: x.upper(),
- 'lower': lambda x: x.lower(),
- 'capitalize': lambda x: x.capitalize(),
- 'slug': lambda x: slugify(x),
- 'deduplicate': _deduplicate
-}
-
-
-def get_external_id(key, item):
- profile = get_current_profile()
- if not hasattr(profile, key):
- return
- formula = getattr(profile, key)
- dct = {}
- for fkey in PARSE_FORMULA.findall(formula):
- filtered = fkey.split('|')
- initial_key = fkey[:]
- fkey = filtered[0]
- filters = []
- for filtr in filtered[1:]:
- if filtr in FORMULA_FILTERS:
- filters.append(FORMULA_FILTERS[filtr])
- if fkey.startswith('settings__'):
- dct[fkey] = getattr(settings, fkey[len('settings__'):]) or ''
- continue
- obj = item
- for k in fkey.split('__'):
- try:
- obj = getattr(obj, k)
- except ObjectDoesNotExist:
- obj = None
- if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager
- if not obj.count():
- break
- obj = obj.all()[0]
- elif callable(obj):
- obj = obj()
- if obj is None:
- break
- if obj is None:
- dct[initial_key] = ''
- else:
- dct[initial_key] = str(obj)
- for filtr in filters:
- dct[initial_key] = filtr(dct[initial_key])
- values = formula.format(**dct).split('||')
- value = values[0]
- for filtr in values[1:]:
- if filtr not in FORMULA_FILTERS:
- value += '||' + filtr
- continue
- value = FORMULA_FILTERS[filtr](value)
- return value
-
-
class Language(GeneralType):
iso_code = models.CharField(_("ISO code"), null=True, blank=True,
max_length=2)
+
class Meta:
verbose_name = _("Language")
verbose_name_plural = _("Languages")
@@ -3109,10 +978,6 @@ class IshtarSiteProfile(models.Model, Cached):
return obj
-def get_current_profile(force=False):
- return IshtarSiteProfile.get_current_profile(force=force)
-
-
def _profile_mapping():
return get_current_profile().mapping
@@ -3357,141 +1222,6 @@ class StatsCache(models.Model):
verbose_name_plural = _("Caches for stats")
-def update_stats(statscache, item, funcname):
- if not settings.USE_BACKGROUND_TASK:
- current_values = statscache.values
- if not current_values:
- current_values = {}
- value = getattr(item, funcname)()
- current_values[funcname] = value
- statscache.values = current_values
- statscache.updated = datetime.datetime.now()
- statscache.save()
- return current_values
-
- now = datetime.datetime.now()
- app_name = item._meta.app_label
- model_name = item._meta.model_name
- statscache.update_requested = now.isoformat()
- statscache.save()
- _update_stats.delay(app_name, model_name, item.pk, funcname)
- return statscache.values
-
-
-def __get_stats_cache_values(model_name, model_pk):
- q = StatsCache.objects.filter(
- model=model_name, model_pk=model_pk
- )
- nb = q.count()
- if nb >= 1:
- sc = q.all()[0]
- for extra in q.order_by("-id").all()[1:]:
- extra.delete()
- else:
- sc = StatsCache.objects.create(
- model=model_name, model_pk=model_pk
- )
- values = sc.values
- if not values:
- values = {}
- return sc, values
-
-
-@task()
-def _update_stats(app, model, model_pk, funcname):
- model_name = app + "." + model
- model = apps.get_model(app, model)
- try:
- item = model.objects.get(pk=model_pk)
- except model.DoesNotExist:
- return
- value = getattr(item, funcname)()
- sc, current_values = __get_stats_cache_values(model_name, model_pk)
- current_values[funcname] = value
- sc.values = current_values
- sc.update_requested = None
- sc.updated = datetime.datetime.now()
- sc.save()
-
-
-class DashboardFormItem(object):
- """
- Provide methods to manage statistics
- """
-
- def last_stats_update(self):
- model_name = self._meta.app_label + "." + self._meta.model_name
- q = StatsCache.objects.filter(
- model=model_name, model_pk=self.pk).order_by("-updated")
- if not q.count():
- return
- return q.all()[0].updated
-
- def _get_or_set_stats(self, funcname, update=False,
- expected_type=None):
- model_name = self._meta.app_label + "." + self._meta.model_name
- sc, __ = StatsCache.objects.get_or_create(
- model=model_name, model_pk=self.pk
- )
- if not update:
- values = sc.values
- if funcname not in values:
- if expected_type is not None:
- return expected_type()
- return 0
- else:
- values = update_stats(sc, self, funcname)
- if funcname in values:
- values = values[funcname]
- else:
- values = 0
- if expected_type is not None and not isinstance(values, expected_type):
- return expected_type()
- return values
-
- @classmethod
- def get_periods(cls, slice='month', fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
- if fltr:
- q = q.filter(**fltr)
- if slice == 'year':
- return [res[date_var].year for res in list(q.values(date_var)
- .annotate(
- Count("id")).order_by())]
- elif slice == 'month':
- return [(res[date_var].year, res[date_var].month)
- for res in list(q.values(date_var)
- .annotate(Count("id")).order_by())]
- return []
-
- @classmethod
- def get_by_year(cls, year, fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
- if fltr:
- q = q.filter(**fltr)
- return q.filter(
- **{date_var + '__year': year}).order_by('pk').distinct('pk')
-
- @classmethod
- def get_by_month(cls, year, month, fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
- if fltr:
- q = q.filter(**fltr)
- q = q.filter(
- **{date_var + '__year': year, date_var + '__month': month})
- return q.order_by('pk').distinct('pk')
-
- @classmethod
- def get_total_number(cls, fltr=None):
- q = cls.objects
- if fltr:
- q = q.filter(**fltr)
- return q.order_by('pk').distinct('pk').count()
-
-
class Dashboard(object):
def __init__(self, model, slice='year', date_source=None, show_detail=None,
fltr=None):
@@ -3780,254 +1510,6 @@ class DocumentTemplate(models.Model):
return output_name
-class NumberManager(models.Manager):
- def get_by_natural_key(self, number):
- return self.get(number=number)
-
-
-class State(models.Model):
- label = models.CharField(_("Label"), max_length=30)
- number = models.CharField(_("Number"), unique=True, max_length=3)
- objects = NumberManager()
-
- class Meta:
- verbose_name = _("State")
- ordering = ['number']
-
- def __str__(self):
- return self.label
-
- def natural_key(self):
- return (self.number,)
-
-
-class Department(models.Model):
- label = models.CharField(_("Label"), max_length=30)
- number = models.CharField(_("Number"), unique=True, max_length=3)
- state = models.ForeignKey(
- 'State', verbose_name=_("State"), blank=True, null=True,
- on_delete=models.SET_NULL,
- )
- objects = NumberManager()
-
- class Meta:
- verbose_name = _("Department")
- verbose_name_plural = _("Departments")
- ordering = ['number']
-
- def __str__(self):
- return self.label
-
- def natural_key(self):
- return (self.number,)
-
- def history_compress(self):
- return self.number
-
- @classmethod
- def history_decompress(cls, full_value, create=False):
- if not full_value:
- return []
- res = []
- for value in full_value:
- try:
- res.append(cls.objects.get(number=value))
- except cls.DoesNotExist:
- continue
- return res
-
-
-class Arrondissement(models.Model):
- name = models.CharField("Nom", max_length=30)
- department = models.ForeignKey(Department, verbose_name="Département")
-
- def __str__(self):
- return settings.JOINT.join((self.name, str(self.department)))
-
-
-class Canton(models.Model):
- name = models.CharField("Nom", max_length=30)
- arrondissement = models.ForeignKey(Arrondissement,
- verbose_name="Arrondissement")
-
- def __str__(self):
- return settings.JOINT.join(
- (self.name, str(self.arrondissement)))
-
-
-class TownManager(models.GeoManager):
- def get_by_natural_key(self, numero_insee, year):
- return self.get(numero_insee=numero_insee, year=year)
-
-
-class Town(Imported, models.Model):
- name = models.CharField(_("Name"), max_length=100)
- surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
- center = models.PointField(_("Localisation"), srid=settings.SRID,
- blank=True, null=True)
- limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
- numero_insee = models.CharField("Code commune (numéro INSEE)",
- max_length=120)
- departement = models.ForeignKey(
- Department, verbose_name=_("Department"),
- on_delete=models.SET_NULL, null=True, blank=True)
- year = models.IntegerField(
- _("Year of creation"), null=True, blank=True,
- help_text=_("Filling this field is relevant to distinguish old towns "
- "from new towns."))
- children = models.ManyToManyField(
- 'Town', verbose_name=_("Town children"), blank=True,
- related_name='parents')
- cached_label = models.CharField(_("Cached name"), max_length=500,
- null=True, blank=True, db_index=True)
- objects = TownManager()
-
- class Meta:
- verbose_name = _("Town")
- verbose_name_plural = _("Towns")
- if settings.COUNTRY == 'fr':
- ordering = ['numero_insee']
- unique_together = (('numero_insee', 'year'),)
-
- def natural_key(self):
- return (self.numero_insee, self.year)
-
- def history_compress(self):
- values = {'numero_insee': self.numero_insee,
- 'year': self.year or ""}
- return values
-
- def get_values(self, prefix='', no_values=False, filtr=None, **kwargs):
- values = {}
- if not filtr or prefix in filtr or "label" in filtr:
- if prefix:
- values[prefix] = str(self)
- else:
- values['label'] = str(self)
- if not filtr or prefix + "name" in filtr:
- values[prefix + "name"] = self.name
- if not filtr or prefix + "numero_insee" in filtr:
- values[prefix + "numero_insee"] = self.numero_insee
- return values
-
- @classmethod
- def history_decompress(cls, full_value, create=False):
- if not full_value:
- return []
- res = []
- for value in full_value:
- try:
- res.append(
- cls.objects.get(numero_insee=value['numero_insee'],
- year=value['year'] or None))
- except cls.DoesNotExist:
- continue
- return res
-
- def __str__(self):
- return self.cached_label or ""
-
- @property
- def label_with_areas(self):
- label = [self.name]
- if self.numero_insee:
- label.append("({})".format(self.numero_insee))
- for area in self.areas.all():
- label.append(" - ")
- label.append(area.full_label)
- return " ".join(label)
-
- def generate_geo(self, force=False):
- force = self.generate_limit(force=force)
- self.generate_center(force=force)
- self.generate_area(force=force)
-
- def generate_limit(self, force=False):
- if not force and self.limit:
- return
- parents = None
- if not self.parents.count():
- return
- for parent in self.parents.all():
- if not parent.limit:
- return
- if not parents:
- parents = parent.limit
- else:
- parents = parents.union(parent.limit)
- # if union is a simple polygon make it a multi
- if 'MULTI' not in parents.wkt:
- parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")"
- if not parents:
- return
- self.limit = parents
- self.save()
- return True
-
- def generate_center(self, force=False):
- if not force and (self.center or not self.limit):
- return
- self.center = self.limit.centroid
- if not self.center:
- return False
- self.save()
- return True
-
- def generate_area(self, force=False):
- if not force and (self.surface or not self.limit):
- return
- surface = self.limit.transform(settings.SURFACE_SRID,
- clone=True).area
- if surface > 214748364 or not surface:
- return False
- self.surface = surface
- self.save()
- return True
-
- def update_town_code(self):
- if not self.numero_insee or not self.children.count() or not self.year:
- return
- old_num = self.numero_insee[:]
- numero = old_num.split('-')[0]
- self.numero_insee = "{}-{}".format(numero, self.year)
- if self.numero_insee != old_num:
- return True
-
- def _generate_cached_label(self):
- cached_label = self.name
- if settings.COUNTRY == "fr" and self.numero_insee:
- dpt_len = 2
- if self.numero_insee.startswith('97') or \
- self.numero_insee.startswith('98') or \
- self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5',
- '6', '7', '8', '9'):
- dpt_len = 3
- cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
- if self.year and self.children.count():
- cached_label += " ({})".format(self.year)
- return cached_label
-
-
-def post_save_town(sender, **kwargs):
- cached_label_changed(sender, **kwargs)
- town = kwargs['instance']
- town.generate_geo()
- if town.update_town_code():
- town.save()
-
-
-post_save.connect(post_save_town, sender=Town)
-
-
-def town_child_changed(sender, **kwargs):
- town = kwargs['instance']
- if town.update_town_code():
- town.save()
-
-
-m2m_changed.connect(town_child_changed, sender=Town.children.through)
-
-
class Area(HierarchicalType):
towns = models.ManyToManyField(Town, verbose_name=_("Towns"), blank=True,
related_name='areas')
@@ -4057,261 +1539,6 @@ class Area(HierarchicalType):
return " / ".join(label)
-class Address(BaseHistorizedItem):
- FIELDS = (
- "address", "address_complement", "postal_code", "town",
- "precise_town", "country",
- "alt_address", "alt_address_complement", "alt_postal_code", "alt_town",
- "alt_country",
- "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3",
- "raw_phone", "mobile_phone", "email", "alt_address_is_prefered"
- )
- address = models.TextField(_("Address"), null=True, blank=True)
- address_complement = models.TextField(_("Address complement"), null=True,
- blank=True)
- postal_code = models.CharField(_("Postal code"), max_length=10, null=True,
- blank=True)
- town = models.CharField(_("Town (freeform)"), max_length=150, null=True,
- blank=True)
- precise_town = models.ForeignKey(
- Town, verbose_name=_("Town (precise)"), null=True, blank=True)
- country = models.CharField(_("Country"), max_length=30, null=True,
- blank=True)
- alt_address = models.TextField(_("Other address: address"), null=True,
- blank=True)
- alt_address_complement = models.TextField(
- _("Other address: address complement"), null=True, blank=True)
- alt_postal_code = models.CharField(_("Other address: postal code"),
- max_length=10, null=True, blank=True)
- alt_town = models.CharField(_("Other address: town"), max_length=70,
- null=True, blank=True)
- alt_country = models.CharField(_("Other address: country"),
- max_length=30, null=True, blank=True)
- phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True)
- phone_desc = models.CharField(_("Phone description"), max_length=300,
- null=True, blank=True)
- phone2 = models.CharField(_("Phone description 2"), max_length=18,
- null=True, blank=True)
- phone_desc2 = models.CharField(_("Phone description 2"), max_length=300,
- null=True, blank=True)
- phone3 = models.CharField(_("Phone 3"), max_length=18, null=True,
- blank=True)
- phone_desc3 = models.CharField(_("Phone description 3"), max_length=300,
- null=True, blank=True)
- raw_phone = models.TextField(_("Raw phone"), blank=True, null=True)
- mobile_phone = models.CharField(_("Mobile phone"), max_length=18,
- null=True, blank=True)
- email = models.EmailField(
- _("Email"), max_length=300, blank=True, null=True)
- alt_address_is_prefered = models.BooleanField(
- _("Alternative address is prefered"), default=False)
- SUB_ADDRESSES = []
-
- class Meta:
- abstract = True
-
- def get_short_html_items(self):
- items = []
- if self.address:
- items.append(
- """<span class="subadress">{}</span>""".format(self.address))
- if self.address_complement:
- items.append(
- """<span class="subadress-complement">{}</span>""".format(
- self.address_complement))
- if self.postal_code:
- items.append(
- """<span class="postal-code">{}</span>""".format(
- self.postal_code))
- if self.precise_town:
- items.append(
- """<span class="town">{}</span>""".format(
- self.precise_town.name))
- elif self.town:
- items.append(
- """<span class="town">{}</span>""".format(
- self.town))
- if self.country:
- items.append(
- """<span class="country">{}</span>""".format(
- self.country))
- return items
-
- def get_short_html_detail(self):
- html = """<div class="address">"""
- items = self.get_short_html_items()
- if not items:
- items = [
- "<span class='no-address'>{}</span>".format(
- _("No associated address")
- )
- ]
- html += "".join(items)
- html += """</div>"""
- return html
-
- def get_town_centroid(self):
- if self.precise_town:
- return self.precise_town.center, self._meta.verbose_name
- for sub_address in self.SUB_ADDRESSES:
- sub_item = getattr(self, sub_address)
- if sub_item and sub_item.precise_town:
- return sub_item.precise_town.center, sub_item._meta.verbose_name
-
- def get_town_polygons(self):
- if self.precise_town:
- return self.precise_town.limit, self._meta.verbose_name
- for sub_address in self.SUB_ADDRESSES:
- sub_item = getattr(self, sub_address)
- if sub_item and sub_item.precise_town:
- return sub_item.precise_town.limit, sub_item._meta.verbose_name
-
- def get_attribute(self, attr):
- if self.town or self.precise_town:
- return getattr(self, attr)
- for sub_address in self.SUB_ADDRESSES:
- sub_item = getattr(self, sub_address)
- if not sub_item:
- continue
- if sub_item.town or sub_item.precise_town:
- return getattr(sub_item, attr)
- return getattr(self, attr)
-
- def get_address(self):
- return self.get_attribute("address")
-
- def get_address_complement(self):
- return self.get_attribute("address_complement")
-
- def get_postal_code(self):
- return self.get_attribute("postal_code")
-
- def get_town(self):
- return self.get_attribute("town")
-
- def get_precise_town(self):
- return self.get_attribute("precise_town")
-
- def get_country(self):
- return self.get_attribute("country")
-
- def simple_lbl(self):
- return str(self)
-
- def full_address(self):
- lbl = self.simple_lbl()
- if lbl:
- lbl += "\n"
- lbl += self.address_lbl()
- return lbl
-
- def address_lbl(self):
- lbl = u''
- prefix = ''
- if self.alt_address_is_prefered:
- prefix = 'alt_'
- if getattr(self, prefix + 'address'):
- lbl += getattr(self, prefix + 'address')
- if getattr(self, prefix + 'address_complement'):
- if lbl:
- lbl += "\n"
- lbl += getattr(self, prefix + 'address_complement')
- postal_code = getattr(self, prefix + 'postal_code')
- town = getattr(self, prefix + 'town')
- if postal_code or town:
- if lbl:
- lbl += "\n"
- lbl += "{}{}{}".format(
- postal_code or '',
- " " if postal_code and town else '',
- town or '')
- if self.phone:
- if lbl:
- lbl += "\n"
- lbl += "{} {}".format(str(_("Tel: ")), self.phone)
- if self.mobile_phone:
- if lbl:
- lbl += "\n"
- lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone)
- if self.email:
- if lbl:
- lbl += "\n"
- lbl += "{} {}".format(str(_("Email: ")), self.email)
- return lbl
-
-
-class Merge(models.Model):
- merge_key = models.TextField(_("Merge key"), blank=True, null=True)
- merge_candidate = models.ManyToManyField("self", blank=True)
- merge_exclusion = models.ManyToManyField("self", blank=True)
- archived = models.NullBooleanField(default=False,
- blank=True, null=True)
- # 1 for one word similarity, 2 for two word similarity, etc.
- MERGE_CLEMENCY = None
- EMPTY_MERGE_KEY = '--'
- MERGE_ATTRIBUTE = "name"
-
- class Meta:
- abstract = True
-
- def generate_merge_key(self):
- if self.archived:
- return
- merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
- self.merge_key = slugify(merge_attr if merge_attr else '')
- if not self.merge_key:
- self.merge_key = self.EMPTY_MERGE_KEY
- self.merge_key = self.merge_key
-
- def generate_merge_candidate(self):
- if self.archived:
- return
- if not self.merge_key:
- self.generate_merge_key()
- self.save(merge_key_generated=True)
- if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
- return
- q = self.__class__.objects \
- .exclude(pk=self.pk) \
- .exclude(merge_exclusion=self) \
- .exclude(merge_candidate=self) \
- .exclude(archived=True)
- if not self.MERGE_CLEMENCY:
- q = q.filter(merge_key=self.merge_key)
- else:
- subkeys_front = "-".join(
- self.merge_key.split('-')[:self.MERGE_CLEMENCY])
- subkeys_back = "-".join(
- self.merge_key.split('-')[-self.MERGE_CLEMENCY:])
- q = q.filter(Q(merge_key__istartswith=subkeys_front) |
- Q(merge_key__iendswith=subkeys_back))
- for item in q.all():
- self.merge_candidate.add(item)
-
- def save(self, *args, **kwargs):
- # prevent circular save
- merge_key_generated = False
- if 'merge_key_generated' in kwargs:
- merge_key_generated = kwargs.pop('merge_key_generated')
- self.generate_merge_key()
- item = super(Merge, self).save(*args, **kwargs)
- if not merge_key_generated:
- self.merge_candidate.clear()
- self.generate_merge_candidate()
- return item
-
- def archive(self):
- self.archived = True
- self.save()
- self.merge_candidate.clear()
- self.merge_exclusion.clear()
-
- def merge(self, item, keep_old=False, exclude_fields=None):
- merge_model_objects(self, item, keep_old=keep_old,
- exclude_fields=exclude_fields)
- self.generate_merge_candidate()
-
-
class OrganizationType(GeneralType):
class Meta:
verbose_name = _("Organization type")
@@ -5704,15 +2931,12 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
verbose_name=_("Receipt date in documentation"))
item_number = models.IntegerField(_("Number of items"), default=1)
description = models.TextField(_("Description"), blank=True, null=True)
- container = models.ForeignKey(
- "archaeological_warehouse.Container", verbose_name=_("Container"),
- blank=True, null=True, related_name='contained_documents',
- on_delete=models.SET_NULL)
- container_ref = models.ForeignKey(
- "archaeological_warehouse.Container",
- verbose_name=_("Reference container"),
- blank=True, null=True,
- related_name='contained_documents_ref', on_delete=models.SET_NULL)
+ container_id = models.PositiveIntegerField(
+ verbose_name=_("Container ID"), blank=True, null=True)
+ # container = models.ForeignKey("archaeological_warehouse.Container")
+ container_ref_id = models.PositiveIntegerField(
+ verbose_name=_("Container ID"), blank=True, null=True)
+ # container_ref = models.ForeignKey("archaeological_warehouse.Container")
comment = models.TextField(_("Comment"), blank=True, null=True)
additional_information = models.TextField(_("Additional information"),
blank=True, null=True)
@@ -5750,6 +2974,26 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
def natural_key(self):
return (self.external_id,)
+ @property
+ def container(self):
+ if not self.container_id:
+ return
+ Container = apps.get_model("archaeological_warehouse", "Container")
+ try:
+ return Container.objects.get(pk=self.container_id)
+ except Container.DoesNotExist:
+ return
+
+ @property
+ def container_ref(self):
+ if not self.container_ref_id:
+ return
+ Container = apps.get_model("archaeological_warehouse", "Container")
+ try:
+ return Container.objects.get(pk=self.container_ref_id)
+ except Container.DoesNotExist:
+ return
+
"""
@property
def code(self):
@@ -6177,6 +3421,16 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
self.set_index()
if not self.associated_url:
self.associated_url = None
+ container = self.container
+ if self.container_id and not container:
+ self.container_id = None
+ if container and container.pk != self.container_id:
+ self.container_id = container.pk
+ container_ref = self.container_ref
+ if self.container_ref_id and not container_ref:
+ self.container_ref_id = None
+ if container_ref and container_ref.pk != self.container_ref_id:
+ self.container_ref_id = container_ref.pk
super(Document, self).save(*args, **kwargs)
if self.image and not no_path_change and \
@@ -6190,42 +3444,6 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
self.save(no_path_change=True)
-def document_attached_changed(sender, **kwargs):
- # associate a default main image
- instance = kwargs.get("instance", None)
- model = kwargs.get("model", None)
- pk_set = kwargs.get("pk_set", None)
- if not instance or not model:
- return
-
- if hasattr(instance, "documents"):
- items = [instance]
- else:
- if not pk_set:
- return
- try:
- items = [model.objects.get(pk=pk) for pk in pk_set]
- except model.DoesNotExist:
- return
-
- for item in items:
- q = item.documents.filter(
- image__isnull=False).exclude(image='')
- if item.main_image:
- if q.filter(pk=item.main_image.pk).count():
- return
- # the association has disappear not the main image anymore
- item.main_image = None
- item.skip_history_when_saving = True
- item.save()
- if not q.count():
- return
- # by default get the lowest pk
- item.main_image = q.order_by('pk').all()[0]
- item.skip_history_when_saving = True
- item.save()
-
-
post_save.connect(cached_label_changed, sender=Document)