diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2020-10-07 19:09:30 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-02-28 12:15:21 +0100 |
commit | 9d5f0791187ff6b18d3ffa4db4d593fe96834e8d (patch) | |
tree | 9cd21bf7e51d271b958a9a4b2b85367adbb97992 /ishtar_common | |
parent | e5c0a159929fc64d63db37ebd85a5a810faf2534 (diff) | |
download | Ishtar-9d5f0791187ff6b18d3ffa4db4d593fe96834e8d.tar.bz2 Ishtar-9d5f0791187ff6b18d3ffa4db4d593fe96834e8d.zip |
Refactoring of models. Document container - declare only id
Diffstat (limited to 'ishtar_common')
-rw-r--r-- | ishtar_common/admin.py | 18 | ||||
-rw-r--r-- | ishtar_common/forms_common.py | 18 | ||||
-rw-r--r-- | ishtar_common/migrations/0201_squashed.py | 6 | ||||
-rw-r--r-- | ishtar_common/migrations/0204_auto_20200514_1124.py | 27 | ||||
-rw-r--r-- | ishtar_common/migrations/0204_auto_20201007_1630.py (renamed from ishtar_common/migrations/0205_auto_20200527_1500.py) | 52 | ||||
-rw-r--r-- | ishtar_common/models.py | 2890 | ||||
-rw-r--r-- | ishtar_common/models_common.py | 2777 | ||||
-rw-r--r-- | ishtar_common/serializers.py | 3 | ||||
-rw-r--r-- | ishtar_common/tasks.py | 2 | ||||
-rw-r--r-- | ishtar_common/tests.py | 4 | ||||
-rw-r--r-- | ishtar_common/utils.py | 118 | ||||
-rw-r--r-- | ishtar_common/views_item.py | 22 | ||||
-rw-r--r-- | ishtar_common/wizards.py | 2 |
13 files changed, 3036 insertions, 2903 deletions
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py index 287f28249..b5ff67567 100644 --- a/ishtar_common/admin.py +++ b/ishtar_common/admin.py @@ -62,7 +62,7 @@ from django.views.decorators.csrf import csrf_protect from django import forms -from ishtar_common import models +from ishtar_common import models, models_common from ishtar_common.apps import admin_site from ishtar_common.model_merging import merge_model_objects from ishtar_common.utils import get_cache, create_slug @@ -376,10 +376,10 @@ admin_site.register(models.IshtarSiteProfile, IshtarSiteProfileAdmin) class DepartmentAdmin(admin.ModelAdmin): list_display = ('number', 'label',) - model = models.Department + model = models_common.Department -admin_site.register(models.Department, DepartmentAdmin) +admin_site.register(models_common.Department, DepartmentAdmin) class OrganizationAdmin(HistorizedObjectAdmin): @@ -765,7 +765,7 @@ class ImportGEOJSONActionAdmin(object): continue num_insee = values.pop('numero_insee') year = values.pop('year') or None - t, c = models.Town.objects.get_or_create( + t, c = models_common.Town.objects.get_or_create( numero_insee=num_insee, year=year, defaults=values) if c: @@ -861,7 +861,7 @@ class ImportJSONActionAdmin(admin.ModelAdmin): class AdminRelatedTownForm(forms.ModelForm): class Meta: - model = models.Town.children.through + model = models_common.Town.children.through exclude = [] from_town = AutoCompleteSelectField( 'town', required=True, label=_(u"Parent")) @@ -869,7 +869,7 @@ class AdminRelatedTownForm(forms.ModelForm): class AdminTownForm(forms.ModelForm): class Meta: - model = models.Town + model = models_common.Town exclude = ['imports', 'departement'] center = PointField(label=_(u"Center"), required=False, widget=OSMWidget) @@ -880,7 +880,7 @@ class AdminTownForm(forms.ModelForm): class TownParentInline(admin.TabularInline): - model = models.Town.children.through + model = models_common.Town.children.through fk_name = 'to_town' form = AdminRelatedTownForm verbose_name = _(u"Parent") @@ -891,7 +891,7 @@ class TownParentInline(admin.TabularInline): class TownAdmin(ImportGEOJSONActionAdmin, ImportActionAdmin): change_list_template = "admin/town_change_list.html" - model = models.Town + model = models_common.Town list_display = ['name', 'year'] search_fields = ['name'] readonly_fields = ['cached_label'] @@ -907,7 +907,7 @@ class TownAdmin(ImportGEOJSONActionAdmin, ImportActionAdmin): import_keys = ['slug', 'txt_idx', 'numero_insee'] -admin_site.register(models.Town, TownAdmin) +admin_site.register(models_common.Town, TownAdmin) class GeneralTypeAdmin(ImportActionAdmin, ImportJSONActionAdmin): diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py index 49d1829bc..d6d4e197a 100644 --- a/ishtar_common/forms_common.py +++ b/ishtar_common/forms_common.py @@ -1281,12 +1281,18 @@ class DocumentForm(forms.ModelForm, CustomForm, ManageOldType): model=models.Format, label=_("Format"), choices=[], required=False) scale = forms.CharField(label=_("Scale"), max_length=30, required=False) - container = widgets.ModelJQueryAutocompleteField( + container_id = forms.IntegerField( label=_("Current container"), - model=Container, required=False) - container_ref = widgets.ModelJQueryAutocompleteField( + widget=widgets.JQueryAutoComplete( + reverse_lazy('autocomplete-container'), + associated_model=Container, new=True), + validators=[models.valid_id(Container)], required=False) + container_ref_id = forms.IntegerField( label=_("Reference container"), - model=Container, required=False) + widget=widgets.JQueryAutoComplete( + reverse_lazy('autocomplete-container'), + associated_model=Container, new=True), + validators=[models.valid_id(Container)], required=False) authors = widgets.Select2MultipleField( label=_("Authors"), required=False, model=models.Author, remote="autocomplete-author") @@ -1378,7 +1384,7 @@ class DocumentForm(forms.ModelForm, CustomForm, ManageOldType): 'receipt_date_in_documentation', 'creation_date', 'publisher', 'language', 'isbn', 'issn', 'licenses', 'source', 'source_free_input', - 'container', "container_ref", + 'container_id', "container_ref_id", 'comment', 'description', 'additional_information', 'duplicate' ] @@ -1390,7 +1396,7 @@ class DocumentForm(forms.ModelForm, CustomForm, ManageOldType): 'receipt_date': FormHeader(_("Dates")), 'publisher': FormHeader(_("Publishing"), collapse=True), 'source': FormHeader(_("Source"), collapse=True), - 'container': FormHeader(_("Warehouse"), collapse=True), + 'container_id': FormHeader(_("Warehouse"), collapse=True), 'comment': FormHeader(_("Advanced"), collapse=True), 'finds': FormHeader(_("Related items")), } diff --git a/ishtar_common/migrations/0201_squashed.py b/ishtar_common/migrations/0201_squashed.py index d0710636e..d7b65626a 100644 --- a/ishtar_common/migrations/0201_squashed.py +++ b/ishtar_common/migrations/0201_squashed.py @@ -203,7 +203,11 @@ class Migration(migrations.Migration): 'ordering': ('title',), 'permissions': (('view_document', 'Peut voir tous les Documents'), ('view_own_document', 'Peut voir ses propres Documents'), ('add_own_document', 'Peut ajouter son propre Document'), ('change_own_document', 'Peut modifier ses propres Documents'), ('delete_own_document', 'Peut supprimer ses propres Documents')), }, - bases=(ishtar_common.models.StatisticItem, ishtar_common.models.TemplateItem, ishtar_common.models.OwnPerms, models.Model, ishtar_common.models.CachedGen, ishtar_common.models.FixAssociated, ishtar_common.models.CascasdeUpdate, ishtar_common.models.ImageContainerModel, ishtar_common.models.ValueGetter, ishtar_common.models.MainItem), + bases=(ishtar_common.models.StatisticItem, + ishtar_common.models.TemplateItem, + ishtar_common.models.OwnPerms, models.Model, + ishtar_common.models.CachedGen, + ishtar_common.models.FixAssociated, ishtar_common.models.CascasdeUpdate, ishtar_common.models.ImageContainerModel, ishtar_common.models.ValueGetter, ishtar_common.models.MainItem), ), migrations.CreateModel( name='DocumentTemplate', diff --git a/ishtar_common/migrations/0204_auto_20200514_1124.py b/ishtar_common/migrations/0204_auto_20200514_1124.py deleted file mode 100644 index 4eea40bf3..000000000 --- a/ishtar_common/migrations/0204_auto_20200514_1124.py +++ /dev/null @@ -1,27 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by Django 1.11.27 on 2020-05-14 11:24 -from __future__ import unicode_literals - -from django.db import migrations, models -import django.db.models.deletion - - -class Migration(migrations.Migration): - - dependencies = [ - ('archaeological_warehouse', '0103_auto_container_views'), - ('ishtar_common', '0203_auto_20200407_1142'), - ] - - operations = [ - migrations.AddField( - model_name='document', - name='container', - field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='contained_documents', to='archaeological_warehouse.Container', verbose_name='Container'), - ), - migrations.AddField( - model_name='document', - name='container_ref', - field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='contained_documents_ref', to='archaeological_warehouse.Container', verbose_name='Reference container'), - ), - ] diff --git a/ishtar_common/migrations/0205_auto_20200527_1500.py b/ishtar_common/migrations/0204_auto_20201007_1630.py index deed0ad90..9b5ab0876 100644 --- a/ishtar_common/migrations/0205_auto_20200527_1500.py +++ b/ishtar_common/migrations/0204_auto_20201007_1630.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# Generated by Django 1.11.27 on 2020-05-27 15:00 +# Generated by Django 1.11.27 on 2020-10-07 16:30 from __future__ import unicode_literals import django.core.validators from django.db import migrations, models import django.db.models.deletion -import ishtar_common.models +import ishtar_common.models_common import re class Migration(migrations.Migration): dependencies = [ - ('ishtar_common', '0204_auto_20200514_1124'), + ('ishtar_common', '0203_auto_20200407_1142'), ] operations = [ @@ -30,7 +30,7 @@ class Migration(migrations.Migration): 'verbose_name_plural': 'Document tags', 'ordering': ('label',), }, - bases=(ishtar_common.models.Cached, models.Model), + bases=(ishtar_common.models_common.Cached, models.Model), ), migrations.CreateModel( name='Language', @@ -46,7 +46,7 @@ class Migration(migrations.Migration): 'verbose_name': 'Language', 'verbose_name_plural': 'Languages', }, - bases=(ishtar_common.models.Cached, models.Model), + bases=(ishtar_common.models_common.Cached, models.Model), ), migrations.AlterModelOptions( name='sourcetype', @@ -54,6 +54,16 @@ class Migration(migrations.Migration): ), migrations.AddField( model_name='document', + name='container_id', + field=models.PositiveIntegerField(blank=True, null=True, verbose_name='Container ID'), + ), + migrations.AddField( + model_name='document', + name='container_ref_id', + field=models.PositiveIntegerField(blank=True, null=True, verbose_name='Container ID'), + ), + migrations.AddField( + model_name='document', name='isbn', field=models.CharField(blank=True, max_length=13, null=True, verbose_name='ISBN'), ), @@ -65,7 +75,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name='document', name='publisher', - field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='ishtar_common.Organization', verbose_name='Publisher'), + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='publish', to='ishtar_common.Organization', verbose_name='Publisher'), ), migrations.AddField( model_name='document', @@ -92,6 +102,36 @@ class Migration(migrations.Migration): name='is_localized', field=models.BooleanField(default=False, help_text='Setting a language for this type of document is relevant', verbose_name='Is localized'), ), + migrations.AlterField( + model_name='document', + name='format_type', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='ishtar_common.Format', verbose_name='Medium'), + ), + migrations.AlterField( + model_name='importercolumn', + name='regexp_pre_filter', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='columns', to='ishtar_common.Regexp'), + ), + migrations.AlterField( + model_name='importercolumn', + name='value_format', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='columns', to='ishtar_common.ValueFormater'), + ), + migrations.AlterField( + model_name='importertype', + name='associated_models', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='importer_type_associated', to='ishtar_common.ImporterModel', verbose_name='Associated model'), + ), + migrations.AlterField( + model_name='importertype', + name='created_models', + field=models.ManyToManyField(blank=True, help_text='Leave blank for no restrictions', related_name='importer_type_created', to='ishtar_common.ImporterModel', verbose_name='Models that can accept new items'), + ), + migrations.AlterField( + model_name='importtarget', + name='formater_type', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='targets', to='ishtar_common.FormaterType'), + ), migrations.AddField( model_name='document', name='language', diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 001c1894f..f23cab926 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -21,7 +21,6 @@ Models description """ import copy -from collections import OrderedDict import datetime import inspect from importlib import import_module @@ -29,9 +28,6 @@ from jinja2 import TemplateSyntaxError, UndefinedError import json import logging import os -import pyqrcode -import re -import shutil import string import tempfile import time @@ -46,7 +42,6 @@ import zipfile from urllib.parse import urlencode from xml.etree import ElementTree as ET -from django import forms from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group @@ -54,32 +49,22 @@ from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import JSONField -from django.contrib.postgres.search import SearchVectorField, SearchVector from django.contrib.postgres.indexes import GinIndex from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError, \ MultipleObjectsReturned from django.core.files.uploadedfile import SimpleUploadedFile -from django.core.files import File -from django.core.serializers import serialize from django.core.urlresolvers import reverse, NoReverseMatch -from django.core.validators import validate_slug -from django.db import connection from django.db.models import Q, Max, Count, F from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template.defaultfilters import slugify from django.utils.functional import lazy -from django.utils.safestring import SafeText, mark_safe -from django.utils.translation import activate, deactivate from ishtar_common.utils import ugettext_lazy as _, ugettext, \ - pgettext_lazy + pgettext_lazy, get_external_id, get_current_profile, duplicate_item, \ + get_image_path from ishtar_common.utils_secretary import IshtarSecretaryRenderer -from simple_history.models import HistoricalRecords as BaseHistoricalRecords -from simple_history.signals import post_create_historical_record, \ - pre_create_historical_record -from unidecode import unidecode from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ ALTERNATE_CONFIGS_CHOICES @@ -87,16 +72,24 @@ from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \ from ishtar_common.data_importer import pre_importer_action from ishtar_common.model_managers import SlugModelManager, ExternalIdManager, \ - TypeManager, UUIDModelManager + UUIDModelManager from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ImporterModel, ImporterType, \ ImporterDefault, ImporterDefaultValues, ImporterColumn, \ ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \ Import, TargetKeyGroup, ValueFormater -from ishtar_common.templatetags.link_to_window import simple_link_to_window -from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \ - get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \ - generate_relation_graph, max_size_help, task +from ishtar_common.utils import get_cache, create_slug, \ + get_all_field_names, cached_label_changed, \ + generate_relation_graph, max_size_help + +from ishtar_common.models_common import GeneralType, HierarchicalType, \ + BaseHistorizedItem, LightHistorizedItem, FullSearch, Imported, \ + FixAssociated, SearchAltName, HistoryError, OwnPerms, Cached, \ + Address, post_save_cache, TemplateItem, SpatialReferenceSystem, \ + DashboardFormItem, document_attached_changed, SearchAltName, \ + DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ + QuickAction, MainItem, Merge, ShortMenuItem, Town, ImageContainerModel, \ + StatisticItem, CachedGen, CascasdeUpdate, Department, State __all__ = [ 'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues', @@ -107,7 +100,8 @@ __all__ = [ 'LightHistorizedItem', 'OwnPerms', 'Address', 'post_save_cache', 'DashboardFormItem', 'ShortMenuItem', 'document_attached_changed', 'SearchAltName', 'DynamicRequest', 'GeoItem', 'QRCodeItem', - 'SearchVectorConfig', 'DocumentItem' + 'SearchVectorConfig', 'DocumentItem', 'CachedGen', 'StatisticItem', + 'CascasdeUpdate', 'Department', 'State' ] logger = logging.getLogger(__name__) @@ -262,88 +256,6 @@ class HistoryModel(models.Model): create=create) -class HistoricalRecords(BaseHistoricalRecords): - def _save_historic(self, manager, instance, history_date, history_type, - history_user, history_change_reason, using, attrs): - history_instance = manager.model( - history_date=history_date, - history_type=history_type, - history_user=history_user, - history_change_reason=history_change_reason, - **attrs - ) - - pre_create_historical_record.send( - sender=manager.model, - instance=instance, - history_date=history_date, - history_user=history_user, - history_change_reason=history_change_reason, - history_instance=history_instance, - using=using, - ) - - history_instance.save(using=using) - - post_create_historical_record.send( - sender=manager.model, - instance=instance, - history_instance=history_instance, - history_date=history_date, - history_user=history_user, - history_change_reason=history_change_reason, - using=using, - ) - - def create_historical_record(self, instance, history_type, using=None): - try: - history_modifier = getattr(instance, 'history_modifier', None) - assert history_modifier - except (User.DoesNotExist, AssertionError): - # on batch removing of users, user could have disappeared - return - history_date = getattr(instance, "_history_date", - datetime.datetime.now()) - history_change_reason = getattr(instance, "changeReason", None) - force = getattr(instance, "_force_history", False) - manager = getattr(instance, self.manager_name) - attrs = {} - for field in instance._meta.fields: - attrs[field.attname] = getattr(instance, field.attname) - q_history = instance.history \ - .filter(history_modifier_id=history_modifier.pk) \ - .order_by('-history_date', '-history_id') - # instance.skip_history_when_saving = True - if not q_history.count(): - if force: - delattr(instance, '_force_history') - self._save_historic( - manager, instance, history_date, history_type, history_modifier, - history_change_reason, using, attrs) - return - old_instance = q_history.all()[0] - # multiple saving by the same user in a very short time are generaly - # caused by post_save signals it is not relevant to keep them - min_history_date = datetime.datetime.now() \ - - datetime.timedelta(seconds=5) - q = q_history.filter(history_date__isnull=False, - history_date__gt=min_history_date) \ - .order_by('-history_date', '-history_id') - if not force and q.count(): - return - - if force: - delattr(instance, '_force_history') - - # record a new version only if data have been changed - for field in instance._meta.fields: - if getattr(old_instance, field.attname) != attrs[field.attname]: - self._save_historic(manager, instance, history_date, - history_type, history_modifier, - history_change_reason, using, attrs) - return - - def valid_id(cls): # valid ID validator for models def func(value): @@ -383,741 +295,6 @@ def is_unique(cls, field): return func -class OwnPerms(object): - """ - Manage special permissions for object's owner - """ - - @classmethod - def get_query_owns(cls, ishtaruser): - """ - Query object to get own items - """ - return None # implement for each object - - def can_view(self, request): - if hasattr(self, "LONG_SLUG"): - perm = "view_" + self.LONG_SLUG - else: - perm = "view_" + self.SLUG - return self.can_do(request, perm) - - def can_do(self, request, action_name): - """ - Check permission availability for the current object. - :param request: request object - :param action_name: action name eg: "change_find" - "own" variation is - checked - :return: boolean - """ - if not getattr(request.user, 'ishtaruser', None): - return False - splited = action_name.split('_') - action_own_name = splited[0] + '_own_' + '_'.join(splited[1:]) - user = request.user - if action_own_name == "view_own_findbasket": - action_own_name = "view_own_find" - return user.ishtaruser.has_right(action_name, request.session) or \ - (user.ishtaruser.has_right(action_own_name, request.session) - and self.is_own(user.ishtaruser)) - - def is_own(self, user, alt_query_own=None): - """ - Check if the current object is owned by the user - """ - if isinstance(user, IshtarUser): - ishtaruser = user - elif hasattr(user, 'ishtaruser'): - ishtaruser = user.ishtaruser - else: - return False - if not alt_query_own: - query = self.get_query_owns(ishtaruser) - else: - query = getattr(self, alt_query_own)(ishtaruser) - if not query: - return False - query &= Q(pk=self.pk) - return self.__class__.objects.filter(query).count() - - @classmethod - def has_item_of(cls, user): - """ - Check if the user own some items - """ - if isinstance(user, IshtarUser): - ishtaruser = user - elif hasattr(user, 'ishtaruser'): - ishtaruser = user.ishtaruser - else: - return False - query = cls.get_query_owns(ishtaruser) - if not query: - return False - return cls.objects.filter(query).count() - - @classmethod - def _return_get_owns(cls, owns, values, get_short_menu_class, - label_key='cached_label'): - if not owns: - return [] - sorted_values = [] - if hasattr(cls, 'BASKET_MODEL'): - owns_len = len(owns) - for idx, item in enumerate(reversed(owns)): - if get_short_menu_class: - item = item[0] - if type(item) == cls.BASKET_MODEL: - basket = owns.pop(owns_len - idx - 1) - sorted_values.append(basket) - sorted_values = list(reversed(sorted_values)) - if not values: - if not get_short_menu_class: - return sorted_values + list( - sorted(owns, key=lambda x: getattr(x, label_key) or "")) - return sorted_values + list( - sorted(owns, key=lambda x: getattr(x[0], label_key) or "")) - if not get_short_menu_class: - return sorted_values + list( - sorted(owns, key=lambda x: x[label_key] or "")) - return sorted_values + list( - sorted(owns, key=lambda x: x[0][label_key] or "")) - - @classmethod - def get_owns(cls, user, replace_query=None, limit=None, values=None, - get_short_menu_class=False, menu_filtr=None): - """ - Get Own items - """ - if not replace_query: - replace_query = {} - if hasattr(user, 'is_authenticated') and not user.is_authenticated(): - returned = cls.objects.filter(pk__isnull=True) - if values: - returned = [] - return returned - if isinstance(user, User): - try: - ishtaruser = IshtarUser.objects.get(user_ptr=user) - except IshtarUser.DoesNotExist: - returned = cls.objects.filter(pk__isnull=True) - if values: - returned = [] - return returned - elif isinstance(user, IshtarUser): - ishtaruser = user - else: - if values: - return [] - return cls.objects.filter(pk__isnull=True) - items = [] - if hasattr(cls, 'BASKET_MODEL'): - items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) - query = cls.get_query_owns(ishtaruser) - if not query and not replace_query: - returned = cls.objects.filter(pk__isnull=True) - if values: - returned = [] - return returned - if query: - q = cls.objects.filter(query) - else: # replace_query - q = cls.objects.filter(replace_query) - if values: - q = q.values(*values) - if limit: - items += list(q.order_by('-pk')[:limit]) - else: - items += list(q.order_by(*cls._meta.ordering).all()) - if get_short_menu_class: - if values: - if 'id' not in values: - raise NotImplementedError( - "Call of get_owns with get_short_menu_class option and" - " no 'id' in values is not implemented") - my_items = [] - for i in items: - if hasattr(cls, 'BASKET_MODEL') and \ - type(i) == cls.BASKET_MODEL: - dct = dict([(k, getattr(i, k)) for k in values]) - my_items.append( - (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) - else: - my_items.append((i, cls.get_short_menu_class(i['id']))) - items = my_items - else: - items = [(i, cls.get_short_menu_class(i.pk)) for i in items] - return items - - @classmethod - def _get_query_owns_dicts(cls, ishtaruser): - """ - List of query own dict to construct the query. - Each dict are join with an AND operator, each dict key, values are - joined with OR operator - """ - return [] - - @classmethod - def _construct_query_own(cls, prefix, dct_list): - q = None - for subquery_dict in dct_list: - subquery = None - for k in subquery_dict: - subsubquery = Q(**{prefix + k: subquery_dict[k]}) - if subquery: - subquery |= subsubquery - else: - subquery = subsubquery - if not subquery: - continue - if q: - q &= subquery - else: - q = subquery - return q - - -class CachedGen(object): - @classmethod - def refresh_cache(cls): - raise NotImplementedError() - - @classmethod - def _add_cache_key_to_refresh(cls, keys): - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if type(current_keys) != list: - current_keys = [] - if keys not in current_keys: - current_keys.append(keys) - cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) - - -class Cached(CachedGen): - slug_field = 'txt_idx' - - @classmethod - def refresh_cache(cls): - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if not current_keys: - return - for keys in current_keys: - if len(keys) == 2 and keys[0] == '__slug': - cls.get_cache(keys[1], force=True) - elif keys[0] == '__get_types': - default = None - empty_first = True - exclude = [] - if len(keys) >= 2: - default = keys.pop() - if len(keys) > 1: - empty_first = bool(keys.pop()) - exclude = keys[1:] - cls.get_types( - exclude=exclude, empty_first=empty_first, default=default, - force=True) - elif keys[0] == '__get_help': - cls.get_help(force=True) - - @classmethod - def _add_cache_key_to_refresh(cls, keys): - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if type(current_keys) != list: - current_keys = [] - if keys not in current_keys: - current_keys.append(keys) - cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) - - @classmethod - def get_cache(cls, slug, force=False): - cache_key, value = get_cache(cls, ['__slug', slug]) - if not force and value: - return value - try: - k = {cls.slug_field: slug} - obj = cls.objects.get(**k) - cache.set(cache_key, obj, settings.CACHE_TIMEOUT) - return obj - except cls.DoesNotExist: - cache.set(cache_key, None, settings.CACHE_TIMEOUT) - return None - - -@disable_for_loaddata -def post_save_cache(sender, **kwargs): - sender.refresh_cache() - - -class GeneralType(Cached, models.Model): - """ - Abstract class for "types" - """ - label = models.TextField(_("Label")) - txt_idx = models.TextField( - _("Textual ID"), validators=[validate_slug], - unique=True, - help_text=_( - "The slug is the standardized version of the name. It contains " - "only lowercase letters, numbers and hyphens. Each slug must " - "be unique.")) - comment = models.TextField(_("Comment"), blank=True, null=True) - available = models.BooleanField(_("Available"), default=True) - HELP_TEXT = "" - objects = TypeManager() - - class Meta: - abstract = True - - def __str__(self): - return self.label - - def natural_key(self): - return (self.txt_idx,) - - def history_compress(self): - return self.txt_idx - - @classmethod - def history_decompress(cls, value, create=False): - if not value: - return [] - res = [] - for txt_idx in value: - try: - res.append(cls.objects.get(txt_idx=txt_idx)) - except cls.DoesNotExist: - continue - return res - - @property - def explicit_label(self): - return "{} ({})".format(self.label, self._meta.verbose_name) - - @classmethod - def create_default_for_test(cls): - return [cls.objects.create(label='Test %d' % i) for i in range(5)] - - @property - def short_label(self): - return self.label - - @property - def name(self): - return self.label - - @classmethod - def get_or_create(cls, slug, label=''): - """ - Get or create a new item. - - :param slug: textual id - :param label: label for initialization if the item doesn't exist (not - mandatory) - - :return: instancied item of the base class - """ - - item = cls.get_cache(slug) - if item: - return item - if isinstance(slug, list): - slug = slug[0] - item, __ = cls.objects.get_or_create( - txt_idx=slug, defaults={'label': label}) - return item - - @classmethod - def get_or_create_pk(cls, slug): - """ - Get an id from a slug. Create the associated item if needed. - - :param slug: textual id - - :return: id of the item (string) - """ - return str(cls.get_or_create(slug).pk) - - @classmethod - def get_or_create_pks(cls, slugs): - """ - Get and merge a list of ids from a slug list. Create the associated - items if needed. - - :param slugs: textual ids - - :return: string with ids separated by "_" - """ - items = [] - for slug in slugs: - items.append(str(cls.get_or_create(slug).pk)) - return "_".join(items) - - @classmethod - def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): - if not dct: - dct = {} - if not exclude: - exclude = [] - keys = ['__get_help'] - keys += ["{}".format(ex) for ex in exclude] - keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] - cache_key, value = get_cache(cls, keys) - if value and not force: - return mark_safe(value) - help_text = cls.HELP_TEXT - c_rank = -1 - help_items = "\n" - for item in cls.get_types(dct=dct, instances=True, exclude=exclude): - if hasattr(item, '__iter__'): - pk = item[0] - item = cls.objects.get(pk=pk) - item.rank = c_rank + 1 - if hasattr(item, 'parent'): - c_item = item - parents = [] - while c_item.parent: - parents.append(c_item.parent.label) - c_item = c_item.parent - parents.reverse() - parents.append(item.label) - item.label = " / ".join(parents) - if not item.comment: - continue - if c_rank > item.rank: - help_items += "</dl>\n" - elif c_rank < item.rank: - help_items += "<dl>\n" - c_rank = item.rank - help_items += "<dt>%s</dt><dd>%s</dd>" % ( - item.label, "<br/>".join(item.comment.split('\n'))) - c_rank += 1 - if c_rank: - help_items += c_rank * "</dl>" - if help_text or help_items != u'\n': - help_text = help_text + help_items - else: - help_text = "" - cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) - return mark_safe(help_text) - - @classmethod - def _get_initial_types(cls, initial, type_pks, instance=False): - new_vals = [] - if not initial: - return [] - if type(initial) not in (list, tuple): - initial = [initial] - for value in initial: - try: - pk = int(value) - except (ValueError, TypeError): - continue - if pk in type_pks: - continue - try: - extra_type = cls.objects.get(pk=pk) - if instance: - new_vals.append(extra_type) - else: - new_vals.append((extra_type.pk, str(extra_type))) - except cls.DoesNotExist: - continue - return new_vals - - @classmethod - def get_types(cls, dct=None, instances=False, exclude=None, - empty_first=True, default=None, initial=None, force=False, - full_hierarchy=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - types = [] - if not instances and empty_first and not default: - types = [('', '--')] - types += cls._pre_get_types(dct, instances, exclude, - default, force, - get_full_hierarchy=full_hierarchy) - if not initial: - return types - new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) - types += new_vals - return types - - @classmethod - def _pre_get_types(cls, dct=None, instances=False, exclude=None, - default=None, force=False, get_full_hierarchy=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - # cache - cache_key = None - if not instances: - keys = ['__get_types'] - keys += ["{}".format(ex) for ex in exclude] + \ - ["{}".format(default)] - keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] - cache_key, value = get_cache(cls, keys) - if value and not force: - return value - base_dct = dct.copy() - if hasattr(cls, 'parent'): - if not cache_key: - return cls._get_parent_types( - base_dct, instances, exclude=exclude, - default=default, get_full_hierarchy=get_full_hierarchy) - vals = [v for v in cls._get_parent_types( - base_dct, instances, exclude=exclude, - default=default, get_full_hierarchy=get_full_hierarchy)] - cache.set(cache_key, vals, settings.CACHE_TIMEOUT) - return vals - - if not cache_key: - return cls._get_types(base_dct, instances, exclude=exclude, - default=default) - vals = [ - v for v in cls._get_types(base_dct, instances, exclude=exclude, - default=default) - ] - cache.set(cache_key, vals, settings.CACHE_TIMEOUT) - return vals - - @classmethod - def _get_types(cls, dct=None, instances=False, exclude=None, default=None): - if not dct: - dct = {} - if not exclude: - exclude = [] - dct['available'] = True - if default: - try: - default = cls.objects.get(txt_idx=default) - yield (default.pk, _(str(default))) - except cls.DoesNotExist: - pass - items = cls.objects.filter(**dct) - if default and default != "None": - if hasattr(default, 'txt_idx'): - exclude.append(default.txt_idx) - else: - exclude.append(default) - if exclude: - items = items.exclude(txt_idx__in=exclude) - for item in items.order_by(*cls._meta.ordering).all(): - if instances: - item.rank = 0 - yield item - else: - yield (item.pk, _(str(item)) if item and str(item) else '') - - @classmethod - def _get_childs_list(cls, dct=None, exclude=None, instances=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - if 'parent' in dct: - dct.pop('parent') - childs = cls.objects.filter(**dct) - if exclude: - childs = childs.exclude(txt_idx__in=exclude) - if hasattr(cls, 'order'): - childs = childs.order_by('order') - res = {} - if instances: - for item in childs.all(): - parent_id = item.parent_id or 0 - if parent_id not in res: - res[parent_id] = [] - res[parent_id].append(item) - else: - for item in childs.values("id", "parent_id", "label").all(): - parent_id = item["parent_id"] or 0 - if item["id"] == item["parent_id"]: - parent_id = 0 - if parent_id not in res: - res[parent_id] = [] - res[parent_id].append((item["id"], item["label"])) - return res - - PREFIX = "│ " - PREFIX_EMPTY = " " - PREFIX_MEDIUM = "├ " - PREFIX_LAST = "└ " - PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] - - @classmethod - def _get_childs(cls, item, child_list, prefix=0, instances=False, - is_last=False, last_of=None, get_full_hierarchy=False): - if not last_of: - last_of = [] - - prefix += 1 - current_child_lst = [] - if item in child_list: - current_child_lst = child_list[item] - - lst = [] - total = len(current_child_lst) - full_hierarchy_initial = get_full_hierarchy - for idx, child in enumerate(current_child_lst): - mylast_of = last_of[:] - p = '' - if instances: - child.rank = prefix - lst.append(child) - else: - if full_hierarchy_initial: - if isinstance(full_hierarchy_initial, str): - p = full_hierarchy_initial + " > " - else: - p = "" - else: - cprefix = prefix - while cprefix: - cprefix -= 1 - if not cprefix: - if (idx + 1) == total: - p += cls.PREFIX_LAST - else: - p += cls.PREFIX_MEDIUM - elif is_last: - if mylast_of: - clast = mylast_of.pop(0) - if clast: - p += cls.PREFIX_EMPTY - else: - p += cls.PREFIX - else: - p += cls.PREFIX_EMPTY - else: - p += cls.PREFIX - lst.append(( - child[0], SafeText(p + str(_(child[1]))) - )) - clast_of = last_of[:] - clast_of.append(idx + 1 == total) - if instances: - child_id = child.id - else: - child_id = child[0] - if get_full_hierarchy: - if p: - if not p.endswith(" > "): - p += " > " - get_full_hierarchy = p + child[1] - else: - get_full_hierarchy = child[1] - for sub_child in cls._get_childs( - child_id, child_list, prefix, instances, - is_last=((idx + 1) == total), last_of=clast_of, - get_full_hierarchy=get_full_hierarchy): - lst.append(sub_child) - return lst - - @classmethod - def _get_parent_types(cls, dct=None, instances=False, exclude=None, - default=None, get_full_hierarchy=False): - if not dct: - dct = {} - if not exclude: - exclude = [] - dct['available'] = True - child_list = cls._get_childs_list(dct, exclude, instances) - - if 0 in child_list: - for item in child_list[0]: - if instances: - item.rank = 0 - item_id = item.pk - yield item - else: - item_id = item[0] - yield item - if get_full_hierarchy: - get_full_hierarchy = item[1] - for child in cls._get_childs( - item_id, child_list, instances=instances, - get_full_hierarchy=get_full_hierarchy): - yield child - - def save(self, *args, **kwargs): - if not self.id and not self.label: - txt_idx = self.txt_idx - if isinstance(txt_idx, list): - txt_idx = txt_idx[0] - self.txt_idx = txt_idx - self.label = " ".join(" ".join(self.txt_idx.split('-')) - .split('_')).title() - if not self.txt_idx: - self.txt_idx = slugify(self.label)[:100] - - # clean old keys - if self.pk: - old = self.__class__.objects.get(pk=self.pk) - content_type = ContentType.objects.get_for_model(self.__class__) - if slugify(self.label) != slugify(old.label): - ItemKey.objects.filter( - object_id=self.pk, key=slugify(old.label), - content_type=content_type).delete() - if self.txt_idx != old.txt_idx: - ItemKey.objects.filter( - object_id=self.pk, key=old.txt_idx, - content_type=content_type).delete() - - obj = super(GeneralType, self).save(*args, **kwargs) - self.generate_key(force=True) - return obj - - def add_key(self, key, force=False, importer=None, group=None, - user=None): - content_type = ContentType.objects.get_for_model(self.__class__) - if not importer and not force and ItemKey.objects.filter( - key=key, content_type=content_type).count(): - return - filtr = {'key': key, 'content_type': content_type} - if group: - filtr['group'] = group - elif user: - filtr['user'] = user - else: - filtr['importer'] = importer - if force: - ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() - filtr['object_id'] = self.pk - ItemKey.objects.get_or_create(**filtr) - - def generate_key(self, force=False): - for key in (slugify(self.label), self.txt_idx): - self.add_key(key) - - def get_keys(self, importer): - keys = [self.txt_idx] - content_type = ContentType.objects.get_for_model(self.__class__) - base_q = Q(content_type=content_type, object_id=self.pk) - subquery = Q(importer__isnull=True, user__isnull=True, - group__isnull=True) - subquery |= Q(user__isnull=True, group__isnull=True, - importer=importer) - if importer.user: - subquery |= Q(user=importer.user, group__isnull=True, - importer=importer) - if importer.associated_group: - subquery |= Q(user__isnull=True, group=importer.associated_group, - importer=importer) - q = ItemKey.objects.filter(base_q & subquery) - for ik in q.exclude(key=self.txt_idx).all(): - keys.append(ik.key) - return keys - - @classmethod - def generate_keys(cls): - # content_type = ContentType.objects.get_for_model(cls) - for item in cls.objects.all(): - item.generate_key() - - def get_general_type_label(model, slug): obj = model.get_cache(slug) if not obj: @@ -1125,23 +302,6 @@ def get_general_type_label(model, slug): return str(obj) -class HierarchicalType(GeneralType): - parent = models.ForeignKey('self', blank=True, null=True, - on_delete=models.SET_NULL, - verbose_name=_("Parent")) - - class Meta: - abstract = True - - def full_label(self): - lbls = [self.label] - item = self - while item.parent: - item = item.parent - lbls.append(item.label) - return " > ".join(reversed(lbls)) - - class TinyUrl(models.Model): CHAR_MAP = string.ascii_letters + string.digits CHAR_MAP_LEN = len(CHAR_MAP) @@ -1183,24 +343,6 @@ class ItemKey(models.Model): return self.key -def get_image_path(instance, filename): - # when using migrations instance is not a real ImageModel instance - if not hasattr(instance, '_get_image_path'): - n = datetime.datetime.now() - return "upload/{}/{:02d}/{:02d}/{}".format( - n.year, n.month, n.day, filename) - return instance._get_image_path(filename) - - -class ImageContainerModel(object): - def _get_image_path(self, filename): - return "{}/{}".format(self._get_base_image_path(), filename) - - def _get_base_image_path(self): - n = datetime.datetime.now() - return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) - - class ImageModel(models.Model, ImageContainerModel): image = models.ImageField(upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help()) @@ -1286,17 +428,6 @@ class ImageModel(models.Model, ImageContainerModel): ) -class HistoryError(Exception): - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - -PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid') - - class BulkUpdatedItem(object): @classmethod def bulk_recursion(cls, transaction_id, extra_args): @@ -1456,1071 +587,6 @@ class JsonDataField(models.Model): _("Content types of the field and of the menu do not match")) -class JsonData(models.Model, CachedGen): - data = JSONField(default={}, blank=True) - - class Meta: - abstract = True - - def pre_save(self): - if not self.data: - self.data = {} - - @property - def json_sections(self): - sections = [] - try: - content_type = ContentType.objects.get_for_model(self) - except ContentType.DoesNotExists: - return sections - fields = list(JsonDataField.objects.filter( - content_type=content_type, display=True, section__isnull=True - ).all()) # no section fields - - fields += list(JsonDataField.objects.filter( - content_type=content_type, display=True, section__isnull=False - ).order_by('section__order', 'order').all()) - - for field in fields: - value = None - data = self.data.copy() - for key in field.key.split('__'): - if key in data: - value = copy.copy(data[key]) - data = data[key] - else: - value = None - break - if value is None: - continue - if type(value) in (list, tuple): - value = " ; ".join([str(v) for v in value]) - section_name = field.section.name if field.section else None - if not sections or section_name != sections[-1][0]: - # if section name is identical it is the same - sections.append((section_name, [])) - sections[-1][1].append((field.name, value)) - return sections - - @classmethod - def refresh_cache(cls): - __, refreshed = get_cache(cls, ['cache_refreshed']) - if refreshed and time.time() - refreshed < 1: - return - cache_ckey, current_keys = get_cache(cls, ['_current_keys']) - if not current_keys: - return - for keys in current_keys: - if keys[0] == '__get_dynamic_choices': - cls._get_dynamic_choices(keys[1], force=True) - - @classmethod - def _get_dynamic_choices(cls, key, force=False): - """ - Get choice from existing values - :param key: data key - :param force: if set to True do not use cache - :return: tuple of choices (id, value) - """ - cache_key, value = get_cache(cls, ['__get_dynamic_choices', key]) - if not force and value: - return value - choices = set() - splitted_key = key[len('data__'):].split('__') - q = cls.objects.filter( - data__has_key=key[len('data__'):]).values_list('data', flat=True) - for value in q.all(): - for k in splitted_key: - value = value[k] - choices.add(value) - choices = [('', '')] + [(v, v) for v in sorted(list(choices))] - cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) - return choices - - -class Imported(models.Model): - imports = models.ManyToManyField( - Import, blank=True, - related_name="imported_%(app_label)s_%(class)s") - - class Meta: - abstract = True - - -class SearchAltName(object): - def __init__(self, search_key, search_query, extra_query=None, - distinct_query=False): - self.search_key = search_key - self.search_query = search_query - self.extra_query = extra_query or {} - self.distinct_query = distinct_query - - -class DynamicRequest(object): - def __init__(self, label, app_name, model_name, form_key, search_key, - type_query, search_query): - self.label = label - self.form_key = form_key - self.search_key = search_key - self.app_name = app_name - self.model_name = model_name - self.type_query = type_query - self.search_query = search_query - - def get_all_types(self): - model = apps.get_app_config(self.app_name).get_model(self.model_name) - return model.objects.filter(available=True) - - def get_form_fields(self): - fields = {} - for item in self.get_all_types().all(): - fields[self.form_key + "-" + item.txt_idx] = forms.CharField( - label=str(self.label) + " " + str(item), - required=False - ) - return fields - - def get_extra_query(self, slug): - return { - self.type_query: slug - } - - def get_alt_names(self): - alt_names = {} - for item in self.get_all_types().all(): - alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( - self.search_key + "-" + item.txt_idx, self.search_query, - self.get_extra_query(item.txt_idx), distinct_query=True - ) - return alt_names - - -class SearchVectorConfig(object): - def __init__(self, key, language=None, func=None): - self.key = key - if language: - self.language = language - if language == "local": - self.language = settings.ISHTAR_SEARCH_LANGUAGE - else: - self.language = "simple" - self.func = func - - def format(self, value): - if value == 'None': - value = '' - if not self.func: - return [value] - return self.func(value) - - -class FullSearch(models.Model): - search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, - help_text=_("Auto filled at save")) - - EXTRA_REQUEST_KEYS = {} - DYNAMIC_REQUESTS = {} - ALT_NAMES = {} - - BASE_SEARCH_VECTORS = [] - PROPERTY_SEARCH_VECTORS = [] - INT_SEARCH_VECTORS = [] - M2M_SEARCH_VECTORS = [] - PARENT_SEARCH_VECTORS = [] - # prevent circular dependency - PARENT_ONLY_SEARCH_VECTORS = [] - - class Meta: - abstract = True - - @classmethod - def general_types(cls): - for k in get_all_field_names(cls): - field = cls._meta.get_field(k) - if not hasattr(field, 'rel') or not field.rel: - continue - rel_model = field.rel.to - if issubclass(rel_model, (GeneralType, HierarchicalType)): - yield k - - @classmethod - def get_alt_names(cls): - alt_names = cls.ALT_NAMES.copy() - for dr_k in cls.DYNAMIC_REQUESTS: - alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) - return alt_names - - @classmethod - def get_query_parameters(cls): - query_parameters = {} - for v in cls.get_alt_names().values(): - for language_code, language_lbl in settings.LANGUAGES: - activate(language_code) - query_parameters[str(v.search_key)] = v - deactivate() - return query_parameters - - def _update_search_field(self, search_vector_conf, search_vectors, data): - for value in search_vector_conf.format(data): - with connection.cursor() as cursor: - cursor.execute("SELECT to_tsvector(%s, %s)", [ - search_vector_conf.language, value]) - row = cursor.fetchone() - search_vectors.append(row[0]) - - def _update_search_number_field(self, search_vectors, val): - search_vectors.append("'{}':1".format(val)) - - def update_search_vector(self, save=True, exclude_parent=False): - """ - Update the search vector - :param save: True if you want to save the object immediately - :return: True if modified - """ - if not hasattr(self, 'search_vector'): - return - if not self.pk: - # logger.warning("Cannot update search vector before save or " - # "after deletion.") - return - if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ - and not self.INT_SEARCH_VECTORS \ - and not self.PROPERTY_SEARCH_VECTORS \ - and not self.PARENT_SEARCH_VECTORS: - logger.warning("No search_vectors defined for {}".format( - self.__class__)) - return - if getattr(self, '_search_updated', None): - return - self._search_updated = True - - old_search = "" - if self.search_vector: - old_search = self.search_vector[:] - search_vectors = [] - base_q = self.__class__.objects.filter(pk=self.pk) - - # many to many have to be queried one by one otherwise only one is fetch - for m2m_search_vector in self.M2M_SEARCH_VECTORS: - key = m2m_search_vector.key.split('__')[0] - rel_key = getattr(self, key) - for item in rel_key.values('pk').all(): - query_dct = {key + "__pk": item['pk']} - q = copy.copy(base_q).filter(**query_dct) - q = q.annotate( - search=SearchVector( - m2m_search_vector.key, - config=m2m_search_vector.language) - ).values('search') - search_vectors.append(q.all()[0]['search']) - - # int/float are not well managed by the SearchVector - for int_search_vector in self.INT_SEARCH_VECTORS: - q = base_q.values(int_search_vector.key) - for val in int_search_vector.format( - q.all()[0][int_search_vector.key]): - self._update_search_number_field(search_vectors, val) - - if not exclude_parent: - # copy parent vector fields - for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: - parent = getattr(self, PARENT_SEARCH_VECTOR) - if hasattr(parent, 'all'): # m2m - for p in parent.all(): - search_vectors.append(p.search_vector) - elif parent: - search_vectors.append(parent.search_vector) - - for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: - parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) - if hasattr(parent, 'all'): # m2m - for p in parent.all(): - search_vectors.append( - p.update_search_vector(save=False, exclude_parent=True) - ) - elif parent: - search_vectors.append( - parent.update_search_vector(save=False, exclude_parent=True) - ) - - if self.BASE_SEARCH_VECTORS: - # query "simple" fields - q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) - res = q.all()[0] - for base_search_vector in self.BASE_SEARCH_VECTORS: - data = res[base_search_vector.key] - data = unidecode(str(data)) - self._update_search_field(base_search_vector, - search_vectors, data) - - if self.PROPERTY_SEARCH_VECTORS: - for property_search_vector in self.PROPERTY_SEARCH_VECTORS: - data = getattr(self, property_search_vector.key) - if callable(data): - data = data() - if not data: - continue - data = str(data) - self._update_search_field(property_search_vector, - search_vectors, data) - - if hasattr(self, 'data') and self.data: - content_type = ContentType.objects.get_for_model(self) - for json_field in JsonDataField.objects.filter( - content_type=content_type, - search_index=True).all(): - data = copy.deepcopy(self.data) - no_data = False - for key in json_field.key.split('__'): - if key not in data: - no_data = True - break - data = data[key] - if no_data or not data: - continue - - if json_field.value_type == 'B': - if data is True: - data = json_field.name - else: - continue - elif json_field.value_type in ('I', 'F'): - self._update_search_number_field(search_vectors, data) - continue - elif json_field.value_type == 'D': - # only index year - self._update_search_number_field(search_vectors, data.year) - continue - for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): - with connection.cursor() as cursor: - cursor.execute("SELECT to_tsvector(%s, %s)", - [lang, data]) - row = cursor.fetchone() - search_vectors.append(row[0]) - new_search_vector = merge_tsvectors(search_vectors) - changed = old_search != new_search_vector - self.search_vector = new_search_vector - if save and changed: - self.__class__.objects.filter(pk=self.pk).update( - search_vector=new_search_vector) - elif not save: - return new_search_vector - return changed - - -class FixAssociated(object): - ASSOCIATED = {} - - def fix_associated(self): - for key in self.ASSOCIATED: - item = getattr(self, key) - if not item: - continue - dct = self.ASSOCIATED[key] - for dct_key in dct: - subkey, ctype = dct_key - expected_values = dct[dct_key] - if not isinstance(expected_values, (list, tuple)): - expected_values = [expected_values] - if hasattr(ctype, "txt_idx"): - try: - expected_values = [ctype.objects.get(txt_idx=v) - for v in expected_values] - except ctype.DoesNotExist: - # type not yet initialized - return - current_vals = getattr(item, subkey) - is_many = False - if hasattr(current_vals, "all"): - is_many = True - current_vals = current_vals.all() - else: - current_vals = [current_vals] - is_ok = False - for current_val in current_vals: - if current_val in expected_values: - is_ok = True - break - if is_ok: - continue - # the first value is used - new_value = expected_values[0] - if is_many: - getattr(item, subkey).add(new_value) - else: - setattr(item, subkey, new_value) - - -class QRCodeItem(models.Model, ImageContainerModel): - HAS_QR_CODE = True - qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True, - max_length=255) - - class Meta: - abstract = True - - @property - def qrcode_path(self): - if not self.qrcode: - self.generate_qrcode() - if not self.qrcode: # error on qrcode generation - return "" - return self.qrcode.path - - def generate_qrcode(self, request=None, secure=True, tmpdir=None): - url = self.get_absolute_url() - site = Site.objects.get_current() - if request: - scheme = request.scheme - else: - if secure: - scheme = "https" - else: - scheme = "http" - url = scheme + "://" + site.domain + url - tiny_url = TinyUrl() - tiny_url.link = url - tiny_url.save() - short_url = scheme + "://" + site.domain + reverse( - 'tiny-redirect', args=[tiny_url.get_short_id()]) - qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) - tmpdir_created = False - if not tmpdir: - tmpdir = tempfile.mkdtemp("-qrcode") - tmpdir_created = True - filename = tmpdir + os.sep + 'qrcode.png' - qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) - with open(filename, 'rb') as qrfile: - self.qrcode.save("qrcode.png", File(qrfile)) - self.skip_history_when_saving = True - self._no_move = True - self.save() - if tmpdir_created: - shutil.rmtree(tmpdir) - - -class DocumentItem(object): - ALT_NAMES = { - 'documents__image__isnull': - SearchAltName( - pgettext_lazy("key for text search", "has-image"), - 'documents__image__isnull'), - 'documents__associated_url__isnull': - SearchAltName( - pgettext_lazy("key for text search", "has-url"), - 'documents__associated_url__isnull'), - 'documents__associated_file__isnull': - SearchAltName( - pgettext_lazy("key for text search", "has-attached-file"), - 'documents__associated_file__isnull'), - } - - def public_representation(self): - images = [] - if getattr(self, "main_image", None): - images.append(self.main_image.public_representation()) - images += [ - image.public_representation() - for image in self.images_without_main_image.all() - ] - return {"images": images} - - @property - def images(self): - if not hasattr(self, 'documents'): - return Document.objects.none() - return self.documents.filter( - image__isnull=False).exclude(image="").order_by("pk") - - @property - def images_without_main_image(self): - if not hasattr(self, 'main_image') or not hasattr(self, 'documents'): - return self.images - if not self.main_image: - return self.documents.filter( - image__isnull=False).exclude( - image="").order_by("pk") - return self.documents.filter( - image__isnull=False).exclude( - image="").exclude(pk=self.main_image.pk).order_by("pk") - - def get_extra_actions(self, request): - """ - For sheet template: return "Add document / image" action - """ - # url, base_text, icon, extra_text, extra css class, is a quick action - try: - actions = super(DocumentItem, self).get_extra_actions(request) - except AttributeError: - actions = [] - - if not hasattr(self, 'SLUG'): - return actions - - can_add_doc = self.can_do(request, 'add_document') - if can_add_doc and ( - not hasattr(self, "is_locked") or - not self.is_locked(request.user)): - actions += [ - ( - reverse("create-document") + "?{}={}".format( - self.SLUG, self.pk), - _("Add document/image"), - "fa fa-plus", - _("doc./image"), - "", - False - ) - ] - return actions - - -class SpatialReferenceSystem(GeneralType): - order = models.IntegerField(_("Order"), default=10) - auth_name = models.CharField( - _("Authority name"), default=u'EPSG', max_length=256) - srid = models.IntegerField(_("Authority SRID")) - - class Meta: - verbose_name = _("Spatial reference system") - verbose_name_plural = _("Spatial reference systems") - ordering = ('label',) - - -post_save.connect(post_save_cache, sender=SpatialReferenceSystem) -post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) - - -class GeoItem(models.Model): - GEO_SOURCE = ( - ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon")) - ) - - # gis - x = models.FloatField(_(u'X'), blank=True, null=True) - y = models.FloatField(_(u'Y'), blank=True, null=True) - z = models.FloatField(_(u'Z'), blank=True, null=True) - estimated_error_x = models.FloatField(_(u'Estimated error for X'), - blank=True, null=True) - estimated_error_y = models.FloatField(_(u'Estimated error for Y'), - blank=True, null=True) - estimated_error_z = models.FloatField(_(u'Estimated error for Z'), - blank=True, null=True) - spatial_reference_system = models.ForeignKey( - SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), - blank=True, null=True) - point = models.PointField(_("Point"), blank=True, null=True, dim=3) - point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) - point_source = models.CharField( - _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, - null=True) - point_source_item = models.CharField( - _("Point source item"), max_length=100, blank=True, null=True) - multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, - null=True) - multi_polygon_source = models.CharField( - _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, - blank=True, null=True) - multi_polygon_source_item = models.CharField( - _("Multi polygon source item"), max_length=100, blank=True, null=True) - - GEO_LABEL = "" - - class Meta: - abstract = True - - def get_town_centroid(self): - raise NotImplementedError - - def get_town_polygons(self): - raise NotImplementedError - - @property - def display_coordinates(self): - if not self.point_2d: - return "" - profile = get_current_profile() - if not profile.display_srs or not profile.display_srs.srid: - return self.x, self.y - point = self.point_2d.transform(profile.display_srs.srid, clone=True) - return round(point.x, 5), round(point.y, 5) - - @property - def display_spatial_reference_system(self): - profile = get_current_profile() - if not profile.display_srs or not profile.display_srs.srid: - return self.spatial_reference_system - return profile.display_srs - - def get_precise_points(self): - if self.point_source == 'P' and self.point_2d: - return self.point_2d, self.point, self.point_source_item - - def get_precise_polygons(self): - if self.multi_polygon_source == 'P' and self.multi_polygon: - return self.multi_polygon, self.multi_polygon_source_item - - def most_precise_geo(self): - if self.point_source == 'M': - return 'multi_polygon' - current_source = str(self.__class__._meta.verbose_name) - if self.multi_polygon_source_item == current_source \ - and (self.multi_polygon_source == "P" or - self.point_source_item != current_source): - return 'multi_polygon' - if self.point_source_item == current_source\ - and self.point_source == 'P': - return 'point' - if self.multi_polygon_source == 'P': - return 'multi_polygon' - if self.point_source == 'P': - return 'point' - if self.multi_polygon: - return 'multi_polygon' - if self.point_2d: - return 'point' - - def geo_point_source(self): - if not self.point_source: - return "" - src = "{} - {}".format( - dict(self.GEO_SOURCE)[self.point_source], - self.point_source_item - ) - return src - - def geo_polygon_source(self): - if not self.multi_polygon_source: - return "" - src = "{} - {}".format( - dict(self.GEO_SOURCE)[self.multi_polygon_source], - self.multi_polygon_source_item - ) - return src - - def _geojson_serialize(self, geom_attr): - if not hasattr(self, geom_attr): - return "" - cached_label_key = 'cached_label' - if self.GEO_LABEL: - cached_label_key = self.GEO_LABEL - if getattr(self, "CACHED_LABELS", None): - cached_label_key = self.CACHED_LABELS[-1] - geojson = serialize( - 'geojson', - self.__class__.objects.filter(pk=self.pk), - geometry_field=geom_attr, fields=(cached_label_key,)) - geojson_dct = json.loads(geojson) - profile = get_current_profile() - precision = profile.point_precision - - features = geojson_dct.pop('features') - for idx in range(len(features)): - feature = features[idx] - lbl = feature['properties'].pop(cached_label_key) - feature['properties']['name'] = lbl - feature['properties']['id'] = self.pk - if precision is not None: - geom_type = feature["geometry"].get("type", None) - if geom_type == "Point": - feature["geometry"]["coordinates"] = [ - round(coord, precision) - for coord in feature["geometry"]["coordinates"] - ] - geojson_dct['features'] = features - geojson_dct['link_template'] = simple_link_to_window(self).replace( - '999999', '<pk>' - ) - geojson = json.dumps(geojson_dct) - return geojson - - @property - def point_2d_geojson(self): - return self._geojson_serialize('point_2d') - - @property - def multi_polygon_geojson(self): - return self._geojson_serialize('multi_polygon') - - -class TemplateItem: - @classmethod - def _label_templates_q(cls): - model_name = "{}.{}".format( - cls.__module__, cls.__name__) - q = Q(associated_model__klass=model_name, - for_labels=True, available=True) - alt_model_name = model_name.replace( - "models_finds", "models").replace( - "models_treatments", "models") - if alt_model_name != model_name: - q |= Q(associated_model__klass=model_name, - for_labels=True, available=True) - return DocumentTemplate.objects.filter(q) - - @classmethod - def has_label_templates(cls): - return cls._label_templates_q().count() - - @classmethod - def label_templates(cls): - return cls._label_templates_q() - - def get_extra_templates(self, request): - cls = self.__class__ - templates = [] - name = str(cls.__name__) - module = str(cls.__module__) - if "archaeological_finds" in module: - if "models_finds" in name or "models_treatments" in name: - names = [ - name, - name.replace("models_finds", "models" - ).replace("models_treatments", "models") - ] - else: - names = [name, name.replace("models", "models_finds"), - name.replace("models", "models_treatments")] - else: - names = [name] - model_names = [ - "{}.{}".format(module, name) for name in names - ] - q = DocumentTemplate.objects.filter( - associated_model__klass__in=model_names, - for_labels=False, available=True) - for template in q.all(): - urlname = "generate-document" - templates.append( - (template.name, reverse( - urlname, args=[template.slug, self.pk])) - ) - return templates - - -class StatisticItem: - STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" - STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: - # OrderedDict([('year', _("Year")), - # ("operation_type__label", _("Operation type"))]) - STATISTIC_SUM_VARIABLE = OrderedDict( - (("pk", (_("Number"), 1)),) - ) # example: "Price", "Volume" - the number is a multiplier - - -class CascasdeUpdate: - DOWN_MODEL_UPDATE = [] - - def cascade_update(self): - for down_model in self.DOWN_MODEL_UPDATE: - if not settings.USE_BACKGROUND_TASK: - rel = getattr(self, down_model) - if hasattr(rel.model, "need_update"): - rel.update(need_update=True) - continue - for item in getattr(self, down_model).all(): - cached_label_changed(item.__class__, instance=item) - if hasattr(item, "point_2d"): - post_save_geo(item.__class__, instance=item) - - -def duplicate_item(item, user=None, data=None): - model = item.__class__ - new = model.objects.get(pk=item.pk) - - for field in model._meta.fields: - # pk is in PRIVATE_FIELDS so: new.pk = None and a new - # item will be created on save - if field.name == "uuid": - new.uuid = uuid.uuid4() - elif field.name in PRIVATE_FIELDS: - setattr(new, field.name, None) - if user: - new.history_user = user - if data: - for k in data: - setattr(new, k, data[k]) - new.save() - - # m2m fields - m2m = [field.name for field in model._meta.many_to_many - if field.name not in PRIVATE_FIELDS] - for field in m2m: - for val in getattr(item, field).all(): - if val not in getattr(new, field).all(): - getattr(new, field).add(val) - return new - - -class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported, - JsonData, FixAssociated, CascasdeUpdate): - """ - Historized item with external ID management. - All historized items are searchable and have a data json field. - Historized items can be "locked" for edition. - """ - IS_BASKET = False - SHOW_URL = None - EXTERNAL_ID_KEY = '' - EXTERNAL_ID_DEPENDENCIES = [] - HISTORICAL_M2M = [] - - history_modifier = models.ForeignKey( - User, related_name='+', on_delete=models.SET_NULL, - verbose_name=_("Last editor"), blank=True, null=True) - history_creator = models.ForeignKey( - User, related_name='+', on_delete=models.SET_NULL, - verbose_name=_("Creator"), blank=True, null=True) - last_modified = models.DateTimeField(auto_now=True) - history_m2m = JSONField(default={}, blank=True) - need_update = models.BooleanField( - verbose_name=_("Need update"), default=False) - locked = models.BooleanField( - verbose_name=_("Item locked for edition"), default=False) - lock_user = models.ForeignKey( - User, related_name='+', on_delete=models.SET_NULL, - verbose_name=_("Locked by"), blank=True, null=True) - - ALT_NAMES = { - 'history_creator': SearchAltName( - pgettext_lazy("key for text search", u"created-by"), - 'history_creator__ishtaruser__person__cached_label__iexact' - ), - 'history_modifier': SearchAltName( - pgettext_lazy("key for text search", u"modified-by"), - 'history_modifier__ishtaruser__person__cached_label__iexact' - ), - 'modified_before': SearchAltName( - pgettext_lazy("key for text search", "modified-before"), - 'last_modified__lte' - ), - 'modified_after': SearchAltName( - pgettext_lazy("key for text search", "modified-after"), - 'last_modified__gte' - ), - } - - class Meta: - abstract = True - - @classmethod - def get_verbose_name(cls): - return cls._meta.verbose_name - - def is_locked(self, user=None): - if not user: - return self.locked - return self.locked and (not self.lock_user or self.lock_user != user) - - def merge(self, item, keep_old=False): - merge_model_objects(self, item, keep_old=keep_old) - - def public_representation(self): - return {} - - def duplicate(self, user=None, data=None): - return duplicate_item(self, user, data) - - def update_external_id(self, save=False): - if not self.EXTERNAL_ID_KEY or ( - self.external_id and - not getattr(self, 'auto_external_id', False)): - return - external_id = get_external_id(self.EXTERNAL_ID_KEY, self) - if external_id == self.external_id: - return - self.auto_external_id = True - self.external_id = external_id - self._cached_label_checked = False - if save: - self.skip_history_when_saving = True - self.save() - return external_id - - def get_last_history_date(self): - q = self.history.values("history_date").order_by('-history_date') - if not q.count(): - return - return q.all()[0]['history_date'] - - def get_previous(self, step=None, date=None, strict=False): - """ - Get a "step" previous state of the item - """ - assert step or date - historized = self.history.all() - item = None - if step: - if len(historized) <= step: - # silently return the last step if too far in the history - item = historized[len(historized) - 1] - else: - item = historized[step] - else: - for step, item in enumerate(historized): - if item.history_date == date: - break - # ended with no match - if item.history_date != date: - return - item._step = step - if len(historized) != (step + 1): - item._previous = historized[step + 1].history_date - else: - item._previous = None - if step > 0: - item._next = historized[step - 1].history_date - else: - item._next = None - item.history_date = historized[step].history_date - model = self.__class__ - for k in get_all_field_names(model): - field = model._meta.get_field(k) - if hasattr(field, 'rel') and field.rel: - if not hasattr(item, k + '_id'): - setattr(item, k, getattr(self, k)) - continue - val = getattr(item, k + '_id') - if not val: - setattr(item, k, None) - continue - try: - val = field.rel.to.objects.get(pk=val) - setattr(item, k, val) - except ObjectDoesNotExist: - if strict: - raise HistoryError("The class %s has no pk %d" % ( - str(field.rel.to), val)) - setattr(item, k, None) - item.pk = self.pk - return item - - @property - def last_edition_date(self): - try: - return self.history.order_by('-history_date').all()[0].history_date - except (AttributeError, IndexError): - return - - @property - def history_creation_date(self): - try: - return self.history.order_by('history_date').all()[0].history_date - except (AttributeError, IndexError): - return - - def rollback(self, date): - """ - Rollback to a previous state - """ - to_del, new_item = [], None - for item in self.history.all(): - if item.history_date == date: - new_item = item - break - to_del.append(item) - if not new_item: - raise HistoryError("The date to rollback to doesn't exist.") - try: - field_keys = [f.name for f in self._meta.fields] - for k in field_keys: - if k != 'id' and hasattr(self, k): - if not hasattr(new_item, k): - k = k + "_id" - setattr(self, k, getattr(new_item, k)) - - try: - self.history_modifier = User.objects.get( - pk=new_item.history_modifier_id) - except User.ObjectDoesNotExist: - pass - self.save() - saved_m2m = new_item.history_m2m.copy() - for hist_key in self.HISTORICAL_M2M: - # after each association m2m is rewrite - force the original - # to be reset - new_item.history_m2m = saved_m2m - values = new_item.m2m_listing(hist_key, create=True) or [] - hist_field = getattr(self, hist_key) - hist_field.clear() - for val in values: - hist_field.add(val) - # force label regeneration - self._cached_label_checked = False - self.save() - except ObjectDoesNotExist: - raise HistoryError("The rollback has failed.") - # clean the obsolete history - for historized_item in to_del: - historized_item.delete() - - def m2m_listing(self, key): - return getattr(self, key).all() - - def values(self): - values = {} - for f in self._meta.fields: - k = f.name - if k != 'id': - values[k] = getattr(self, k) - return values - - def get_absolute_url(self): - try: - return reverse('display-item', args=[self.SLUG, self.pk]) - except NoReverseMatch: - return - - def get_show_url(self): - show_url = self.SHOW_URL - if not show_url: - show_url = 'show-' + self.__class__.__name__.lower() - try: - return reverse(show_url, args=[self.pk, '']) - except NoReverseMatch: - return - - @property - def associated_filename(self): - if [True for attr in ('get_town_label', 'get_department', 'reference', - 'short_class_name') if not hasattr(self, attr)]: - return '' - items = [slugify(self.get_department()), - slugify(self.get_town_label()).upper(), - slugify(self.short_class_name), - slugify(self.reference), - slugify(self.name or '').replace('-', '_').capitalize()] - last_edition_date = self.last_edition_date - if last_edition_date: - items.append(last_edition_date.strftime('%Y%m%d')) - else: - items.append('00000000') - return "-".join([str(item) for item in items]) - - def save(self, *args, **kwargs): - created = not self.pk - if not getattr(self, 'skip_history_when_saving', False): - assert hasattr(self, 'history_modifier') - if created: - self.history_creator = self.history_modifier - # external ID can have related item not available before save - external_id_updated = kwargs.pop('external_id_updated') \ - if 'external_id_updated' in kwargs else False - if not created and not external_id_updated: - self.update_external_id() - super(BaseHistorizedItem, self).save(*args, **kwargs) - if created and self.update_external_id(): - # force resave for external ID creation - self.skip_history_when_saving = True - self._updated_id = True - return self.save(external_id_updated=True) - for dep in self.EXTERNAL_ID_DEPENDENCIES: - for obj in getattr(self, dep).all(): - obj.update_external_id(save=True) - self.fix_associated() - return True - - LOGICAL_TYPES = ( ('above', _("Above")), ('below', _("Below")), @@ -2623,207 +689,10 @@ class SearchQuery(models.Model): return str(self.label) -class ShortMenuItem(object): - """ - Item available in the short menu - """ - UP_MODEL_QUERY = {} - - @classmethod - def get_short_menu_class(cls, pk): - return '' - - @property - def short_class_name(self): - return "" - - -class QuickAction(object): - """ - Quick action available from tables - """ - - def __init__(self, url, icon_class='', text='', target=None, rights=None, - module=None): - self.url = url - self.icon_class = icon_class - self.text = text - self.rights = rights - self.target = target - self.module = module - assert self.target in ('one', 'many', None) - - def is_available(self, user, session=None, obj=None): - if self.module and not getattr(get_current_profile(), self.module): - return False - if not self.rights: # no restriction - return True - if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser: - return False - user = user.ishtaruser - - for right in self.rights: - if user.has_perm(right, session=session, obj=obj): - return True - return False - - @property - def rendered_icon(self): - if not self.icon_class: - return "" - return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class) - - @property - def base_url(self): - if self.target is None: - url = reverse(self.url) - else: - # put arbitrary pk for the target - url = reverse(self.url, args=[0]) - url = url[:-2] # all quick action url have to finish with the - # pk of the selected item and a "/" - return url - - -class MainItem(ShortMenuItem): - """ - Item with quick actions available from tables - Extra actions are available from sheets - """ - QUICK_ACTIONS = [] - - @classmethod - def get_quick_actions(cls, user, session=None, obj=None): - """ - Get a list of (url, title, icon, target) actions for an user - """ - qas = [] - for action in cls.QUICK_ACTIONS: - if not action.is_available(user, session=session, obj=obj): - continue - qas.append([action.base_url, - mark_safe(action.text), - mark_safe(action.rendered_icon), - action.target or ""]) - return qas - - @classmethod - def get_quick_action_by_url(cls, url): - for action in cls.QUICK_ACTIONS: - if action.url == url: - return action - - def regenerate_external_id(self): - if not hasattr(self, "external_id"): - return - self.skip_history_when_saving = True - self._no_move = True - if hasattr(self, "auto_external_id"): - self.external_id = None - self.save() - - def get_extra_actions(self, request): - if not hasattr(self, 'SLUG'): - return [] - - actions = [] - if request.user.is_superuser and hasattr(self, "auto_external_id"): - actions += [ - ( - reverse("regenerate-external-id") + "?{}={}".format( - self.SLUG, self.pk), - _("Regenerate ID"), - "fa fa-key", - _("regen."), - "", - True - ) - ] - - return actions - - -class LightHistorizedItem(BaseHistorizedItem): - history_date = models.DateTimeField(default=datetime.datetime.now) - - class Meta: - abstract = True - - def save(self, *args, **kwargs): - super(LightHistorizedItem, self).save(*args, **kwargs) - return self - - -PARSE_FORMULA = re.compile("{([^}]*)}") - - -def _deduplicate(value): - new_values = [] - for v in value.split(u'-'): - if v not in new_values: - new_values.append(v) - return u'-'.join(new_values) - - -FORMULA_FILTERS = { - 'upper': lambda x: x.upper(), - 'lower': lambda x: x.lower(), - 'capitalize': lambda x: x.capitalize(), - 'slug': lambda x: slugify(x), - 'deduplicate': _deduplicate -} - - -def get_external_id(key, item): - profile = get_current_profile() - if not hasattr(profile, key): - return - formula = getattr(profile, key) - dct = {} - for fkey in PARSE_FORMULA.findall(formula): - filtered = fkey.split('|') - initial_key = fkey[:] - fkey = filtered[0] - filters = [] - for filtr in filtered[1:]: - if filtr in FORMULA_FILTERS: - filters.append(FORMULA_FILTERS[filtr]) - if fkey.startswith('settings__'): - dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' - continue - obj = item - for k in fkey.split('__'): - try: - obj = getattr(obj, k) - except ObjectDoesNotExist: - obj = None - if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager - if not obj.count(): - break - obj = obj.all()[0] - elif callable(obj): - obj = obj() - if obj is None: - break - if obj is None: - dct[initial_key] = '' - else: - dct[initial_key] = str(obj) - for filtr in filters: - dct[initial_key] = filtr(dct[initial_key]) - values = formula.format(**dct).split('||') - value = values[0] - for filtr in values[1:]: - if filtr not in FORMULA_FILTERS: - value += '||' + filtr - continue - value = FORMULA_FILTERS[filtr](value) - return value - - class Language(GeneralType): iso_code = models.CharField(_("ISO code"), null=True, blank=True, max_length=2) + class Meta: verbose_name = _("Language") verbose_name_plural = _("Languages") @@ -3109,10 +978,6 @@ class IshtarSiteProfile(models.Model, Cached): return obj -def get_current_profile(force=False): - return IshtarSiteProfile.get_current_profile(force=force) - - def _profile_mapping(): return get_current_profile().mapping @@ -3357,141 +1222,6 @@ class StatsCache(models.Model): verbose_name_plural = _("Caches for stats") -def update_stats(statscache, item, funcname): - if not settings.USE_BACKGROUND_TASK: - current_values = statscache.values - if not current_values: - current_values = {} - value = getattr(item, funcname)() - current_values[funcname] = value - statscache.values = current_values - statscache.updated = datetime.datetime.now() - statscache.save() - return current_values - - now = datetime.datetime.now() - app_name = item._meta.app_label - model_name = item._meta.model_name - statscache.update_requested = now.isoformat() - statscache.save() - _update_stats.delay(app_name, model_name, item.pk, funcname) - return statscache.values - - -def __get_stats_cache_values(model_name, model_pk): - q = StatsCache.objects.filter( - model=model_name, model_pk=model_pk - ) - nb = q.count() - if nb >= 1: - sc = q.all()[0] - for extra in q.order_by("-id").all()[1:]: - extra.delete() - else: - sc = StatsCache.objects.create( - model=model_name, model_pk=model_pk - ) - values = sc.values - if not values: - values = {} - return sc, values - - -@task() -def _update_stats(app, model, model_pk, funcname): - model_name = app + "." + model - model = apps.get_model(app, model) - try: - item = model.objects.get(pk=model_pk) - except model.DoesNotExist: - return - value = getattr(item, funcname)() - sc, current_values = __get_stats_cache_values(model_name, model_pk) - current_values[funcname] = value - sc.values = current_values - sc.update_requested = None - sc.updated = datetime.datetime.now() - sc.save() - - -class DashboardFormItem(object): - """ - Provide methods to manage statistics - """ - - def last_stats_update(self): - model_name = self._meta.app_label + "." + self._meta.model_name - q = StatsCache.objects.filter( - model=model_name, model_pk=self.pk).order_by("-updated") - if not q.count(): - return - return q.all()[0].updated - - def _get_or_set_stats(self, funcname, update=False, - expected_type=None): - model_name = self._meta.app_label + "." + self._meta.model_name - sc, __ = StatsCache.objects.get_or_create( - model=model_name, model_pk=self.pk - ) - if not update: - values = sc.values - if funcname not in values: - if expected_type is not None: - return expected_type() - return 0 - else: - values = update_stats(sc, self, funcname) - if funcname in values: - values = values[funcname] - else: - values = 0 - if expected_type is not None and not isinstance(values, expected_type): - return expected_type() - return values - - @classmethod - def get_periods(cls, slice='month', fltr={}, date_source='creation'): - date_var = date_source + '_date' - q = cls.objects.filter(**{date_var + '__isnull': False}) - if fltr: - q = q.filter(**fltr) - if slice == 'year': - return [res[date_var].year for res in list(q.values(date_var) - .annotate( - Count("id")).order_by())] - elif slice == 'month': - return [(res[date_var].year, res[date_var].month) - for res in list(q.values(date_var) - .annotate(Count("id")).order_by())] - return [] - - @classmethod - def get_by_year(cls, year, fltr={}, date_source='creation'): - date_var = date_source + '_date' - q = cls.objects.filter(**{date_var + '__isnull': False}) - if fltr: - q = q.filter(**fltr) - return q.filter( - **{date_var + '__year': year}).order_by('pk').distinct('pk') - - @classmethod - def get_by_month(cls, year, month, fltr={}, date_source='creation'): - date_var = date_source + '_date' - q = cls.objects.filter(**{date_var + '__isnull': False}) - if fltr: - q = q.filter(**fltr) - q = q.filter( - **{date_var + '__year': year, date_var + '__month': month}) - return q.order_by('pk').distinct('pk') - - @classmethod - def get_total_number(cls, fltr=None): - q = cls.objects - if fltr: - q = q.filter(**fltr) - return q.order_by('pk').distinct('pk').count() - - class Dashboard(object): def __init__(self, model, slice='year', date_source=None, show_detail=None, fltr=None): @@ -3780,254 +1510,6 @@ class DocumentTemplate(models.Model): return output_name -class NumberManager(models.Manager): - def get_by_natural_key(self, number): - return self.get(number=number) - - -class State(models.Model): - label = models.CharField(_("Label"), max_length=30) - number = models.CharField(_("Number"), unique=True, max_length=3) - objects = NumberManager() - - class Meta: - verbose_name = _("State") - ordering = ['number'] - - def __str__(self): - return self.label - - def natural_key(self): - return (self.number,) - - -class Department(models.Model): - label = models.CharField(_("Label"), max_length=30) - number = models.CharField(_("Number"), unique=True, max_length=3) - state = models.ForeignKey( - 'State', verbose_name=_("State"), blank=True, null=True, - on_delete=models.SET_NULL, - ) - objects = NumberManager() - - class Meta: - verbose_name = _("Department") - verbose_name_plural = _("Departments") - ordering = ['number'] - - def __str__(self): - return self.label - - def natural_key(self): - return (self.number,) - - def history_compress(self): - return self.number - - @classmethod - def history_decompress(cls, full_value, create=False): - if not full_value: - return [] - res = [] - for value in full_value: - try: - res.append(cls.objects.get(number=value)) - except cls.DoesNotExist: - continue - return res - - -class Arrondissement(models.Model): - name = models.CharField("Nom", max_length=30) - department = models.ForeignKey(Department, verbose_name="Département") - - def __str__(self): - return settings.JOINT.join((self.name, str(self.department))) - - -class Canton(models.Model): - name = models.CharField("Nom", max_length=30) - arrondissement = models.ForeignKey(Arrondissement, - verbose_name="Arrondissement") - - def __str__(self): - return settings.JOINT.join( - (self.name, str(self.arrondissement))) - - -class TownManager(models.GeoManager): - def get_by_natural_key(self, numero_insee, year): - return self.get(numero_insee=numero_insee, year=year) - - -class Town(Imported, models.Model): - name = models.CharField(_("Name"), max_length=100) - surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) - center = models.PointField(_("Localisation"), srid=settings.SRID, - blank=True, null=True) - limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) - numero_insee = models.CharField("Code commune (numéro INSEE)", - max_length=120) - departement = models.ForeignKey( - Department, verbose_name=_("Department"), - on_delete=models.SET_NULL, null=True, blank=True) - year = models.IntegerField( - _("Year of creation"), null=True, blank=True, - help_text=_("Filling this field is relevant to distinguish old towns " - "from new towns.")) - children = models.ManyToManyField( - 'Town', verbose_name=_("Town children"), blank=True, - related_name='parents') - cached_label = models.CharField(_("Cached name"), max_length=500, - null=True, blank=True, db_index=True) - objects = TownManager() - - class Meta: - verbose_name = _("Town") - verbose_name_plural = _("Towns") - if settings.COUNTRY == 'fr': - ordering = ['numero_insee'] - unique_together = (('numero_insee', 'year'),) - - def natural_key(self): - return (self.numero_insee, self.year) - - def history_compress(self): - values = {'numero_insee': self.numero_insee, - 'year': self.year or ""} - return values - - def get_values(self, prefix='', no_values=False, filtr=None, **kwargs): - values = {} - if not filtr or prefix in filtr or "label" in filtr: - if prefix: - values[prefix] = str(self) - else: - values['label'] = str(self) - if not filtr or prefix + "name" in filtr: - values[prefix + "name"] = self.name - if not filtr or prefix + "numero_insee" in filtr: - values[prefix + "numero_insee"] = self.numero_insee - return values - - @classmethod - def history_decompress(cls, full_value, create=False): - if not full_value: - return [] - res = [] - for value in full_value: - try: - res.append( - cls.objects.get(numero_insee=value['numero_insee'], - year=value['year'] or None)) - except cls.DoesNotExist: - continue - return res - - def __str__(self): - return self.cached_label or "" - - @property - def label_with_areas(self): - label = [self.name] - if self.numero_insee: - label.append("({})".format(self.numero_insee)) - for area in self.areas.all(): - label.append(" - ") - label.append(area.full_label) - return " ".join(label) - - def generate_geo(self, force=False): - force = self.generate_limit(force=force) - self.generate_center(force=force) - self.generate_area(force=force) - - def generate_limit(self, force=False): - if not force and self.limit: - return - parents = None - if not self.parents.count(): - return - for parent in self.parents.all(): - if not parent.limit: - return - if not parents: - parents = parent.limit - else: - parents = parents.union(parent.limit) - # if union is a simple polygon make it a multi - if 'MULTI' not in parents.wkt: - parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" - if not parents: - return - self.limit = parents - self.save() - return True - - def generate_center(self, force=False): - if not force and (self.center or not self.limit): - return - self.center = self.limit.centroid - if not self.center: - return False - self.save() - return True - - def generate_area(self, force=False): - if not force and (self.surface or not self.limit): - return - surface = self.limit.transform(settings.SURFACE_SRID, - clone=True).area - if surface > 214748364 or not surface: - return False - self.surface = surface - self.save() - return True - - def update_town_code(self): - if not self.numero_insee or not self.children.count() or not self.year: - return - old_num = self.numero_insee[:] - numero = old_num.split('-')[0] - self.numero_insee = "{}-{}".format(numero, self.year) - if self.numero_insee != old_num: - return True - - def _generate_cached_label(self): - cached_label = self.name - if settings.COUNTRY == "fr" and self.numero_insee: - dpt_len = 2 - if self.numero_insee.startswith('97') or \ - self.numero_insee.startswith('98') or \ - self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', - '6', '7', '8', '9'): - dpt_len = 3 - cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) - if self.year and self.children.count(): - cached_label += " ({})".format(self.year) - return cached_label - - -def post_save_town(sender, **kwargs): - cached_label_changed(sender, **kwargs) - town = kwargs['instance'] - town.generate_geo() - if town.update_town_code(): - town.save() - - -post_save.connect(post_save_town, sender=Town) - - -def town_child_changed(sender, **kwargs): - town = kwargs['instance'] - if town.update_town_code(): - town.save() - - -m2m_changed.connect(town_child_changed, sender=Town.children.through) - - class Area(HierarchicalType): towns = models.ManyToManyField(Town, verbose_name=_("Towns"), blank=True, related_name='areas') @@ -4057,261 +1539,6 @@ class Area(HierarchicalType): return " / ".join(label) -class Address(BaseHistorizedItem): - FIELDS = ( - "address", "address_complement", "postal_code", "town", - "precise_town", "country", - "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", - "alt_country", - "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", - "raw_phone", "mobile_phone", "email", "alt_address_is_prefered" - ) - address = models.TextField(_("Address"), null=True, blank=True) - address_complement = models.TextField(_("Address complement"), null=True, - blank=True) - postal_code = models.CharField(_("Postal code"), max_length=10, null=True, - blank=True) - town = models.CharField(_("Town (freeform)"), max_length=150, null=True, - blank=True) - precise_town = models.ForeignKey( - Town, verbose_name=_("Town (precise)"), null=True, blank=True) - country = models.CharField(_("Country"), max_length=30, null=True, - blank=True) - alt_address = models.TextField(_("Other address: address"), null=True, - blank=True) - alt_address_complement = models.TextField( - _("Other address: address complement"), null=True, blank=True) - alt_postal_code = models.CharField(_("Other address: postal code"), - max_length=10, null=True, blank=True) - alt_town = models.CharField(_("Other address: town"), max_length=70, - null=True, blank=True) - alt_country = models.CharField(_("Other address: country"), - max_length=30, null=True, blank=True) - phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True) - phone_desc = models.CharField(_("Phone description"), max_length=300, - null=True, blank=True) - phone2 = models.CharField(_("Phone description 2"), max_length=18, - null=True, blank=True) - phone_desc2 = models.CharField(_("Phone description 2"), max_length=300, - null=True, blank=True) - phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, - blank=True) - phone_desc3 = models.CharField(_("Phone description 3"), max_length=300, - null=True, blank=True) - raw_phone = models.TextField(_("Raw phone"), blank=True, null=True) - mobile_phone = models.CharField(_("Mobile phone"), max_length=18, - null=True, blank=True) - email = models.EmailField( - _("Email"), max_length=300, blank=True, null=True) - alt_address_is_prefered = models.BooleanField( - _("Alternative address is prefered"), default=False) - SUB_ADDRESSES = [] - - class Meta: - abstract = True - - def get_short_html_items(self): - items = [] - if self.address: - items.append( - """<span class="subadress">{}</span>""".format(self.address)) - if self.address_complement: - items.append( - """<span class="subadress-complement">{}</span>""".format( - self.address_complement)) - if self.postal_code: - items.append( - """<span class="postal-code">{}</span>""".format( - self.postal_code)) - if self.precise_town: - items.append( - """<span class="town">{}</span>""".format( - self.precise_town.name)) - elif self.town: - items.append( - """<span class="town">{}</span>""".format( - self.town)) - if self.country: - items.append( - """<span class="country">{}</span>""".format( - self.country)) - return items - - def get_short_html_detail(self): - html = """<div class="address">""" - items = self.get_short_html_items() - if not items: - items = [ - "<span class='no-address'>{}</span>".format( - _("No associated address") - ) - ] - html += "".join(items) - html += """</div>""" - return html - - def get_town_centroid(self): - if self.precise_town: - return self.precise_town.center, self._meta.verbose_name - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if sub_item and sub_item.precise_town: - return sub_item.precise_town.center, sub_item._meta.verbose_name - - def get_town_polygons(self): - if self.precise_town: - return self.precise_town.limit, self._meta.verbose_name - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if sub_item and sub_item.precise_town: - return sub_item.precise_town.limit, sub_item._meta.verbose_name - - def get_attribute(self, attr): - if self.town or self.precise_town: - return getattr(self, attr) - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if not sub_item: - continue - if sub_item.town or sub_item.precise_town: - return getattr(sub_item, attr) - return getattr(self, attr) - - def get_address(self): - return self.get_attribute("address") - - def get_address_complement(self): - return self.get_attribute("address_complement") - - def get_postal_code(self): - return self.get_attribute("postal_code") - - def get_town(self): - return self.get_attribute("town") - - def get_precise_town(self): - return self.get_attribute("precise_town") - - def get_country(self): - return self.get_attribute("country") - - def simple_lbl(self): - return str(self) - - def full_address(self): - lbl = self.simple_lbl() - if lbl: - lbl += "\n" - lbl += self.address_lbl() - return lbl - - def address_lbl(self): - lbl = u'' - prefix = '' - if self.alt_address_is_prefered: - prefix = 'alt_' - if getattr(self, prefix + 'address'): - lbl += getattr(self, prefix + 'address') - if getattr(self, prefix + 'address_complement'): - if lbl: - lbl += "\n" - lbl += getattr(self, prefix + 'address_complement') - postal_code = getattr(self, prefix + 'postal_code') - town = getattr(self, prefix + 'town') - if postal_code or town: - if lbl: - lbl += "\n" - lbl += "{}{}{}".format( - postal_code or '', - " " if postal_code and town else '', - town or '') - if self.phone: - if lbl: - lbl += "\n" - lbl += "{} {}".format(str(_("Tel: ")), self.phone) - if self.mobile_phone: - if lbl: - lbl += "\n" - lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone) - if self.email: - if lbl: - lbl += "\n" - lbl += "{} {}".format(str(_("Email: ")), self.email) - return lbl - - -class Merge(models.Model): - merge_key = models.TextField(_("Merge key"), blank=True, null=True) - merge_candidate = models.ManyToManyField("self", blank=True) - merge_exclusion = models.ManyToManyField("self", blank=True) - archived = models.NullBooleanField(default=False, - blank=True, null=True) - # 1 for one word similarity, 2 for two word similarity, etc. - MERGE_CLEMENCY = None - EMPTY_MERGE_KEY = '--' - MERGE_ATTRIBUTE = "name" - - class Meta: - abstract = True - - def generate_merge_key(self): - if self.archived: - return - merge_attr = getattr(self, self.MERGE_ATTRIBUTE) - self.merge_key = slugify(merge_attr if merge_attr else '') - if not self.merge_key: - self.merge_key = self.EMPTY_MERGE_KEY - self.merge_key = self.merge_key - - def generate_merge_candidate(self): - if self.archived: - return - if not self.merge_key: - self.generate_merge_key() - self.save(merge_key_generated=True) - if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: - return - q = self.__class__.objects \ - .exclude(pk=self.pk) \ - .exclude(merge_exclusion=self) \ - .exclude(merge_candidate=self) \ - .exclude(archived=True) - if not self.MERGE_CLEMENCY: - q = q.filter(merge_key=self.merge_key) - else: - subkeys_front = "-".join( - self.merge_key.split('-')[:self.MERGE_CLEMENCY]) - subkeys_back = "-".join( - self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) - q = q.filter(Q(merge_key__istartswith=subkeys_front) | - Q(merge_key__iendswith=subkeys_back)) - for item in q.all(): - self.merge_candidate.add(item) - - def save(self, *args, **kwargs): - # prevent circular save - merge_key_generated = False - if 'merge_key_generated' in kwargs: - merge_key_generated = kwargs.pop('merge_key_generated') - self.generate_merge_key() - item = super(Merge, self).save(*args, **kwargs) - if not merge_key_generated: - self.merge_candidate.clear() - self.generate_merge_candidate() - return item - - def archive(self): - self.archived = True - self.save() - self.merge_candidate.clear() - self.merge_exclusion.clear() - - def merge(self, item, keep_old=False, exclude_fields=None): - merge_model_objects(self, item, keep_old=keep_old, - exclude_fields=exclude_fields) - self.generate_merge_candidate() - - class OrganizationType(GeneralType): class Meta: verbose_name = _("Organization type") @@ -5704,15 +2931,12 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, verbose_name=_("Receipt date in documentation")) item_number = models.IntegerField(_("Number of items"), default=1) description = models.TextField(_("Description"), blank=True, null=True) - container = models.ForeignKey( - "archaeological_warehouse.Container", verbose_name=_("Container"), - blank=True, null=True, related_name='contained_documents', - on_delete=models.SET_NULL) - container_ref = models.ForeignKey( - "archaeological_warehouse.Container", - verbose_name=_("Reference container"), - blank=True, null=True, - related_name='contained_documents_ref', on_delete=models.SET_NULL) + container_id = models.PositiveIntegerField( + verbose_name=_("Container ID"), blank=True, null=True) + # container = models.ForeignKey("archaeological_warehouse.Container") + container_ref_id = models.PositiveIntegerField( + verbose_name=_("Container ID"), blank=True, null=True) + # container_ref = models.ForeignKey("archaeological_warehouse.Container") comment = models.TextField(_("Comment"), blank=True, null=True) additional_information = models.TextField(_("Additional information"), blank=True, null=True) @@ -5750,6 +2974,26 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, def natural_key(self): return (self.external_id,) + @property + def container(self): + if not self.container_id: + return + Container = apps.get_model("archaeological_warehouse", "Container") + try: + return Container.objects.get(pk=self.container_id) + except Container.DoesNotExist: + return + + @property + def container_ref(self): + if not self.container_ref_id: + return + Container = apps.get_model("archaeological_warehouse", "Container") + try: + return Container.objects.get(pk=self.container_ref_id) + except Container.DoesNotExist: + return + """ @property def code(self): @@ -6177,6 +3421,16 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, self.set_index() if not self.associated_url: self.associated_url = None + container = self.container + if self.container_id and not container: + self.container_id = None + if container and container.pk != self.container_id: + self.container_id = container.pk + container_ref = self.container_ref + if self.container_ref_id and not container_ref: + self.container_ref_id = None + if container_ref and container_ref.pk != self.container_ref_id: + self.container_ref_id = container_ref.pk super(Document, self).save(*args, **kwargs) if self.image and not no_path_change and \ @@ -6190,42 +3444,6 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel, self.save(no_path_change=True) -def document_attached_changed(sender, **kwargs): - # associate a default main image - instance = kwargs.get("instance", None) - model = kwargs.get("model", None) - pk_set = kwargs.get("pk_set", None) - if not instance or not model: - return - - if hasattr(instance, "documents"): - items = [instance] - else: - if not pk_set: - return - try: - items = [model.objects.get(pk=pk) for pk in pk_set] - except model.DoesNotExist: - return - - for item in items: - q = item.documents.filter( - image__isnull=False).exclude(image='') - if item.main_image: - if q.filter(pk=item.main_image.pk).count(): - return - # the association has disappear not the main image anymore - item.main_image = None - item.skip_history_when_saving = True - item.save() - if not q.count(): - return - # by default get the lowest pk - item.main_image = q.order_by('pk').all()[0] - item.skip_history_when_saving = True - item.save() - - post_save.connect(cached_label_changed, sender=Document) diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py new file mode 100644 index 000000000..b7685b8b5 --- /dev/null +++ b/ishtar_common/models_common.py @@ -0,0 +1,2777 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Generic models and tools for models +""" + +""" +from ishtar_common.models import GeneralType, get_external_id, \ + LightHistorizedItem, OwnPerms, Address, post_save_cache, \ + DashboardFormItem, document_attached_changed, SearchAltName, \ + DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ + QuickAction, MainItem, Merge + + +""" + +import copy +from collections import OrderedDict +import datetime +import json +import logging +import os +import pyqrcode +import shutil +import tempfile +import time + +from django import forms +from django.apps import apps +from django.conf import settings +from django.contrib.auth.models import User, Group +from django.contrib.contenttypes.models import ContentType +from django.contrib.gis.db import models +from django.contrib.postgres.fields import JSONField +from django.contrib.postgres.search import SearchVectorField, SearchVector +from django.contrib.sites.models import Site +from django.core.cache import cache +from django.core.exceptions import ObjectDoesNotExist +from django.core.files import File +from django.core.serializers import serialize +from django.core.urlresolvers import reverse, NoReverseMatch +from django.core.validators import validate_slug +from django.db import connection +from django.db.models import Q, Count +from django.db.models.signals import post_save, post_delete, m2m_changed +from django.template.defaultfilters import slugify +from django.utils.safestring import SafeText, mark_safe +from django.utils.translation import activate, deactivate +from ishtar_common.utils import ugettext_lazy as _, \ + pgettext_lazy, get_image_path +from simple_history.models import HistoricalRecords as BaseHistoricalRecords +from simple_history.signals import post_create_historical_record, \ + pre_create_historical_record +from unidecode import unidecode + +from ishtar_common.model_managers import TypeManager +from ishtar_common.model_merging import merge_model_objects +from ishtar_common.models_imports import Import +from ishtar_common.templatetags.link_to_window import simple_link_to_window +from ishtar_common.utils import get_cache, disable_for_loaddata, \ + get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \ + task, duplicate_item, get_external_id, get_current_profile + +""" +from ishtar_common.models import get_external_id, \ + LightHistorizedItem, OwnPerms, Address, post_save_cache, \ + DashboardFormItem, document_attached_changed, SearchAltName, \ + DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \ + QuickAction, MainItem, Merge + + +""" + +logger = logging.getLogger(__name__) + + +class CachedGen(object): + @classmethod + def refresh_cache(cls): + raise NotImplementedError() + + @classmethod + def _add_cache_key_to_refresh(cls, keys): + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if type(current_keys) != list: + current_keys = [] + if keys not in current_keys: + current_keys.append(keys) + cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) + + +class Cached(CachedGen): + slug_field = 'txt_idx' + + @classmethod + def refresh_cache(cls): + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if not current_keys: + return + for keys in current_keys: + if len(keys) == 2 and keys[0] == '__slug': + cls.get_cache(keys[1], force=True) + elif keys[0] == '__get_types': + default = None + empty_first = True + exclude = [] + if len(keys) >= 2: + default = keys.pop() + if len(keys) > 1: + empty_first = bool(keys.pop()) + exclude = keys[1:] + cls.get_types( + exclude=exclude, empty_first=empty_first, default=default, + force=True) + elif keys[0] == '__get_help': + cls.get_help(force=True) + + @classmethod + def _add_cache_key_to_refresh(cls, keys): + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if type(current_keys) != list: + current_keys = [] + if keys not in current_keys: + current_keys.append(keys) + cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT) + + @classmethod + def get_cache(cls, slug, force=False): + cache_key, value = get_cache(cls, ['__slug', slug]) + if not force and value: + return value + try: + k = {cls.slug_field: slug} + obj = cls.objects.get(**k) + cache.set(cache_key, obj, settings.CACHE_TIMEOUT) + return obj + except cls.DoesNotExist: + cache.set(cache_key, None, settings.CACHE_TIMEOUT) + return None + + +@disable_for_loaddata +def post_save_cache(sender, **kwargs): + sender.refresh_cache() + + +class GeneralType(Cached, models.Model): + """ + Abstract class for "types" + """ + label = models.TextField(_("Label")) + txt_idx = models.TextField( + _("Textual ID"), validators=[validate_slug], + unique=True, + help_text=_( + "The slug is the standardized version of the name. It contains " + "only lowercase letters, numbers and hyphens. Each slug must " + "be unique.")) + comment = models.TextField(_("Comment"), blank=True, null=True) + available = models.BooleanField(_("Available"), default=True) + HELP_TEXT = "" + objects = TypeManager() + + class Meta: + abstract = True + + def __str__(self): + return self.label + + def natural_key(self): + return (self.txt_idx,) + + def history_compress(self): + return self.txt_idx + + @classmethod + def history_decompress(cls, value, create=False): + if not value: + return [] + res = [] + for txt_idx in value: + try: + res.append(cls.objects.get(txt_idx=txt_idx)) + except cls.DoesNotExist: + continue + return res + + @property + def explicit_label(self): + return "{} ({})".format(self.label, self._meta.verbose_name) + + @classmethod + def create_default_for_test(cls): + return [cls.objects.create(label='Test %d' % i) for i in range(5)] + + @property + def short_label(self): + return self.label + + @property + def name(self): + return self.label + + @classmethod + def get_or_create(cls, slug, label=''): + """ + Get or create a new item. + + :param slug: textual id + :param label: label for initialization if the item doesn't exist (not + mandatory) + + :return: instancied item of the base class + """ + + item = cls.get_cache(slug) + if item: + return item + item, created = cls.objects.get_or_create( + txt_idx=slug, defaults={'label': label}) + return item + + @classmethod + def get_or_create_pk(cls, slug): + """ + Get an id from a slug. Create the associated item if needed. + + :param slug: textual id + + :return: id of the item (string) + """ + return str(cls.get_or_create(slug).pk) + + @classmethod + def get_or_create_pks(cls, slugs): + """ + Get and merge a list of ids from a slug list. Create the associated + items if needed. + + :param slugs: textual ids + + :return: string with ids separated by "_" + """ + items = [] + for slug in slugs: + items.append(str(cls.get_or_create(slug).pk)) + return "_".join(items) + + @classmethod + def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None): + if not dct: + dct = {} + if not exclude: + exclude = [] + keys = ['__get_help'] + keys += ["{}".format(ex) for ex in exclude] + keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] + cache_key, value = get_cache(cls, keys) + if value and not force: + return mark_safe(value) + help_text = cls.HELP_TEXT + c_rank = -1 + help_items = "\n" + for item in cls.get_types(dct=dct, instances=True, exclude=exclude): + if hasattr(item, '__iter__'): + pk = item[0] + item = cls.objects.get(pk=pk) + item.rank = c_rank + 1 + if hasattr(item, 'parent'): + c_item = item + parents = [] + while c_item.parent: + parents.append(c_item.parent.label) + c_item = c_item.parent + parents.reverse() + parents.append(item.label) + item.label = " / ".join(parents) + if not item.comment: + continue + if c_rank > item.rank: + help_items += "</dl>\n" + elif c_rank < item.rank: + help_items += "<dl>\n" + c_rank = item.rank + help_items += "<dt>%s</dt><dd>%s</dd>" % ( + item.label, "<br/>".join(item.comment.split('\n'))) + c_rank += 1 + if c_rank: + help_items += c_rank * "</dl>" + if help_text or help_items != u'\n': + help_text = help_text + help_items + else: + help_text = "" + cache.set(cache_key, help_text, settings.CACHE_TIMEOUT) + return mark_safe(help_text) + + @classmethod + def _get_initial_types(cls, initial, type_pks, instance=False): + new_vals = [] + if not initial: + return [] + if type(initial) not in (list, tuple): + initial = [initial] + for value in initial: + try: + pk = int(value) + except (ValueError, TypeError): + continue + if pk in type_pks: + continue + try: + extra_type = cls.objects.get(pk=pk) + if instance: + new_vals.append(extra_type) + else: + new_vals.append((extra_type.pk, str(extra_type))) + except cls.DoesNotExist: + continue + return new_vals + + @classmethod + def get_types(cls, dct=None, instances=False, exclude=None, + empty_first=True, default=None, initial=None, force=False, + full_hierarchy=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + types = [] + if not instances and empty_first and not default: + types = [('', '--')] + types += cls._pre_get_types(dct, instances, exclude, + default, force, + get_full_hierarchy=full_hierarchy) + if not initial: + return types + new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types]) + types += new_vals + return types + + @classmethod + def _pre_get_types(cls, dct=None, instances=False, exclude=None, + default=None, force=False, get_full_hierarchy=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + # cache + cache_key = None + if not instances: + keys = ['__get_types'] + keys += ["{}".format(ex) for ex in exclude] + \ + ["{}".format(default)] + keys += ['{}-{}'.format(str(k), dct[k]) for k in dct] + cache_key, value = get_cache(cls, keys) + if value and not force: + return value + base_dct = dct.copy() + if hasattr(cls, 'parent'): + if not cache_key: + return cls._get_parent_types( + base_dct, instances, exclude=exclude, + default=default, get_full_hierarchy=get_full_hierarchy) + vals = [v for v in cls._get_parent_types( + base_dct, instances, exclude=exclude, + default=default, get_full_hierarchy=get_full_hierarchy)] + cache.set(cache_key, vals, settings.CACHE_TIMEOUT) + return vals + + if not cache_key: + return cls._get_types(base_dct, instances, exclude=exclude, + default=default) + vals = [ + v for v in cls._get_types(base_dct, instances, exclude=exclude, + default=default) + ] + cache.set(cache_key, vals, settings.CACHE_TIMEOUT) + return vals + + @classmethod + def _get_types(cls, dct=None, instances=False, exclude=None, default=None): + if not dct: + dct = {} + if not exclude: + exclude = [] + dct['available'] = True + if default: + try: + default = cls.objects.get(txt_idx=default) + yield (default.pk, _(str(default))) + except cls.DoesNotExist: + pass + items = cls.objects.filter(**dct) + if default and default != "None": + if hasattr(default, 'txt_idx'): + exclude.append(default.txt_idx) + else: + exclude.append(default) + if exclude: + items = items.exclude(txt_idx__in=exclude) + for item in items.order_by(*cls._meta.ordering).all(): + if instances: + item.rank = 0 + yield item + else: + yield (item.pk, _(str(item)) if item and str(item) else '') + + @classmethod + def _get_childs_list(cls, dct=None, exclude=None, instances=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + if 'parent' in dct: + dct.pop('parent') + childs = cls.objects.filter(**dct) + if exclude: + childs = childs.exclude(txt_idx__in=exclude) + if hasattr(cls, 'order'): + childs = childs.order_by('order') + res = {} + if instances: + for item in childs.all(): + parent_id = item.parent_id or 0 + if parent_id not in res: + res[parent_id] = [] + res[parent_id].append(item) + else: + for item in childs.values("id", "parent_id", "label").all(): + parent_id = item["parent_id"] or 0 + if item["id"] == item["parent_id"]: + parent_id = 0 + if parent_id not in res: + res[parent_id] = [] + res[parent_id].append((item["id"], item["label"])) + return res + + PREFIX = "│ " + PREFIX_EMPTY = " " + PREFIX_MEDIUM = "├ " + PREFIX_LAST = "└ " + PREFIX_CODES = ["\u2502", "\u251C", "\u2514"] + + @classmethod + def _get_childs(cls, item, child_list, prefix=0, instances=False, + is_last=False, last_of=None, get_full_hierarchy=False): + if not last_of: + last_of = [] + + prefix += 1 + current_child_lst = [] + if item in child_list: + current_child_lst = child_list[item] + + lst = [] + total = len(current_child_lst) + full_hierarchy_initial = get_full_hierarchy + for idx, child in enumerate(current_child_lst): + mylast_of = last_of[:] + p = '' + if instances: + child.rank = prefix + lst.append(child) + else: + if full_hierarchy_initial: + if isinstance(full_hierarchy_initial, str): + p = full_hierarchy_initial + " > " + else: + p = "" + else: + cprefix = prefix + while cprefix: + cprefix -= 1 + if not cprefix: + if (idx + 1) == total: + p += cls.PREFIX_LAST + else: + p += cls.PREFIX_MEDIUM + elif is_last: + if mylast_of: + clast = mylast_of.pop(0) + if clast: + p += cls.PREFIX_EMPTY + else: + p += cls.PREFIX + else: + p += cls.PREFIX_EMPTY + else: + p += cls.PREFIX + lst.append(( + child[0], SafeText(p + str(_(child[1]))) + )) + clast_of = last_of[:] + clast_of.append(idx + 1 == total) + if instances: + child_id = child.id + else: + child_id = child[0] + if get_full_hierarchy: + if p: + if not p.endswith(" > "): + p += " > " + get_full_hierarchy = p + child[1] + else: + get_full_hierarchy = child[1] + for sub_child in cls._get_childs( + child_id, child_list, prefix, instances, + is_last=((idx + 1) == total), last_of=clast_of, + get_full_hierarchy=get_full_hierarchy): + lst.append(sub_child) + return lst + + @classmethod + def _get_parent_types(cls, dct=None, instances=False, exclude=None, + default=None, get_full_hierarchy=False): + if not dct: + dct = {} + if not exclude: + exclude = [] + dct['available'] = True + child_list = cls._get_childs_list(dct, exclude, instances) + + if 0 in child_list: + for item in child_list[0]: + if instances: + item.rank = 0 + item_id = item.pk + yield item + else: + item_id = item[0] + yield item + if get_full_hierarchy: + get_full_hierarchy = item[1] + for child in cls._get_childs( + item_id, child_list, instances=instances, + get_full_hierarchy=get_full_hierarchy): + yield child + + def save(self, *args, **kwargs): + ItemKey = apps.get_model("ishtar_common", "ItemKey") + if not self.id and not self.label: + txt_idx = self.txt_idx + if isinstance(txt_idx, list): + txt_idx = txt_idx[0] + self.txt_idx = txt_idx + self.label = " ".join(" ".join(self.txt_idx.split('-')) + .split('_')).title() + if not self.txt_idx: + self.txt_idx = slugify(self.label)[:100] + + # clean old keys + if self.pk: + old = self.__class__.objects.get(pk=self.pk) + content_type = ContentType.objects.get_for_model(self.__class__) + if slugify(self.label) != slugify(old.label): + ItemKey.objects.filter( + object_id=self.pk, key=slugify(old.label), + content_type=content_type).delete() + if self.txt_idx != old.txt_idx: + ItemKey.objects.filter( + object_id=self.pk, key=old.txt_idx, + content_type=content_type).delete() + + obj = super(GeneralType, self).save(*args, **kwargs) + self.generate_key(force=True) + return obj + + def add_key(self, key, force=False, importer=None, group=None, + user=None): + ItemKey = apps.get_model("ishtar_common", "ItemKey") + content_type = ContentType.objects.get_for_model(self.__class__) + if not importer and not force and ItemKey.objects.filter( + key=key, content_type=content_type).count(): + return + filtr = {'key': key, 'content_type': content_type} + if group: + filtr['group'] = group + elif user: + filtr['user'] = user + else: + filtr['importer'] = importer + if force: + ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete() + filtr['object_id'] = self.pk + ItemKey.objects.get_or_create(**filtr) + + def generate_key(self, force=False): + for key in (slugify(self.label), self.txt_idx): + self.add_key(key) + + def get_keys(self, importer): + ItemKey = apps.get_model("ishtar_common", "ItemKey") + keys = [self.txt_idx] + content_type = ContentType.objects.get_for_model(self.__class__) + base_q = Q(content_type=content_type, object_id=self.pk) + subquery = Q(importer__isnull=True, user__isnull=True, + group__isnull=True) + subquery |= Q(user__isnull=True, group__isnull=True, + importer=importer) + if importer.user: + subquery |= Q(user=importer.user, group__isnull=True, + importer=importer) + if importer.associated_group: + subquery |= Q(user__isnull=True, group=importer.associated_group, + importer=importer) + q = ItemKey.objects.filter(base_q & subquery) + for ik in q.exclude(key=self.txt_idx).all(): + keys.append(ik.key) + return keys + + @classmethod + def generate_keys(cls): + # content_type = ContentType.objects.get_for_model(cls) + for item in cls.objects.all(): + item.generate_key() + + +class HierarchicalType(GeneralType): + parent = models.ForeignKey('self', blank=True, null=True, + on_delete=models.SET_NULL, + verbose_name=_("Parent")) + + class Meta: + abstract = True + + def full_label(self): + lbls = [self.label] + item = self + while item.parent: + item = item.parent + lbls.append(item.label) + return " > ".join(reversed(lbls)) + + +class StatisticItem: + STATISTIC_MODALITIES = [] # example: "year", "operation_type__label" + STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example: + # OrderedDict([('year', _("Year")), + # ("operation_type__label", _("Operation type"))]) + STATISTIC_SUM_VARIABLE = OrderedDict( + (("pk", (_("Number"), 1)),) + ) # example: "Price", "Volume" - the number is a multiplier + + +class TemplateItem: + @classmethod + def _label_templates_q(cls): + model_name = "{}.{}".format( + cls.__module__, cls.__name__) + q = Q(associated_model__klass=model_name, + for_labels=True, available=True) + alt_model_name = model_name.replace( + "models_finds", "models").replace( + "models_treatments", "models") + if alt_model_name != model_name: + q |= Q(associated_model__klass=model_name, + for_labels=True, available=True) + DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") + return DocumentTemplate.objects.filter(q) + + @classmethod + def has_label_templates(cls): + return cls._label_templates_q().count() + + @classmethod + def label_templates(cls): + return cls._label_templates_q() + + def get_extra_templates(self, request): + cls = self.__class__ + templates = [] + name = str(cls.__name__) + module = str(cls.__module__) + if "archaeological_finds" in module: + if "models_finds" in name or "models_treatments" in name: + names = [ + name, + name.replace("models_finds", "models" + ).replace("models_treatments", "models") + ] + else: + names = [name, name.replace("models", "models_finds"), + name.replace("models", "models_treatments")] + else: + names = [name] + model_names = [ + "{}.{}".format(module, name) for name in names + ] + DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate") + q = DocumentTemplate.objects.filter( + associated_model__klass__in=model_names, + for_labels=False, available=True) + for template in q.all(): + urlname = "generate-document" + templates.append( + (template.name, reverse( + urlname, args=[template.slug, self.pk])) + ) + return templates + + +class FullSearch(models.Model): + search_vector = SearchVectorField(_("Search vector"), blank=True, null=True, + help_text=_("Auto filled at save")) + + EXTRA_REQUEST_KEYS = {} + DYNAMIC_REQUESTS = {} + ALT_NAMES = {} + + BASE_SEARCH_VECTORS = [] + PROPERTY_SEARCH_VECTORS = [] + INT_SEARCH_VECTORS = [] + M2M_SEARCH_VECTORS = [] + PARENT_SEARCH_VECTORS = [] + # prevent circular dependency + PARENT_ONLY_SEARCH_VECTORS = [] + + class Meta: + abstract = True + + @classmethod + def general_types(cls): + for k in get_all_field_names(cls): + field = cls._meta.get_field(k) + if not hasattr(field, 'rel') or not field.rel: + continue + rel_model = field.rel.to + if issubclass(rel_model, (GeneralType, HierarchicalType)): + yield k + + @classmethod + def get_alt_names(cls): + alt_names = cls.ALT_NAMES.copy() + for dr_k in cls.DYNAMIC_REQUESTS: + alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names()) + return alt_names + + @classmethod + def get_query_parameters(cls): + query_parameters = {} + for v in cls.get_alt_names().values(): + for language_code, language_lbl in settings.LANGUAGES: + activate(language_code) + query_parameters[str(v.search_key)] = v + deactivate() + return query_parameters + + def _update_search_field(self, search_vector_conf, search_vectors, data): + for value in search_vector_conf.format(data): + with connection.cursor() as cursor: + cursor.execute("SELECT to_tsvector(%s, %s)", [ + search_vector_conf.language, value]) + row = cursor.fetchone() + search_vectors.append(row[0]) + + def _update_search_number_field(self, search_vectors, val): + search_vectors.append("'{}':1".format(val)) + + def update_search_vector(self, save=True, exclude_parent=False): + """ + Update the search vector + :param save: True if you want to save the object immediately + :return: True if modified + """ + if not hasattr(self, 'search_vector'): + return + if not self.pk: + # logger.warning("Cannot update search vector before save or " + # "after deletion.") + return + if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \ + and not self.INT_SEARCH_VECTORS \ + and not self.PROPERTY_SEARCH_VECTORS \ + and not self.PARENT_SEARCH_VECTORS: + logger.warning("No search_vectors defined for {}".format( + self.__class__)) + return + if getattr(self, '_search_updated', None): + return + JsonDataField = apps.get_model("ishtar_common", "JsonDataField") + self._search_updated = True + + old_search = "" + if self.search_vector: + old_search = self.search_vector[:] + search_vectors = [] + base_q = self.__class__.objects.filter(pk=self.pk) + + # many to many have to be queried one by one otherwise only one is fetch + for m2m_search_vector in self.M2M_SEARCH_VECTORS: + key = m2m_search_vector.key.split('__')[0] + rel_key = getattr(self, key) + for item in rel_key.values('pk').all(): + query_dct = {key + "__pk": item['pk']} + q = copy.copy(base_q).filter(**query_dct) + q = q.annotate( + search=SearchVector( + m2m_search_vector.key, + config=m2m_search_vector.language) + ).values('search') + search_vectors.append(q.all()[0]['search']) + + # int/float are not well managed by the SearchVector + for int_search_vector in self.INT_SEARCH_VECTORS: + q = base_q.values(int_search_vector.key) + for val in int_search_vector.format( + q.all()[0][int_search_vector.key]): + self._update_search_number_field(search_vectors, val) + + if not exclude_parent: + # copy parent vector fields + for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS: + parent = getattr(self, PARENT_SEARCH_VECTOR) + if hasattr(parent, 'all'): # m2m + for p in parent.all(): + search_vectors.append(p.search_vector) + elif parent: + search_vectors.append(parent.search_vector) + + for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS: + parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR) + if hasattr(parent, 'all'): # m2m + for p in parent.all(): + search_vectors.append( + p.update_search_vector(save=False, exclude_parent=True) + ) + elif parent: + search_vectors.append( + parent.update_search_vector(save=False, exclude_parent=True) + ) + + if self.BASE_SEARCH_VECTORS: + # query "simple" fields + q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS]) + res = q.all()[0] + for base_search_vector in self.BASE_SEARCH_VECTORS: + data = res[base_search_vector.key] + data = unidecode(str(data)) + self._update_search_field(base_search_vector, + search_vectors, data) + + if self.PROPERTY_SEARCH_VECTORS: + for property_search_vector in self.PROPERTY_SEARCH_VECTORS: + data = getattr(self, property_search_vector.key) + if callable(data): + data = data() + if not data: + continue + data = str(data) + self._update_search_field(property_search_vector, + search_vectors, data) + + if hasattr(self, 'data') and self.data: + content_type = ContentType.objects.get_for_model(self) + for json_field in JsonDataField.objects.filter( + content_type=content_type, + search_index=True).all(): + data = copy.deepcopy(self.data) + no_data = False + for key in json_field.key.split('__'): + if key not in data: + no_data = True + break + data = data[key] + if no_data or not data: + continue + + if json_field.value_type == 'B': + if data is True: + data = json_field.name + else: + continue + elif json_field.value_type in ('I', 'F'): + self._update_search_number_field(search_vectors, data) + continue + elif json_field.value_type == 'D': + # only index year + self._update_search_number_field(search_vectors, data.year) + continue + for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE): + with connection.cursor() as cursor: + cursor.execute("SELECT to_tsvector(%s, %s)", + [lang, data]) + row = cursor.fetchone() + search_vectors.append(row[0]) + new_search_vector = merge_tsvectors(search_vectors) + changed = old_search != new_search_vector + self.search_vector = new_search_vector + if save and changed: + self.__class__.objects.filter(pk=self.pk).update( + search_vector=new_search_vector) + elif not save: + return new_search_vector + return changed + + +class Imported(models.Model): + imports = models.ManyToManyField( + Import, blank=True, + related_name="imported_%(app_label)s_%(class)s") + + class Meta: + abstract = True + + +class JsonData(models.Model, CachedGen): + data = JSONField(default={}, blank=True) + + class Meta: + abstract = True + + def pre_save(self): + if not self.data: + self.data = {} + + @property + def json_sections(self): + sections = [] + try: + content_type = ContentType.objects.get_for_model(self) + except ContentType.DoesNotExists: + return sections + JsonDataField = apps.get_model("ishtar_common", "JsonDataField") + fields = list(JsonDataField.objects.filter( + content_type=content_type, display=True, section__isnull=True + ).all()) # no section fields + + fields += list(JsonDataField.objects.filter( + content_type=content_type, display=True, section__isnull=False + ).order_by('section__order', 'order').all()) + + for field in fields: + value = None + data = self.data.copy() + for key in field.key.split('__'): + if key in data: + value = copy.copy(data[key]) + data = data[key] + else: + value = None + break + if value is None: + continue + if type(value) in (list, tuple): + value = " ; ".join([str(v) for v in value]) + section_name = field.section.name if field.section else None + if not sections or section_name != sections[-1][0]: + # if section name is identical it is the same + sections.append((section_name, [])) + sections[-1][1].append((field.name, value)) + return sections + + @classmethod + def refresh_cache(cls): + __, refreshed = get_cache(cls, ['cache_refreshed']) + if refreshed and time.time() - refreshed < 1: + return + cache_ckey, current_keys = get_cache(cls, ['_current_keys']) + if not current_keys: + return + for keys in current_keys: + if keys[0] == '__get_dynamic_choices': + cls._get_dynamic_choices(keys[1], force=True) + + @classmethod + def _get_dynamic_choices(cls, key, force=False): + """ + Get choice from existing values + :param key: data key + :param force: if set to True do not use cache + :return: tuple of choices (id, value) + """ + cache_key, value = get_cache(cls, ['__get_dynamic_choices', key]) + if not force and value: + return value + choices = set() + splitted_key = key[len('data__'):].split('__') + q = cls.objects.filter( + data__has_key=key[len('data__'):]).values_list('data', flat=True) + for value in q.all(): + for k in splitted_key: + value = value[k] + choices.add(value) + choices = [('', '')] + [(v, v) for v in sorted(list(choices))] + cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT) + return choices + + +class FixAssociated: + ASSOCIATED = {} + + def fix_associated(self): + for key in self.ASSOCIATED: + item = getattr(self, key) + if not item: + continue + dct = self.ASSOCIATED[key] + for dct_key in dct: + subkey, ctype = dct_key + expected_values = dct[dct_key] + if not isinstance(expected_values, (list, tuple)): + expected_values = [expected_values] + if hasattr(ctype, "txt_idx"): + try: + expected_values = [ctype.objects.get(txt_idx=v) + for v in expected_values] + except ctype.DoesNotExist: + # type not yet initialized + return + current_vals = getattr(item, subkey) + is_many = False + if hasattr(current_vals, "all"): + is_many = True + current_vals = current_vals.all() + else: + current_vals = [current_vals] + is_ok = False + for current_val in current_vals: + if current_val in expected_values: + is_ok = True + break + if is_ok: + continue + # the first value is used + new_value = expected_values[0] + if is_many: + getattr(item, subkey).add(new_value) + else: + setattr(item, subkey, new_value) + + +class CascasdeUpdate: + DOWN_MODEL_UPDATE = [] + + def cascade_update(self): + for down_model in self.DOWN_MODEL_UPDATE: + if not settings.USE_BACKGROUND_TASK: + rel = getattr(self, down_model) + if hasattr(rel.model, "need_update"): + rel.update(need_update=True) + continue + for item in getattr(self, down_model).all(): + cached_label_changed(item.__class__, instance=item) + if hasattr(item, "point_2d"): + post_save_geo(item.__class__, instance=item) + + +class SearchAltName(object): + def __init__(self, search_key, search_query, extra_query=None, + distinct_query=False): + self.search_key = search_key + self.search_query = search_query + self.extra_query = extra_query or {} + self.distinct_query = distinct_query + + +class HistoryError(Exception): + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + +class HistoricalRecords(BaseHistoricalRecords): + def _save_historic(self, manager, instance, history_date, history_type, + history_user, history_change_reason, using, attrs): + history_instance = manager.model( + history_date=history_date, + history_type=history_type, + history_user=history_user, + history_change_reason=history_change_reason, + **attrs + ) + + pre_create_historical_record.send( + sender=manager.model, + instance=instance, + history_date=history_date, + history_user=history_user, + history_change_reason=history_change_reason, + history_instance=history_instance, + using=using, + ) + + history_instance.save(using=using) + + post_create_historical_record.send( + sender=manager.model, + instance=instance, + history_instance=history_instance, + history_date=history_date, + history_user=history_user, + history_change_reason=history_change_reason, + using=using, + ) + + def create_historical_record(self, instance, history_type, using=None): + try: + history_modifier = getattr(instance, 'history_modifier', None) + assert history_modifier + except (User.DoesNotExist, AssertionError): + # on batch removing of users, user could have disappeared + return + history_date = getattr(instance, "_history_date", + datetime.datetime.now()) + history_change_reason = getattr(instance, "changeReason", None) + force = getattr(instance, "_force_history", False) + manager = getattr(instance, self.manager_name) + attrs = {} + for field in instance._meta.fields: + attrs[field.attname] = getattr(instance, field.attname) + q_history = instance.history \ + .filter(history_modifier_id=history_modifier.pk) \ + .order_by('-history_date', '-history_id') + # instance.skip_history_when_saving = True + if not q_history.count(): + if force: + delattr(instance, '_force_history') + self._save_historic( + manager, instance, history_date, history_type, history_modifier, + history_change_reason, using, attrs) + return + old_instance = q_history.all()[0] + # multiple saving by the same user in a very short time are generaly + # caused by post_save signals it is not relevant to keep them + min_history_date = datetime.datetime.now() \ + - datetime.timedelta(seconds=5) + q = q_history.filter(history_date__isnull=False, + history_date__gt=min_history_date) \ + .order_by('-history_date', '-history_id') + if not force and q.count(): + return + + if force: + delattr(instance, '_force_history') + + # record a new version only if data have been changed + for field in instance._meta.fields: + if getattr(old_instance, field.attname) != attrs[field.attname]: + self._save_historic(manager, instance, history_date, + history_type, history_modifier, + history_change_reason, using, attrs) + return + + +class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported, + JsonData, FixAssociated, CascasdeUpdate): + """ + Historized item with external ID management. + All historized items are searchable and have a data json field. + Historized items can be "locked" for edition. + """ + IS_BASKET = False + SHOW_URL = None + EXTERNAL_ID_KEY = '' + EXTERNAL_ID_DEPENDENCIES = [] + HISTORICAL_M2M = [] + + history_modifier = models.ForeignKey( + User, related_name='+', on_delete=models.SET_NULL, + verbose_name=_("Last editor"), blank=True, null=True) + history_creator = models.ForeignKey( + User, related_name='+', on_delete=models.SET_NULL, + verbose_name=_("Creator"), blank=True, null=True) + last_modified = models.DateTimeField(auto_now=True) + history_m2m = JSONField(default={}, blank=True) + need_update = models.BooleanField( + verbose_name=_("Need update"), default=False) + locked = models.BooleanField( + verbose_name=_("Item locked for edition"), default=False) + lock_user = models.ForeignKey( + User, related_name='+', on_delete=models.SET_NULL, + verbose_name=_("Locked by"), blank=True, null=True) + + ALT_NAMES = { + 'history_creator': SearchAltName( + pgettext_lazy("key for text search", u"created-by"), + 'history_creator__ishtaruser__person__cached_label__iexact' + ), + 'history_modifier': SearchAltName( + pgettext_lazy("key for text search", u"modified-by"), + 'history_modifier__ishtaruser__person__cached_label__iexact' + ), + 'modified_before': SearchAltName( + pgettext_lazy("key for text search", "modified-before"), + 'last_modified__lte' + ), + 'modified_after': SearchAltName( + pgettext_lazy("key for text search", "modified-after"), + 'last_modified__gte' + ), + } + + class Meta: + abstract = True + + @classmethod + def get_verbose_name(cls): + return cls._meta.verbose_name + + def is_locked(self, user=None): + if not user: + return self.locked + return self.locked and (not self.lock_user or self.lock_user != user) + + def merge(self, item, keep_old=False): + merge_model_objects(self, item, keep_old=keep_old) + + def public_representation(self): + return {} + + def duplicate(self, user=None, data=None): + return duplicate_item(self, user, data) + + def update_external_id(self, save=False): + if not self.EXTERNAL_ID_KEY or ( + self.external_id and + not getattr(self, 'auto_external_id', False)): + return + external_id = get_external_id(self.EXTERNAL_ID_KEY, self) + if external_id == self.external_id: + return + self.auto_external_id = True + self.external_id = external_id + self._cached_label_checked = False + if save: + self.skip_history_when_saving = True + self.save() + return external_id + + def get_last_history_date(self): + q = self.history.values("history_date").order_by('-history_date') + if not q.count(): + return + return q.all()[0]['history_date'] + + def get_previous(self, step=None, date=None, strict=False): + """ + Get a "step" previous state of the item + """ + assert step or date + historized = self.history.all() + item = None + if step: + if len(historized) <= step: + # silently return the last step if too far in the history + item = historized[len(historized) - 1] + else: + item = historized[step] + else: + for step, item in enumerate(historized): + if item.history_date == date: + break + # ended with no match + if item.history_date != date: + return + item._step = step + if len(historized) != (step + 1): + item._previous = historized[step + 1].history_date + else: + item._previous = None + if step > 0: + item._next = historized[step - 1].history_date + else: + item._next = None + item.history_date = historized[step].history_date + model = self.__class__ + for k in get_all_field_names(model): + field = model._meta.get_field(k) + if hasattr(field, 'rel') and field.rel: + if not hasattr(item, k + '_id'): + setattr(item, k, getattr(self, k)) + continue + val = getattr(item, k + '_id') + if not val: + setattr(item, k, None) + continue + try: + val = field.rel.to.objects.get(pk=val) + setattr(item, k, val) + except ObjectDoesNotExist: + if strict: + raise HistoryError("The class %s has no pk %d" % ( + str(field.rel.to), val)) + setattr(item, k, None) + item.pk = self.pk + return item + + @property + def last_edition_date(self): + try: + return self.history.order_by('-history_date').all()[0].history_date + except (AttributeError, IndexError): + return + + @property + def history_creation_date(self): + try: + return self.history.order_by('history_date').all()[0].history_date + except (AttributeError, IndexError): + return + + def rollback(self, date): + """ + Rollback to a previous state + """ + to_del, new_item = [], None + for item in self.history.all(): + if item.history_date == date: + new_item = item + break + to_del.append(item) + if not new_item: + raise HistoryError("The date to rollback to doesn't exist.") + try: + field_keys = [f.name for f in self._meta.fields] + for k in field_keys: + if k != 'id' and hasattr(self, k): + if not hasattr(new_item, k): + k = k + "_id" + setattr(self, k, getattr(new_item, k)) + + try: + self.history_modifier = User.objects.get( + pk=new_item.history_modifier_id) + except User.ObjectDoesNotExist: + pass + self.save() + saved_m2m = new_item.history_m2m.copy() + for hist_key in self.HISTORICAL_M2M: + # after each association m2m is rewrite - force the original + # to be reset + new_item.history_m2m = saved_m2m + values = new_item.m2m_listing(hist_key, create=True) or [] + hist_field = getattr(self, hist_key) + hist_field.clear() + for val in values: + hist_field.add(val) + # force label regeneration + self._cached_label_checked = False + self.save() + except ObjectDoesNotExist: + raise HistoryError("The rollback has failed.") + # clean the obsolete history + for historized_item in to_del: + historized_item.delete() + + def m2m_listing(self, key): + return getattr(self, key).all() + + def values(self): + values = {} + for f in self._meta.fields: + k = f.name + if k != 'id': + values[k] = getattr(self, k) + return values + + def get_absolute_url(self): + try: + return reverse('display-item', args=[self.SLUG, self.pk]) + except NoReverseMatch: + return + + def get_show_url(self): + show_url = self.SHOW_URL + if not show_url: + show_url = 'show-' + self.__class__.__name__.lower() + try: + return reverse(show_url, args=[self.pk, '']) + except NoReverseMatch: + return + + @property + def associated_filename(self): + if [True for attr in ('get_town_label', 'get_department', 'reference', + 'short_class_name') if not hasattr(self, attr)]: + return '' + items = [slugify(self.get_department()), + slugify(self.get_town_label()).upper(), + slugify(self.short_class_name), + slugify(self.reference), + slugify(self.name or '').replace('-', '_').capitalize()] + last_edition_date = self.last_edition_date + if last_edition_date: + items.append(last_edition_date.strftime('%Y%m%d')) + else: + items.append('00000000') + return "-".join([str(item) for item in items]) + + def save(self, *args, **kwargs): + created = not self.pk + if not getattr(self, 'skip_history_when_saving', False): + assert hasattr(self, 'history_modifier') + if created: + self.history_creator = self.history_modifier + # external ID can have related item not available before save + external_id_updated = kwargs.pop('external_id_updated') \ + if 'external_id_updated' in kwargs else False + if not created and not external_id_updated: + self.update_external_id() + super(BaseHistorizedItem, self).save(*args, **kwargs) + if created and self.update_external_id(): + # force resave for external ID creation + self.skip_history_when_saving = True + self._updated_id = True + return self.save(external_id_updated=True) + for dep in self.EXTERNAL_ID_DEPENDENCIES: + for obj in getattr(self, dep).all(): + obj.update_external_id(save=True) + self.fix_associated() + return True + + +class LightHistorizedItem(BaseHistorizedItem): + history_date = models.DateTimeField(default=datetime.datetime.now) + + class Meta: + abstract = True + + def save(self, *args, **kwargs): + super(LightHistorizedItem, self).save(*args, **kwargs) + return self + + +class OwnPerms(object): + """ + Manage special permissions for object's owner + """ + + @classmethod + def get_query_owns(cls, ishtaruser): + """ + Query object to get own items + """ + return None # implement for each object + + def can_view(self, request): + if hasattr(self, "LONG_SLUG"): + perm = "view_" + self.LONG_SLUG + else: + perm = "view_" + self.SLUG + return self.can_do(request, perm) + + def can_do(self, request, action_name): + """ + Check permission availability for the current object. + :param request: request object + :param action_name: action name eg: "change_find" - "own" variation is + checked + :return: boolean + """ + if not getattr(request.user, 'ishtaruser', None): + return False + splited = action_name.split('_') + action_own_name = splited[0] + '_own_' + '_'.join(splited[1:]) + user = request.user + if action_own_name == "view_own_findbasket": + action_own_name = "view_own_find" + return user.ishtaruser.has_right(action_name, request.session) or \ + (user.ishtaruser.has_right(action_own_name, request.session) + and self.is_own(user.ishtaruser)) + + def is_own(self, user, alt_query_own=None): + """ + Check if the current object is owned by the user + """ + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, IshtarUser): + ishtaruser = user + elif hasattr(user, 'ishtaruser'): + ishtaruser = user.ishtaruser + else: + return False + if not alt_query_own: + query = self.get_query_owns(ishtaruser) + else: + query = getattr(self, alt_query_own)(ishtaruser) + if not query: + return False + query &= Q(pk=self.pk) + return self.__class__.objects.filter(query).count() + + @classmethod + def has_item_of(cls, user): + """ + Check if the user own some items + """ + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, IshtarUser): + ishtaruser = user + elif hasattr(user, 'ishtaruser'): + ishtaruser = user.ishtaruser + else: + return False + query = cls.get_query_owns(ishtaruser) + if not query: + return False + return cls.objects.filter(query).count() + + @classmethod + def _return_get_owns(cls, owns, values, get_short_menu_class, + label_key='cached_label'): + if not owns: + return [] + sorted_values = [] + if hasattr(cls, 'BASKET_MODEL'): + owns_len = len(owns) + for idx, item in enumerate(reversed(owns)): + if get_short_menu_class: + item = item[0] + if type(item) == cls.BASKET_MODEL: + basket = owns.pop(owns_len - idx - 1) + sorted_values.append(basket) + sorted_values = list(reversed(sorted_values)) + if not values: + if not get_short_menu_class: + return sorted_values + list( + sorted(owns, key=lambda x: getattr(x, label_key) or "")) + return sorted_values + list( + sorted(owns, key=lambda x: getattr(x[0], label_key) or "")) + if not get_short_menu_class: + return sorted_values + list( + sorted(owns, key=lambda x: x[label_key] or "")) + return sorted_values + list( + sorted(owns, key=lambda x: x[0][label_key] or "")) + + @classmethod + def get_owns(cls, user, replace_query=None, limit=None, values=None, + get_short_menu_class=False, menu_filtr=None): + """ + Get Own items + """ + if not replace_query: + replace_query = {} + if hasattr(user, 'is_authenticated') and not user.is_authenticated(): + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + IshtarUser = apps.get_model("ishtar_common", "IshtarUser") + if isinstance(user, User): + try: + ishtaruser = IshtarUser.objects.get(user_ptr=user) + except IshtarUser.DoesNotExist: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + elif isinstance(user, IshtarUser): + ishtaruser = user + else: + if values: + return [] + return cls.objects.filter(pk__isnull=True) + items = [] + if hasattr(cls, 'BASKET_MODEL'): + items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) + query = cls.get_query_owns(ishtaruser) + if not query and not replace_query: + returned = cls.objects.filter(pk__isnull=True) + if values: + returned = [] + return returned + if query: + q = cls.objects.filter(query) + else: # replace_query + q = cls.objects.filter(replace_query) + if values: + q = q.values(*values) + if limit: + items += list(q.order_by('-pk')[:limit]) + else: + items += list(q.order_by(*cls._meta.ordering).all()) + if get_short_menu_class: + if values: + if 'id' not in values: + raise NotImplementedError( + "Call of get_owns with get_short_menu_class option and" + " no 'id' in values is not implemented") + my_items = [] + for i in items: + if hasattr(cls, 'BASKET_MODEL') and \ + type(i) == cls.BASKET_MODEL: + dct = dict([(k, getattr(i, k)) for k in values]) + my_items.append( + (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))) + else: + my_items.append((i, cls.get_short_menu_class(i['id']))) + items = my_items + else: + items = [(i, cls.get_short_menu_class(i.pk)) for i in items] + return items + + @classmethod + def _get_query_owns_dicts(cls, ishtaruser): + """ + List of query own dict to construct the query. + Each dict are join with an AND operator, each dict key, values are + joined with OR operator + """ + return [] + + @classmethod + def _construct_query_own(cls, prefix, dct_list): + q = None + for subquery_dict in dct_list: + subquery = None + for k in subquery_dict: + subsubquery = Q(**{prefix + k: subquery_dict[k]}) + if subquery: + subquery |= subsubquery + else: + subquery = subsubquery + if not subquery: + continue + if q: + q &= subquery + else: + q = subquery + return q + + +class NumberManager(models.Manager): + def get_by_natural_key(self, number): + return self.get(number=number) + + +class State(models.Model): + label = models.CharField(_("Label"), max_length=30) + number = models.CharField(_("Number"), unique=True, max_length=3) + objects = NumberManager() + + class Meta: + verbose_name = _("State") + ordering = ['number'] + + def __str__(self): + return self.label + + def natural_key(self): + return (self.number,) + + +class Department(models.Model): + label = models.CharField(_("Label"), max_length=30) + number = models.CharField(_("Number"), unique=True, max_length=3) + state = models.ForeignKey( + 'State', verbose_name=_("State"), blank=True, null=True, + on_delete=models.SET_NULL, + ) + objects = NumberManager() + + class Meta: + verbose_name = _("Department") + verbose_name_plural = _("Departments") + ordering = ['number'] + + def __str__(self): + return self.label + + def natural_key(self): + return (self.number,) + + def history_compress(self): + return self.number + + @classmethod + def history_decompress(cls, full_value, create=False): + if not full_value: + return [] + res = [] + for value in full_value: + try: + res.append(cls.objects.get(number=value)) + except cls.DoesNotExist: + continue + return res + + +class Arrondissement(models.Model): + name = models.CharField("Nom", max_length=30) + department = models.ForeignKey(Department, verbose_name="Département") + + def __str__(self): + return settings.JOINT.join((self.name, str(self.department))) + + +class Canton(models.Model): + name = models.CharField("Nom", max_length=30) + arrondissement = models.ForeignKey(Arrondissement, + verbose_name="Arrondissement") + + def __str__(self): + return settings.JOINT.join( + (self.name, str(self.arrondissement))) + + +class TownManager(models.GeoManager): + def get_by_natural_key(self, numero_insee, year): + return self.get(numero_insee=numero_insee, year=year) + + +class Town(Imported, models.Model): + name = models.CharField(_("Name"), max_length=100) + surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) + center = models.PointField(_("Localisation"), srid=settings.SRID, + blank=True, null=True) + limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) + numero_insee = models.CharField("Code commune (numéro INSEE)", + max_length=120) + departement = models.ForeignKey( + Department, verbose_name=_("Department"), + on_delete=models.SET_NULL, null=True, blank=True) + year = models.IntegerField( + _("Year of creation"), null=True, blank=True, + help_text=_("Filling this field is relevant to distinguish old towns " + "from new towns.")) + children = models.ManyToManyField( + 'Town', verbose_name=_("Town children"), blank=True, + related_name='parents') + cached_label = models.CharField(_("Cached name"), max_length=500, + null=True, blank=True, db_index=True) + objects = TownManager() + + class Meta: + verbose_name = _("Town") + verbose_name_plural = _("Towns") + if settings.COUNTRY == 'fr': + ordering = ['numero_insee'] + unique_together = (('numero_insee', 'year'),) + + def natural_key(self): + return (self.numero_insee, self.year) + + def history_compress(self): + values = {'numero_insee': self.numero_insee, + 'year': self.year or ""} + return values + + def get_values(self, prefix='', no_values=False, no_base_finds=True): + return { + prefix or "label": str(self), + prefix + "name": self.name, + prefix + "numero_insee": self.numero_insee + } + + @classmethod + def history_decompress(cls, full_value, create=False): + if not full_value: + return [] + res = [] + for value in full_value: + try: + res.append( + cls.objects.get(numero_insee=value['numero_insee'], + year=value['year'] or None)) + except cls.DoesNotExist: + continue + return res + + def __str__(self): + return self.cached_label or "" + + @property + def label_with_areas(self): + label = [self.name] + if self.numero_insee: + label.append("({})".format(self.numero_insee)) + for area in self.areas.all(): + label.append(" - ") + label.append(area.full_label) + return " ".join(label) + + def generate_geo(self, force=False): + force = self.generate_limit(force=force) + self.generate_center(force=force) + self.generate_area(force=force) + + def generate_limit(self, force=False): + if not force and self.limit: + return + parents = None + if not self.parents.count(): + return + for parent in self.parents.all(): + if not parent.limit: + return + if not parents: + parents = parent.limit + else: + parents = parents.union(parent.limit) + # if union is a simple polygon make it a multi + if 'MULTI' not in parents.wkt: + parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" + if not parents: + return + self.limit = parents + self.save() + return True + + def generate_center(self, force=False): + if not force and (self.center or not self.limit): + return + self.center = self.limit.centroid + if not self.center: + return False + self.save() + return True + + def generate_area(self, force=False): + if not force and (self.surface or not self.limit): + return + surface = self.limit.transform(settings.SURFACE_SRID, + clone=True).area + if surface > 214748364 or not surface: + return False + self.surface = surface + self.save() + return True + + def update_town_code(self): + if not self.numero_insee or not self.children.count() or not self.year: + return + old_num = self.numero_insee[:] + numero = old_num.split('-')[0] + self.numero_insee = "{}-{}".format(numero, self.year) + if self.numero_insee != old_num: + return True + + def _generate_cached_label(self): + cached_label = self.name + if settings.COUNTRY == "fr" and self.numero_insee: + dpt_len = 2 + if self.numero_insee.startswith('97') or \ + self.numero_insee.startswith('98') or \ + self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', + '6', '7', '8', '9'): + dpt_len = 3 + cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) + if self.year and self.children.count(): + cached_label += " ({})".format(self.year) + return cached_label + + +def post_save_town(sender, **kwargs): + cached_label_changed(sender, **kwargs) + town = kwargs['instance'] + town.generate_geo() + if town.update_town_code(): + town.save() + + +post_save.connect(post_save_town, sender=Town) + + +def town_child_changed(sender, **kwargs): + town = kwargs['instance'] + if town.update_town_code(): + town.save() + + +m2m_changed.connect(town_child_changed, sender=Town.children.through) + + +class Address(BaseHistorizedItem): + FIELDS = ( + "address", "address_complement", "postal_code", "town", + "precise_town", "country", + "alt_address", "alt_address_complement", "alt_postal_code", "alt_town", + "alt_country", + "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3", + "raw_phone", "mobile_phone", "email", "alt_address_is_prefered" + ) + address = models.TextField(_("Address"), null=True, blank=True) + address_complement = models.TextField(_("Address complement"), null=True, + blank=True) + postal_code = models.CharField(_("Postal code"), max_length=10, null=True, + blank=True) + town = models.CharField(_("Town (freeform)"), max_length=150, null=True, + blank=True) + precise_town = models.ForeignKey( + Town, verbose_name=_("Town (precise)"), null=True, + blank=True) + country = models.CharField(_("Country"), max_length=30, null=True, + blank=True) + alt_address = models.TextField(_("Other address: address"), null=True, + blank=True) + alt_address_complement = models.TextField( + _("Other address: address complement"), null=True, blank=True) + alt_postal_code = models.CharField(_("Other address: postal code"), + max_length=10, null=True, blank=True) + alt_town = models.CharField(_("Other address: town"), max_length=70, + null=True, blank=True) + alt_country = models.CharField(_("Other address: country"), + max_length=30, null=True, blank=True) + phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True) + phone_desc = models.CharField(_("Phone description"), max_length=300, + null=True, blank=True) + phone2 = models.CharField(_("Phone description 2"), max_length=18, + null=True, blank=True) + phone_desc2 = models.CharField(_("Phone description 2"), max_length=300, + null=True, blank=True) + phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, + blank=True) + phone_desc3 = models.CharField(_("Phone description 3"), max_length=300, + null=True, blank=True) + raw_phone = models.TextField(_("Raw phone"), blank=True, null=True) + mobile_phone = models.CharField(_("Mobile phone"), max_length=18, + null=True, blank=True) + email = models.EmailField( + _("Email"), max_length=300, blank=True, null=True) + alt_address_is_prefered = models.BooleanField( + _("Alternative address is prefered"), default=False) + history = HistoricalRecords(inherit=True) + SUB_ADDRESSES = [] + + class Meta: + abstract = True + + def get_short_html_items(self): + items = [] + if self.address: + items.append( + """<span class="subadress">{}</span>""".format(self.address)) + if self.address_complement: + items.append( + """<span class="subadress-complement">{}</span>""".format( + self.address_complement)) + if self.postal_code: + items.append( + """<span class="postal-code">{}</span>""".format( + self.postal_code)) + if self.precise_town: + items.append( + """<span class="town">{}</span>""".format( + self.precise_town.name)) + elif self.town: + items.append( + """<span class="town">{}</span>""".format( + self.town)) + if self.country: + items.append( + """<span class="country">{}</span>""".format( + self.country)) + return items + + def get_short_html_detail(self): + html = """<div class="address">""" + items = self.get_short_html_items() + if not items: + items = [ + "<span class='no-address'>{}</span>".format( + _("No associated address") + ) + ] + html += "".join(items) + html += """</div>""" + return html + + def get_town_centroid(self): + if self.precise_town: + return self.precise_town.center, self._meta.verbose_name + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if sub_item and sub_item.precise_town: + return sub_item.precise_town.center, sub_item._meta.verbose_name + + def get_town_polygons(self): + if self.precise_town: + return self.precise_town.limit, self._meta.verbose_name + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if sub_item and sub_item.precise_town: + return sub_item.precise_town.limit, sub_item._meta.verbose_name + + def get_attribute(self, attr): + if self.town or self.precise_town: + return getattr(self, attr) + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if not sub_item: + continue + if sub_item.town or sub_item.precise_town: + return getattr(sub_item, attr) + return getattr(self, attr) + + def get_address(self): + return self.get_attribute("address") + + def get_address_complement(self): + return self.get_attribute("address_complement") + + def get_postal_code(self): + return self.get_attribute("postal_code") + + def get_town(self): + return self.get_attribute("town") + + def get_precise_town(self): + return self.get_attribute("precise_town") + + def get_country(self): + return self.get_attribute("country") + + def simple_lbl(self): + return str(self) + + def full_address(self): + lbl = self.simple_lbl() + if lbl: + lbl += "\n" + lbl += self.address_lbl() + return lbl + + def address_lbl(self): + lbl = u'' + prefix = '' + if self.alt_address_is_prefered: + prefix = 'alt_' + if getattr(self, prefix + 'address'): + lbl += getattr(self, prefix + 'address') + if getattr(self, prefix + 'address_complement'): + if lbl: + lbl += "\n" + lbl += getattr(self, prefix + 'address_complement') + postal_code = getattr(self, prefix + 'postal_code') + town = getattr(self, prefix + 'town') + if postal_code or town: + if lbl: + lbl += "\n" + lbl += "{}{}{}".format( + postal_code or '', + " " if postal_code and town else '', + town or '') + if self.phone: + if lbl: + lbl += "\n" + lbl += "{} {}".format(str(_("Tel: ")), self.phone) + if self.mobile_phone: + if lbl: + lbl += "\n" + lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone) + if self.email: + if lbl: + lbl += "\n" + lbl += "{} {}".format(str(_("Email: ")), self.email) + return lbl + + +class Merge(models.Model): + merge_key = models.TextField(_("Merge key"), blank=True, null=True) + merge_candidate = models.ManyToManyField("self", blank=True) + merge_exclusion = models.ManyToManyField("self", blank=True) + archived = models.NullBooleanField(default=False, + blank=True, null=True) + # 1 for one word similarity, 2 for two word similarity, etc. + MERGE_CLEMENCY = None + EMPTY_MERGE_KEY = '--' + MERGE_ATTRIBUTE = "name" + + class Meta: + abstract = True + + def generate_merge_key(self): + if self.archived: + return + merge_attr = getattr(self, self.MERGE_ATTRIBUTE) + self.merge_key = slugify(merge_attr if merge_attr else '') + if not self.merge_key: + self.merge_key = self.EMPTY_MERGE_KEY + self.merge_key = self.merge_key + + def generate_merge_candidate(self): + if self.archived: + return + if not self.merge_key: + self.generate_merge_key() + self.save(merge_key_generated=True) + if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: + return + q = self.__class__.objects \ + .exclude(pk=self.pk) \ + .exclude(merge_exclusion=self) \ + .exclude(merge_candidate=self) \ + .exclude(archived=True) + if not self.MERGE_CLEMENCY: + q = q.filter(merge_key=self.merge_key) + else: + subkeys_front = "-".join( + self.merge_key.split('-')[:self.MERGE_CLEMENCY]) + subkeys_back = "-".join( + self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) + q = q.filter(Q(merge_key__istartswith=subkeys_front) | + Q(merge_key__iendswith=subkeys_back)) + for item in q.all(): + self.merge_candidate.add(item) + + def save(self, *args, **kwargs): + # prevent circular save + merge_key_generated = False + if 'merge_key_generated' in kwargs: + merge_key_generated = kwargs.pop('merge_key_generated') + self.generate_merge_key() + item = super(Merge, self).save(*args, **kwargs) + if not merge_key_generated: + self.merge_candidate.clear() + self.generate_merge_candidate() + return item + + def archive(self): + self.archived = True + self.save() + self.merge_candidate.clear() + self.merge_exclusion.clear() + + def merge(self, item, keep_old=False, exclude_fields=None): + merge_model_objects(self, item, keep_old=keep_old, + exclude_fields=exclude_fields) + self.generate_merge_candidate() + + + +def __get_stats_cache_values(model_name, model_pk): + StatsCache = apps.get_model("ishtar_common", "StatsCache") + q = StatsCache.objects.filter( + model=model_name, model_pk=model_pk + ) + nb = q.count() + if nb >= 1: + sc = q.all()[0] + for extra in q.order_by("-id").all()[1:]: + extra.delete() + else: + sc = StatsCache.objects.create( + model=model_name, model_pk=model_pk + ) + values = sc.values + if not values: + values = {} + return sc, values + + +@task() +def _update_stats(app, model, model_pk, funcname): + model_name = app + "." + model + model = apps.get_model(app, model) + try: + item = model.objects.get(pk=model_pk) + except model.DoesNotExist: + return + value = getattr(item, funcname)() + sc, current_values = __get_stats_cache_values(model_name, model_pk) + current_values[funcname] = value + sc.values = current_values + sc.update_requested = None + sc.updated = datetime.datetime.now() + sc.save() + +def update_stats(statscache, item, funcname): + if not settings.USE_BACKGROUND_TASK: + current_values = statscache.values + if not current_values: + current_values = {} + value = getattr(item, funcname)() + current_values[funcname] = value + statscache.values = current_values + statscache.updated = datetime.datetime.now() + statscache.save() + return current_values + + now = datetime.datetime.now() + app_name = item._meta.app_label + model_name = item._meta.model_name + statscache.update_requested = now.isoformat() + statscache.save() + _update_stats.delay(app_name, model_name, item.pk, funcname) + return statscache.values + + +class DashboardFormItem: + """ + Provide methods to manage statistics + """ + + def last_stats_update(self): + model_name = self._meta.app_label + "." + self._meta.model_name + StatsCache = apps.get_model("ishtar_common", "StatsCache") + q = StatsCache.objects.filter( + model=model_name, model_pk=self.pk).order_by("-updated") + if not q.count(): + return + return q.all()[0].updated + + def _get_or_set_stats(self, funcname, update=False, + expected_type=None): + model_name = self._meta.app_label + "." + self._meta.model_name + StatsCache = apps.get_model("ishtar_common", "StatsCache") + sc, __ = StatsCache.objects.get_or_create( + model=model_name, model_pk=self.pk + ) + if not update: + values = sc.values + if funcname not in values: + if expected_type is not None: + return expected_type() + return 0 + else: + values = update_stats(sc, self, funcname) + if funcname in values: + values = values[funcname] + else: + values = 0 + if expected_type is not None and not isinstance(values, expected_type): + return expected_type() + return values + + @classmethod + def get_periods(cls, slice='month', fltr={}, date_source='creation'): + date_var = date_source + '_date' + q = cls.objects.filter(**{date_var + '__isnull': False}) + if fltr: + q = q.filter(**fltr) + if slice == 'year': + return [res[date_var].year for res in list(q.values(date_var) + .annotate( + Count("id")).order_by())] + elif slice == 'month': + return [(res[date_var].year, res[date_var].month) + for res in list(q.values(date_var) + .annotate(Count("id")).order_by())] + return [] + + @classmethod + def get_by_year(cls, year, fltr={}, date_source='creation'): + date_var = date_source + '_date' + q = cls.objects.filter(**{date_var + '__isnull': False}) + if fltr: + q = q.filter(**fltr) + return q.filter( + **{date_var + '__year': year}).order_by('pk').distinct('pk') + + @classmethod + def get_by_month(cls, year, month, fltr={}, date_source='creation'): + date_var = date_source + '_date' + q = cls.objects.filter(**{date_var + '__isnull': False}) + if fltr: + q = q.filter(**fltr) + q = q.filter( + **{date_var + '__year': year, date_var + '__month': month}) + return q.order_by('pk').distinct('pk') + + @classmethod + def get_total_number(cls, fltr=None): + q = cls.objects + if fltr: + q = q.filter(**fltr) + return q.order_by('pk').distinct('pk').count() + + +class DocumentItem: + ALT_NAMES = { + 'documents__image__isnull': + SearchAltName( + pgettext_lazy("key for text search", "has-image"), + 'documents__image__isnull'), + 'documents__associated_url__isnull': + SearchAltName( + pgettext_lazy("key for text search", "has-url"), + 'documents__associated_url__isnull'), + 'documents__associated_file__isnull': + SearchAltName( + pgettext_lazy("key for text search", "has-attached-file"), + 'documents__associated_file__isnull'), + } + + def public_representation(self): + images = [] + if getattr(self, "main_image", None): + images.append(self.main_image.public_representation()) + images += [ + image.public_representation() + for image in self.images_without_main_image.all() + ] + return {"images": images} + + @property + def images(self): + if not hasattr(self, 'documents'): + Document = apps.get_model("ishtar_common", "Document") + return Document.objects.none() + return self.documents.filter( + image__isnull=False).exclude(image="").order_by("pk") + + @property + def images_without_main_image(self): + if not hasattr(self, 'main_image') or not hasattr(self, 'documents'): + return self.images + if not self.main_image: + return self.documents.filter( + image__isnull=False).exclude( + image="").order_by("pk") + return self.documents.filter( + image__isnull=False).exclude( + image="").exclude(pk=self.main_image.pk).order_by("pk") + + def get_extra_actions(self, request): + """ + For sheet template: return "Add document / image" action + """ + # url, base_text, icon, extra_text, extra css class, is a quick action + try: + actions = super(DocumentItem, self).get_extra_actions(request) + except AttributeError: + actions = [] + + if not hasattr(self, 'SLUG'): + return actions + + can_add_doc = self.can_do(request, 'add_document') + if can_add_doc and ( + not hasattr(self, "is_locked") or + not self.is_locked(request.user)): + actions += [ + ( + reverse("create-document") + "?{}={}".format( + self.SLUG, self.pk), + _("Add document/image"), + "fa fa-plus", + _("doc./image"), + "", + False + ) + ] + return actions + + +def document_attached_changed(sender, **kwargs): + # associate a default main image + instance = kwargs.get("instance", None) + model = kwargs.get("model", None) + pk_set = kwargs.get("pk_set", None) + if not instance or not model: + return + + if hasattr(instance, "documents"): + items = [instance] + else: + if not pk_set: + return + try: + items = [model.objects.get(pk=pk) for pk in pk_set] + except model.DoesNotExist: + return + + for item in items: + q = item.documents.filter( + image__isnull=False).exclude(image='') + if item.main_image: + if q.filter(pk=item.main_image.pk).count(): + return + # the association has disappear not the main image anymore + item.main_image = None + item.skip_history_when_saving = True + item.save() + if not q.count(): + return + # by default get the lowest pk + item.main_image = q.order_by('pk').all()[0] + item.skip_history_when_saving = True + item.save() + + +class QuickAction: + """ + Quick action available from tables + """ + + def __init__(self, url, icon_class='', text='', target=None, rights=None, + module=None): + self.url = url + self.icon_class = icon_class + self.text = text + self.rights = rights + self.target = target + self.module = module + assert self.target in ('one', 'many', None) + + def is_available(self, user, session=None, obj=None): + if self.module and not getattr(get_current_profile(), self.module): + return False + if not self.rights: # no restriction + return True + if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser: + return False + user = user.ishtaruser + + for right in self.rights: + if user.has_perm(right, session=session, obj=obj): + return True + return False + + @property + def rendered_icon(self): + if not self.icon_class: + return "" + return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class) + + @property + def base_url(self): + if self.target is None: + url = reverse(self.url) + else: + # put arbitrary pk for the target + url = reverse(self.url, args=[0]) + url = url[:-2] # all quick action url have to finish with the + # pk of the selected item and a "/" + return url + + +class DynamicRequest: + def __init__(self, label, app_name, model_name, form_key, search_key, + type_query, search_query): + self.label = label + self.form_key = form_key + self.search_key = search_key + self.app_name = app_name + self.model_name = model_name + self.type_query = type_query + self.search_query = search_query + + def get_all_types(self): + model = apps.get_app_config(self.app_name).get_model(self.model_name) + return model.objects.filter(available=True) + + def get_form_fields(self): + fields = {} + for item in self.get_all_types().all(): + fields[self.form_key + "-" + item.txt_idx] = forms.CharField( + label=str(self.label) + " " + str(item), + required=False + ) + return fields + + def get_extra_query(self, slug): + return { + self.type_query: slug + } + + def get_alt_names(self): + alt_names = {} + for item in self.get_all_types().all(): + alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( + self.search_key + "-" + item.txt_idx, self.search_query, + self.get_extra_query(item.txt_idx), distinct_query=True + ) + return alt_names + + +class SpatialReferenceSystem(GeneralType): + order = models.IntegerField(_("Order"), default=10) + auth_name = models.CharField( + _("Authority name"), default=u'EPSG', max_length=256) + srid = models.IntegerField(_("Authority SRID")) + + class Meta: + verbose_name = _("Spatial reference system") + verbose_name_plural = _("Spatial reference systems") + ordering = ('label',) + + +post_save.connect(post_save_cache, sender=SpatialReferenceSystem) +post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) + + +class GeoItem(models.Model): + GEO_SOURCE = ( + ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon")) + ) + + # gis + x = models.FloatField(_(u'X'), blank=True, null=True) + y = models.FloatField(_(u'Y'), blank=True, null=True) + z = models.FloatField(_(u'Z'), blank=True, null=True) + estimated_error_x = models.FloatField(_(u'Estimated error for X'), + blank=True, null=True) + estimated_error_y = models.FloatField(_(u'Estimated error for Y'), + blank=True, null=True) + estimated_error_z = models.FloatField(_(u'Estimated error for Z'), + blank=True, null=True) + spatial_reference_system = models.ForeignKey( + SpatialReferenceSystem, verbose_name=_("Spatial Reference System"), + blank=True, null=True) + point = models.PointField(_("Point"), blank=True, null=True, dim=3) + point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) + point_source = models.CharField( + _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, + null=True) + point_source_item = models.CharField( + _("Point source item"), max_length=100, blank=True, null=True) + multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, + null=True) + multi_polygon_source = models.CharField( + _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1, + blank=True, null=True) + multi_polygon_source_item = models.CharField( + _("Multi polygon source item"), max_length=100, blank=True, null=True) + + GEO_LABEL = "" + + class Meta: + abstract = True + + def get_town_centroid(self): + raise NotImplementedError + + def get_town_polygons(self): + raise NotImplementedError + + @property + def display_coordinates(self): + if not self.point_2d: + return "" + profile = get_current_profile() + if not profile.display_srs or not profile.display_srs.srid: + return self.x, self.y + point = self.point_2d.transform(profile.display_srs.srid, clone=True) + return round(point.x, 5), round(point.y, 5) + + @property + def display_spatial_reference_system(self): + profile = get_current_profile() + if not profile.display_srs or not profile.display_srs.srid: + return self.spatial_reference_system + return profile.display_srs + + def get_precise_points(self): + if self.point_source == 'P' and self.point_2d: + return self.point_2d, self.point, self.point_source_item + + def get_precise_polygons(self): + if self.multi_polygon_source == 'P' and self.multi_polygon: + return self.multi_polygon, self.multi_polygon_source_item + + def most_precise_geo(self): + if self.point_source == 'M': + return 'multi_polygon' + current_source = str(self.__class__._meta.verbose_name) + if self.multi_polygon_source_item == current_source \ + and (self.multi_polygon_source == "P" or + self.point_source_item != current_source): + return 'multi_polygon' + if self.point_source_item == current_source \ + and self.point_source == 'P': + return 'point' + if self.multi_polygon_source == 'P': + return 'multi_polygon' + if self.point_source == 'P': + return 'point' + if self.multi_polygon: + return 'multi_polygon' + if self.point_2d: + return 'point' + + def geo_point_source(self): + if not self.point_source: + return "" + src = "{} - {}".format( + dict(self.GEO_SOURCE)[self.point_source], + self.point_source_item + ) + return src + + def geo_polygon_source(self): + if not self.multi_polygon_source: + return "" + src = "{} - {}".format( + dict(self.GEO_SOURCE)[self.multi_polygon_source], + self.multi_polygon_source_item + ) + return src + + def _geojson_serialize(self, geom_attr): + if not hasattr(self, geom_attr): + return "" + cached_label_key = 'cached_label' + if self.GEO_LABEL: + cached_label_key = self.GEO_LABEL + if getattr(self, "CACHED_LABELS", None): + cached_label_key = self.CACHED_LABELS[-1] + geojson = serialize( + 'geojson', + self.__class__.objects.filter(pk=self.pk), + geometry_field=geom_attr, fields=(cached_label_key,)) + geojson_dct = json.loads(geojson) + profile = get_current_profile() + precision = profile.point_precision + + features = geojson_dct.pop('features') + for idx in range(len(features)): + feature = features[idx] + lbl = feature['properties'].pop(cached_label_key) + feature['properties']['name'] = lbl + feature['properties']['id'] = self.pk + if precision is not None: + geom_type = feature["geometry"].get("type", None) + if geom_type == "Point": + feature["geometry"]["coordinates"] = [ + round(coord, precision) + for coord in feature["geometry"]["coordinates"] + ] + geojson_dct['features'] = features + geojson_dct['link_template'] = simple_link_to_window(self).replace( + '999999', '<pk>' + ) + geojson = json.dumps(geojson_dct) + return geojson + + @property + def point_2d_geojson(self): + return self._geojson_serialize('point_2d') + + @property + def multi_polygon_geojson(self): + return self._geojson_serialize('multi_polygon') + + +class ImageContainerModel: + def _get_image_path(self, filename): + return "{}/{}".format(self._get_base_image_path(), filename) + + def _get_base_image_path(self): + n = datetime.datetime.now() + return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) + + +class QRCodeItem(models.Model, ImageContainerModel): + HAS_QR_CODE = True + qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True, + max_length=255) + + class Meta: + abstract = True + + @property + def qrcode_path(self): + if not self.qrcode: + self.generate_qrcode() + if not self.qrcode: # error on qrcode generation + return "" + return self.qrcode.path + + def generate_qrcode(self, request=None, secure=True, tmpdir=None): + url = self.get_absolute_url() + site = Site.objects.get_current() + if request: + scheme = request.scheme + else: + if secure: + scheme = "https" + else: + scheme = "http" + url = scheme + "://" + site.domain + url + TinyUrl = apps.get_model("ishtar_common", "TinyUrl") + tiny_url = TinyUrl() + tiny_url.link = url + tiny_url.save() + short_url = scheme + "://" + site.domain + reverse( + 'tiny-redirect', args=[tiny_url.get_short_id()]) + qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) + tmpdir_created = False + if not tmpdir: + tmpdir = tempfile.mkdtemp("-qrcode") + tmpdir_created = True + filename = tmpdir + os.sep + 'qrcode.png' + qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) + with open(filename, 'rb') as qrfile: + self.qrcode.save("qrcode.png", File(qrfile)) + self.skip_history_when_saving = True + self._no_move = True + self.save() + if tmpdir_created: + shutil.rmtree(tmpdir) + + +class SearchVectorConfig: + def __init__(self, key, language=None, func=None): + self.key = key + if language: + self.language = language + if language == "local": + self.language = settings.ISHTAR_SEARCH_LANGUAGE + else: + self.language = "simple" + self.func = func + + def format(self, value): + if value == 'None': + value = '' + if not self.func: + return [value] + return self.func(value) + + +class ShortMenuItem: + """ + Item available in the short menu + """ + UP_MODEL_QUERY = {} + + @classmethod + def get_short_menu_class(cls, pk): + return '' + + @property + def short_class_name(self): + return "" + + +class MainItem(ShortMenuItem): + """ + Item with quick actions available from tables + Extra actions are available from sheets + """ + QUICK_ACTIONS = [] + + @classmethod + def get_quick_actions(cls, user, session=None, obj=None): + """ + Get a list of (url, title, icon, target) actions for an user + """ + qas = [] + for action in cls.QUICK_ACTIONS: + if not action.is_available(user, session=session, obj=obj): + continue + qas.append([action.base_url, + mark_safe(action.text), + mark_safe(action.rendered_icon), + action.target or ""]) + return qas + + @classmethod + def get_quick_action_by_url(cls, url): + for action in cls.QUICK_ACTIONS: + if action.url == url: + return action + + def regenerate_external_id(self): + if not hasattr(self, "external_id"): + return + self.skip_history_when_saving = True + self._no_move = True + if hasattr(self, "auto_external_id"): + self.external_id = None + self.save() + + def get_extra_actions(self, request): + if not hasattr(self, 'SLUG'): + return [] + + actions = [] + if request.user.is_superuser and hasattr(self, "auto_external_id"): + actions += [ + ( + reverse("regenerate-external-id") + "?{}={}".format( + self.SLUG, self.pk), + _("Regenerate ID"), + "fa fa-key", + _("regen."), + "", + True + ) + ] + + return actions diff --git a/ishtar_common/serializers.py b/ishtar_common/serializers.py index 84108c135..55989adcb 100644 --- a/ishtar_common/serializers.py +++ b/ishtar_common/serializers.py @@ -13,6 +13,7 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.auth.models import Group, Permission from . import models +from .models_common import State, Department from archaeological_operations.models import ActType from ishtar_common.serializers_utils import generic_get_results, \ @@ -102,7 +103,7 @@ def importer_serialization(archive=False, return_empty_types=False, GEO_MODEL_LIST = [ - models.State, models.Department, models.Town, models.Area + State, Department, models.Town, models.Area ] diff --git a/ishtar_common/tasks.py b/ishtar_common/tasks.py index 223eded06..96db07b1b 100644 --- a/ishtar_common/tasks.py +++ b/ishtar_common/tasks.py @@ -27,7 +27,7 @@ from django.core.files import File from django.db.models import Q from django.utils.translation import ugettext_lazy as _ -from ishtar_common.models import Town, Department +from ishtar_common.models_common import Town, Department from ishtar_common.utils import task from ishtar_common.serializers import full_serialization, restore_serialized diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 6d112f2d1..94a636b7b 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -47,7 +47,7 @@ from django.test import TestCase as BaseTestCase from django.test.client import Client from django.test.runner import DiscoverRunner -from ishtar_common import models +from ishtar_common import models, models_common from ishtar_common import views from ishtar_common.apps import admin_site from ishtar_common.serializers import type_serialization, \ @@ -792,7 +792,7 @@ class SerializationTest(GenericSerializationTest, TestCase): self.generic_serialization_test(importer_serialization) def create_geo_default(self): - s = models.State.objects.create(label="test", number="999") + s = models_common.State.objects.create(label="test", number="999") d = models.Department.objects.create(label="test", number="999", state=s) t1 = models.Town.objects.create( diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index 41a844026..31459f861 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -37,6 +37,7 @@ import subprocess import sys import tempfile import time +import uuid import xmltodict import zipfile @@ -48,7 +49,7 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import GEOSGeometry from django.contrib.sessions.backends.db import SessionStore from django.core.cache import cache -from django.core.exceptions import SuspiciousOperation +from django.core.exceptions import SuspiciousOperation, ObjectDoesNotExist from django.core.files import File from django.core.validators import EMPTY_VALUES from django.core.urlresolvers import reverse @@ -713,9 +714,6 @@ def _post_save_geo(sender, **kwargs): """ Convert raw x, y, z point to real geo field """ - from ishtar_common.models import get_current_profile # not clean but utils - # is loaded before models - profile = get_current_profile() if not profile.mapping: return @@ -1758,3 +1756,115 @@ def try_fix_file(filename, make_copy=True, hard=False): if make_copy: shutil.copy2(full_file, filename) return f + + +def get_current_profile(force=False): + IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") + return IshtarSiteProfile.get_current_profile(force=force) + + +PARSE_FORMULA = re.compile("{([^}]*)}") + + +def _deduplicate(value): + new_values = [] + for v in value.split('-'): + if v not in new_values: + new_values.append(v) + return '-'.join(new_values) + + +FORMULA_FILTERS = { + 'upper': lambda x: x.upper(), + 'lower': lambda x: x.lower(), + 'capitalize': lambda x: x.capitalize(), + 'slug': lambda x: slugify(x), + 'deduplicate': _deduplicate +} + + +def get_external_id(key, item): + profile = get_current_profile() + if not hasattr(profile, key): + return + formula = getattr(profile, key) + dct = {} + for fkey in PARSE_FORMULA.findall(formula): + filtered = fkey.split('|') + initial_key = fkey[:] + fkey = filtered[0] + filters = [] + for filtr in filtered[1:]: + if filtr in FORMULA_FILTERS: + filters.append(FORMULA_FILTERS[filtr]) + if fkey.startswith('settings__'): + dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' + continue + obj = item + for k in fkey.split('__'): + try: + obj = getattr(obj, k) + except ObjectDoesNotExist: + obj = None + if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager + if not obj.count(): + break + obj = obj.all()[0] + elif callable(obj): + obj = obj() + if obj is None: + break + if obj is None: + dct[initial_key] = '' + else: + dct[initial_key] = str(obj) + for filtr in filters: + dct[initial_key] = filtr(dct[initial_key]) + values = formula.format(**dct).split('||') + value = values[0] + for filtr in values[1:]: + if filtr not in FORMULA_FILTERS: + value += '||' + filtr + continue + value = FORMULA_FILTERS[filtr](value) + return value + + +PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid') + + +def duplicate_item(item, user=None, data=None): + model = item.__class__ + new = model.objects.get(pk=item.pk) + + for field in model._meta.fields: + # pk is in PRIVATE_FIELDS so: new.pk = None and a new + # item will be created on save + if field.name == "uuid": + new.uuid = uuid.uuid4() + elif field.name in PRIVATE_FIELDS: + setattr(new, field.name, None) + if user: + new.history_user = user + if data: + for k in data: + setattr(new, k, data[k]) + new.save() + + # m2m fields + m2m = [field.name for field in model._meta.many_to_many + if field.name not in PRIVATE_FIELDS] + for field in m2m: + for val in getattr(item, field).all(): + if val not in getattr(new, field).all(): + getattr(new, field).add(val) + return new + + +def get_image_path(instance, filename): + # when using migrations instance is not a real ImageModel instance + if not hasattr(instance, '_get_image_path'): + n = datetime.datetime.now() + return "upload/{}/{:02d}/{:02d}/{}".format( + n.year, n.month, n.day, filename) + return instance._get_image_path(filename) diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py index 8b066194f..056385918 100644 --- a/ishtar_common/views_item.py +++ b/ishtar_common/views_item.py @@ -33,9 +33,9 @@ from weasyprint import HTML, CSS from weasyprint.fonts import FontConfiguration from ishtar_common.utils import check_model_access_control, CSV_OPTIONS, \ - get_all_field_names, Round + get_all_field_names, Round, PRIVATE_FIELDS from ishtar_common.models import HistoryError, get_current_profile, \ - PRIVATE_FIELDS, GeneralType, SearchAltName + GeneralType, SearchAltName from .menus import Menu from . import models @@ -507,21 +507,21 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct, string = string.strip().lower() match = RE_FACET.search(string) - if match or u"=" in string: + if match or "=" in string: queries = [] if match: for idx, gp in enumerate(match.groups()): if not idx: base_term = gp elif gp: - queries.append(u'"{}"'.format(gp)) + queries.append('"{}"'.format(gp)) else: - splited = string.split(u"=") + splited = string.split("=") if len(splited) == 2: base_term = splited[0] queries.append(splited[1]) if queries: - excluded = base_term.startswith(u"-") + excluded = base_term.startswith("-") if excluded: base_term = base_term[1:] if base_term in query_parameters: @@ -687,7 +687,7 @@ def _search_manage_search_vector(model, dct, exc_dct, distinct_queries, if search_query: # remove inside parenthesis search_query = \ - search_query.replace(u'(', u'').replace(u')', u'').strip() + search_query.replace('(', '').replace(')', '').strip() if search_query: if 'extras' not in dct: dct['extras'] = [] @@ -830,10 +830,14 @@ def _manage_facet_search(model, dct, and_reqs): if not hasattr(model, "general_types"): return general_types = model.general_types() + hierarchic_fields = HIERARCHIC_FIELDS[:] for base_k in general_types: - if base_k in HIERARCHIC_FIELDS: # already managed + if base_k in hierarchic_fields: # already managed continue - k = base_k + "__pk" + if base_k.endswith("_id"): + k = base_k + else: + k = base_k + "__pk" if k not in dct or not dct[k].startswith('"') \ or not dct[k].startswith('"'): continue diff --git a/ishtar_common/wizards.py b/ishtar_common/wizards.py index 63f7ee30a..9ee0c5896 100644 --- a/ishtar_common/wizards.py +++ b/ishtar_common/wizards.py @@ -1983,7 +1983,7 @@ class SourceWizard(Wizard): DOCUMENT_EXCLUDED = models.Document.RELATED_MODELS + [ "id", "history_creator", "history_modifier", "search_vector", "imports", - "last_modified" + "last_modified", "document" ] |