summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2020-10-07 19:09:30 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2021-02-28 12:15:21 +0100
commit1e3da04336b9095e4497d098ea19c3178bc74cf6 (patch)
tree9cd21bf7e51d271b958a9a4b2b85367adbb97992 /ishtar_common
parentcf6139abc5a64a2ff52eccdcf0424870cff6d57c (diff)
downloadIshtar-1e3da04336b9095e4497d098ea19c3178bc74cf6.tar.bz2
Ishtar-1e3da04336b9095e4497d098ea19c3178bc74cf6.zip
Refactoring of models. Document container - declare only id
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/admin.py18
-rw-r--r--ishtar_common/forms_common.py18
-rw-r--r--ishtar_common/migrations/0201_squashed.py6
-rw-r--r--ishtar_common/migrations/0204_auto_20200514_1124.py27
-rw-r--r--ishtar_common/migrations/0204_auto_20201007_1630.py (renamed from ishtar_common/migrations/0205_auto_20200527_1500.py)52
-rw-r--r--ishtar_common/models.py2890
-rw-r--r--ishtar_common/models_common.py2777
-rw-r--r--ishtar_common/serializers.py3
-rw-r--r--ishtar_common/tasks.py2
-rw-r--r--ishtar_common/tests.py4
-rw-r--r--ishtar_common/utils.py118
-rw-r--r--ishtar_common/views_item.py22
-rw-r--r--ishtar_common/wizards.py2
13 files changed, 3036 insertions, 2903 deletions
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py
index 287f28249..b5ff67567 100644
--- a/ishtar_common/admin.py
+++ b/ishtar_common/admin.py
@@ -62,7 +62,7 @@ from django.views.decorators.csrf import csrf_protect
from django import forms
-from ishtar_common import models
+from ishtar_common import models, models_common
from ishtar_common.apps import admin_site
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.utils import get_cache, create_slug
@@ -376,10 +376,10 @@ admin_site.register(models.IshtarSiteProfile, IshtarSiteProfileAdmin)
class DepartmentAdmin(admin.ModelAdmin):
list_display = ('number', 'label',)
- model = models.Department
+ model = models_common.Department
-admin_site.register(models.Department, DepartmentAdmin)
+admin_site.register(models_common.Department, DepartmentAdmin)
class OrganizationAdmin(HistorizedObjectAdmin):
@@ -765,7 +765,7 @@ class ImportGEOJSONActionAdmin(object):
continue
num_insee = values.pop('numero_insee')
year = values.pop('year') or None
- t, c = models.Town.objects.get_or_create(
+ t, c = models_common.Town.objects.get_or_create(
numero_insee=num_insee, year=year,
defaults=values)
if c:
@@ -861,7 +861,7 @@ class ImportJSONActionAdmin(admin.ModelAdmin):
class AdminRelatedTownForm(forms.ModelForm):
class Meta:
- model = models.Town.children.through
+ model = models_common.Town.children.through
exclude = []
from_town = AutoCompleteSelectField(
'town', required=True, label=_(u"Parent"))
@@ -869,7 +869,7 @@ class AdminRelatedTownForm(forms.ModelForm):
class AdminTownForm(forms.ModelForm):
class Meta:
- model = models.Town
+ model = models_common.Town
exclude = ['imports', 'departement']
center = PointField(label=_(u"Center"), required=False,
widget=OSMWidget)
@@ -880,7 +880,7 @@ class AdminTownForm(forms.ModelForm):
class TownParentInline(admin.TabularInline):
- model = models.Town.children.through
+ model = models_common.Town.children.through
fk_name = 'to_town'
form = AdminRelatedTownForm
verbose_name = _(u"Parent")
@@ -891,7 +891,7 @@ class TownParentInline(admin.TabularInline):
class TownAdmin(ImportGEOJSONActionAdmin, ImportActionAdmin):
change_list_template = "admin/town_change_list.html"
- model = models.Town
+ model = models_common.Town
list_display = ['name', 'year']
search_fields = ['name']
readonly_fields = ['cached_label']
@@ -907,7 +907,7 @@ class TownAdmin(ImportGEOJSONActionAdmin, ImportActionAdmin):
import_keys = ['slug', 'txt_idx', 'numero_insee']
-admin_site.register(models.Town, TownAdmin)
+admin_site.register(models_common.Town, TownAdmin)
class GeneralTypeAdmin(ImportActionAdmin, ImportJSONActionAdmin):
diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py
index 49d1829bc..d6d4e197a 100644
--- a/ishtar_common/forms_common.py
+++ b/ishtar_common/forms_common.py
@@ -1281,12 +1281,18 @@ class DocumentForm(forms.ModelForm, CustomForm, ManageOldType):
model=models.Format, label=_("Format"), choices=[],
required=False)
scale = forms.CharField(label=_("Scale"), max_length=30, required=False)
- container = widgets.ModelJQueryAutocompleteField(
+ container_id = forms.IntegerField(
label=_("Current container"),
- model=Container, required=False)
- container_ref = widgets.ModelJQueryAutocompleteField(
+ widget=widgets.JQueryAutoComplete(
+ reverse_lazy('autocomplete-container'),
+ associated_model=Container, new=True),
+ validators=[models.valid_id(Container)], required=False)
+ container_ref_id = forms.IntegerField(
label=_("Reference container"),
- model=Container, required=False)
+ widget=widgets.JQueryAutoComplete(
+ reverse_lazy('autocomplete-container'),
+ associated_model=Container, new=True),
+ validators=[models.valid_id(Container)], required=False)
authors = widgets.Select2MultipleField(
label=_("Authors"), required=False, model=models.Author,
remote="autocomplete-author")
@@ -1378,7 +1384,7 @@ class DocumentForm(forms.ModelForm, CustomForm, ManageOldType):
'receipt_date_in_documentation', 'creation_date',
'publisher', 'language', 'isbn', 'issn', 'licenses',
'source', 'source_free_input',
- 'container', "container_ref",
+ 'container_id', "container_ref_id",
'comment', 'description', 'additional_information', 'duplicate'
]
@@ -1390,7 +1396,7 @@ class DocumentForm(forms.ModelForm, CustomForm, ManageOldType):
'receipt_date': FormHeader(_("Dates")),
'publisher': FormHeader(_("Publishing"), collapse=True),
'source': FormHeader(_("Source"), collapse=True),
- 'container': FormHeader(_("Warehouse"), collapse=True),
+ 'container_id': FormHeader(_("Warehouse"), collapse=True),
'comment': FormHeader(_("Advanced"), collapse=True),
'finds': FormHeader(_("Related items")),
}
diff --git a/ishtar_common/migrations/0201_squashed.py b/ishtar_common/migrations/0201_squashed.py
index d0710636e..d7b65626a 100644
--- a/ishtar_common/migrations/0201_squashed.py
+++ b/ishtar_common/migrations/0201_squashed.py
@@ -203,7 +203,11 @@ class Migration(migrations.Migration):
'ordering': ('title',),
'permissions': (('view_document', 'Peut voir tous les Documents'), ('view_own_document', 'Peut voir ses propres Documents'), ('add_own_document', 'Peut ajouter son propre Document'), ('change_own_document', 'Peut modifier ses propres Documents'), ('delete_own_document', 'Peut supprimer ses propres Documents')),
},
- bases=(ishtar_common.models.StatisticItem, ishtar_common.models.TemplateItem, ishtar_common.models.OwnPerms, models.Model, ishtar_common.models.CachedGen, ishtar_common.models.FixAssociated, ishtar_common.models.CascasdeUpdate, ishtar_common.models.ImageContainerModel, ishtar_common.models.ValueGetter, ishtar_common.models.MainItem),
+ bases=(ishtar_common.models.StatisticItem,
+ ishtar_common.models.TemplateItem,
+ ishtar_common.models.OwnPerms, models.Model,
+ ishtar_common.models.CachedGen,
+ ishtar_common.models.FixAssociated, ishtar_common.models.CascasdeUpdate, ishtar_common.models.ImageContainerModel, ishtar_common.models.ValueGetter, ishtar_common.models.MainItem),
),
migrations.CreateModel(
name='DocumentTemplate',
diff --git a/ishtar_common/migrations/0204_auto_20200514_1124.py b/ishtar_common/migrations/0204_auto_20200514_1124.py
deleted file mode 100644
index 4eea40bf3..000000000
--- a/ishtar_common/migrations/0204_auto_20200514_1124.py
+++ /dev/null
@@ -1,27 +0,0 @@
-# -*- coding: utf-8 -*-
-# Generated by Django 1.11.27 on 2020-05-14 11:24
-from __future__ import unicode_literals
-
-from django.db import migrations, models
-import django.db.models.deletion
-
-
-class Migration(migrations.Migration):
-
- dependencies = [
- ('archaeological_warehouse', '0103_auto_container_views'),
- ('ishtar_common', '0203_auto_20200407_1142'),
- ]
-
- operations = [
- migrations.AddField(
- model_name='document',
- name='container',
- field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='contained_documents', to='archaeological_warehouse.Container', verbose_name='Container'),
- ),
- migrations.AddField(
- model_name='document',
- name='container_ref',
- field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='contained_documents_ref', to='archaeological_warehouse.Container', verbose_name='Reference container'),
- ),
- ]
diff --git a/ishtar_common/migrations/0205_auto_20200527_1500.py b/ishtar_common/migrations/0204_auto_20201007_1630.py
index deed0ad90..9b5ab0876 100644
--- a/ishtar_common/migrations/0205_auto_20200527_1500.py
+++ b/ishtar_common/migrations/0204_auto_20201007_1630.py
@@ -1,18 +1,18 @@
# -*- coding: utf-8 -*-
-# Generated by Django 1.11.27 on 2020-05-27 15:00
+# Generated by Django 1.11.27 on 2020-10-07 16:30
from __future__ import unicode_literals
import django.core.validators
from django.db import migrations, models
import django.db.models.deletion
-import ishtar_common.models
+import ishtar_common.models_common
import re
class Migration(migrations.Migration):
dependencies = [
- ('ishtar_common', '0204_auto_20200514_1124'),
+ ('ishtar_common', '0203_auto_20200407_1142'),
]
operations = [
@@ -30,7 +30,7 @@ class Migration(migrations.Migration):
'verbose_name_plural': 'Document tags',
'ordering': ('label',),
},
- bases=(ishtar_common.models.Cached, models.Model),
+ bases=(ishtar_common.models_common.Cached, models.Model),
),
migrations.CreateModel(
name='Language',
@@ -46,7 +46,7 @@ class Migration(migrations.Migration):
'verbose_name': 'Language',
'verbose_name_plural': 'Languages',
},
- bases=(ishtar_common.models.Cached, models.Model),
+ bases=(ishtar_common.models_common.Cached, models.Model),
),
migrations.AlterModelOptions(
name='sourcetype',
@@ -54,6 +54,16 @@ class Migration(migrations.Migration):
),
migrations.AddField(
model_name='document',
+ name='container_id',
+ field=models.PositiveIntegerField(blank=True, null=True, verbose_name='Container ID'),
+ ),
+ migrations.AddField(
+ model_name='document',
+ name='container_ref_id',
+ field=models.PositiveIntegerField(blank=True, null=True, verbose_name='Container ID'),
+ ),
+ migrations.AddField(
+ model_name='document',
name='isbn',
field=models.CharField(blank=True, max_length=13, null=True, verbose_name='ISBN'),
),
@@ -65,7 +75,7 @@ class Migration(migrations.Migration):
migrations.AddField(
model_name='document',
name='publisher',
- field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='ishtar_common.Organization', verbose_name='Publisher'),
+ field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='publish', to='ishtar_common.Organization', verbose_name='Publisher'),
),
migrations.AddField(
model_name='document',
@@ -92,6 +102,36 @@ class Migration(migrations.Migration):
name='is_localized',
field=models.BooleanField(default=False, help_text='Setting a language for this type of document is relevant', verbose_name='Is localized'),
),
+ migrations.AlterField(
+ model_name='document',
+ name='format_type',
+ field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='ishtar_common.Format', verbose_name='Medium'),
+ ),
+ migrations.AlterField(
+ model_name='importercolumn',
+ name='regexp_pre_filter',
+ field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='columns', to='ishtar_common.Regexp'),
+ ),
+ migrations.AlterField(
+ model_name='importercolumn',
+ name='value_format',
+ field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='columns', to='ishtar_common.ValueFormater'),
+ ),
+ migrations.AlterField(
+ model_name='importertype',
+ name='associated_models',
+ field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='importer_type_associated', to='ishtar_common.ImporterModel', verbose_name='Associated model'),
+ ),
+ migrations.AlterField(
+ model_name='importertype',
+ name='created_models',
+ field=models.ManyToManyField(blank=True, help_text='Leave blank for no restrictions', related_name='importer_type_created', to='ishtar_common.ImporterModel', verbose_name='Models that can accept new items'),
+ ),
+ migrations.AlterField(
+ model_name='importtarget',
+ name='formater_type',
+ field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='targets', to='ishtar_common.FormaterType'),
+ ),
migrations.AddField(
model_name='document',
name='language',
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index 001c1894f..f23cab926 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -21,7 +21,6 @@
Models description
"""
import copy
-from collections import OrderedDict
import datetime
import inspect
from importlib import import_module
@@ -29,9 +28,6 @@ from jinja2 import TemplateSyntaxError, UndefinedError
import json
import logging
import os
-import pyqrcode
-import re
-import shutil
import string
import tempfile
import time
@@ -46,7 +42,6 @@ import zipfile
from urllib.parse import urlencode
from xml.etree import ElementTree as ET
-from django import forms
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User, Group
@@ -54,32 +49,22 @@ from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import JSONField
-from django.contrib.postgres.search import SearchVectorField, SearchVector
from django.contrib.postgres.indexes import GinIndex
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist, ValidationError, \
MultipleObjectsReturned
from django.core.files.uploadedfile import SimpleUploadedFile
-from django.core.files import File
-from django.core.serializers import serialize
from django.core.urlresolvers import reverse, NoReverseMatch
-from django.core.validators import validate_slug
-from django.db import connection
from django.db.models import Q, Max, Count, F
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.db.utils import DatabaseError
from django.template.defaultfilters import slugify
from django.utils.functional import lazy
-from django.utils.safestring import SafeText, mark_safe
-from django.utils.translation import activate, deactivate
from ishtar_common.utils import ugettext_lazy as _, ugettext, \
- pgettext_lazy
+ pgettext_lazy, get_external_id, get_current_profile, duplicate_item, \
+ get_image_path
from ishtar_common.utils_secretary import IshtarSecretaryRenderer
-from simple_history.models import HistoricalRecords as BaseHistoricalRecords
-from simple_history.signals import post_create_historical_record, \
- pre_create_historical_record
-from unidecode import unidecode
from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \
ALTERNATE_CONFIGS_CHOICES
@@ -87,16 +72,24 @@ from ishtar_common.alternative_configs import ALTERNATE_CONFIGS, \
from ishtar_common.data_importer import pre_importer_action
from ishtar_common.model_managers import SlugModelManager, ExternalIdManager, \
- TypeManager, UUIDModelManager
+ UUIDModelManager
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import ImporterModel, ImporterType, \
ImporterDefault, ImporterDefaultValues, ImporterColumn, \
ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, \
Import, TargetKeyGroup, ValueFormater
-from ishtar_common.templatetags.link_to_window import simple_link_to_window
-from ishtar_common.utils import get_cache, disable_for_loaddata, create_slug, \
- get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \
- generate_relation_graph, max_size_help, task
+from ishtar_common.utils import get_cache, create_slug, \
+ get_all_field_names, cached_label_changed, \
+ generate_relation_graph, max_size_help
+
+from ishtar_common.models_common import GeneralType, HierarchicalType, \
+ BaseHistorizedItem, LightHistorizedItem, FullSearch, Imported, \
+ FixAssociated, SearchAltName, HistoryError, OwnPerms, Cached, \
+ Address, post_save_cache, TemplateItem, SpatialReferenceSystem, \
+ DashboardFormItem, document_attached_changed, SearchAltName, \
+ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
+ QuickAction, MainItem, Merge, ShortMenuItem, Town, ImageContainerModel, \
+ StatisticItem, CachedGen, CascasdeUpdate, Department, State
__all__ = [
'ImporterModel', 'ImporterType', 'ImporterDefault', 'ImporterDefaultValues',
@@ -107,7 +100,8 @@ __all__ = [
'LightHistorizedItem', 'OwnPerms', 'Address', 'post_save_cache',
'DashboardFormItem', 'ShortMenuItem', 'document_attached_changed',
'SearchAltName', 'DynamicRequest', 'GeoItem', 'QRCodeItem',
- 'SearchVectorConfig', 'DocumentItem'
+ 'SearchVectorConfig', 'DocumentItem', 'CachedGen', 'StatisticItem',
+ 'CascasdeUpdate', 'Department', 'State'
]
logger = logging.getLogger(__name__)
@@ -262,88 +256,6 @@ class HistoryModel(models.Model):
create=create)
-class HistoricalRecords(BaseHistoricalRecords):
- def _save_historic(self, manager, instance, history_date, history_type,
- history_user, history_change_reason, using, attrs):
- history_instance = manager.model(
- history_date=history_date,
- history_type=history_type,
- history_user=history_user,
- history_change_reason=history_change_reason,
- **attrs
- )
-
- pre_create_historical_record.send(
- sender=manager.model,
- instance=instance,
- history_date=history_date,
- history_user=history_user,
- history_change_reason=history_change_reason,
- history_instance=history_instance,
- using=using,
- )
-
- history_instance.save(using=using)
-
- post_create_historical_record.send(
- sender=manager.model,
- instance=instance,
- history_instance=history_instance,
- history_date=history_date,
- history_user=history_user,
- history_change_reason=history_change_reason,
- using=using,
- )
-
- def create_historical_record(self, instance, history_type, using=None):
- try:
- history_modifier = getattr(instance, 'history_modifier', None)
- assert history_modifier
- except (User.DoesNotExist, AssertionError):
- # on batch removing of users, user could have disappeared
- return
- history_date = getattr(instance, "_history_date",
- datetime.datetime.now())
- history_change_reason = getattr(instance, "changeReason", None)
- force = getattr(instance, "_force_history", False)
- manager = getattr(instance, self.manager_name)
- attrs = {}
- for field in instance._meta.fields:
- attrs[field.attname] = getattr(instance, field.attname)
- q_history = instance.history \
- .filter(history_modifier_id=history_modifier.pk) \
- .order_by('-history_date', '-history_id')
- # instance.skip_history_when_saving = True
- if not q_history.count():
- if force:
- delattr(instance, '_force_history')
- self._save_historic(
- manager, instance, history_date, history_type, history_modifier,
- history_change_reason, using, attrs)
- return
- old_instance = q_history.all()[0]
- # multiple saving by the same user in a very short time are generaly
- # caused by post_save signals it is not relevant to keep them
- min_history_date = datetime.datetime.now() \
- - datetime.timedelta(seconds=5)
- q = q_history.filter(history_date__isnull=False,
- history_date__gt=min_history_date) \
- .order_by('-history_date', '-history_id')
- if not force and q.count():
- return
-
- if force:
- delattr(instance, '_force_history')
-
- # record a new version only if data have been changed
- for field in instance._meta.fields:
- if getattr(old_instance, field.attname) != attrs[field.attname]:
- self._save_historic(manager, instance, history_date,
- history_type, history_modifier,
- history_change_reason, using, attrs)
- return
-
-
def valid_id(cls):
# valid ID validator for models
def func(value):
@@ -383,741 +295,6 @@ def is_unique(cls, field):
return func
-class OwnPerms(object):
- """
- Manage special permissions for object's owner
- """
-
- @classmethod
- def get_query_owns(cls, ishtaruser):
- """
- Query object to get own items
- """
- return None # implement for each object
-
- def can_view(self, request):
- if hasattr(self, "LONG_SLUG"):
- perm = "view_" + self.LONG_SLUG
- else:
- perm = "view_" + self.SLUG
- return self.can_do(request, perm)
-
- def can_do(self, request, action_name):
- """
- Check permission availability for the current object.
- :param request: request object
- :param action_name: action name eg: "change_find" - "own" variation is
- checked
- :return: boolean
- """
- if not getattr(request.user, 'ishtaruser', None):
- return False
- splited = action_name.split('_')
- action_own_name = splited[0] + '_own_' + '_'.join(splited[1:])
- user = request.user
- if action_own_name == "view_own_findbasket":
- action_own_name = "view_own_find"
- return user.ishtaruser.has_right(action_name, request.session) or \
- (user.ishtaruser.has_right(action_own_name, request.session)
- and self.is_own(user.ishtaruser))
-
- def is_own(self, user, alt_query_own=None):
- """
- Check if the current object is owned by the user
- """
- if isinstance(user, IshtarUser):
- ishtaruser = user
- elif hasattr(user, 'ishtaruser'):
- ishtaruser = user.ishtaruser
- else:
- return False
- if not alt_query_own:
- query = self.get_query_owns(ishtaruser)
- else:
- query = getattr(self, alt_query_own)(ishtaruser)
- if not query:
- return False
- query &= Q(pk=self.pk)
- return self.__class__.objects.filter(query).count()
-
- @classmethod
- def has_item_of(cls, user):
- """
- Check if the user own some items
- """
- if isinstance(user, IshtarUser):
- ishtaruser = user
- elif hasattr(user, 'ishtaruser'):
- ishtaruser = user.ishtaruser
- else:
- return False
- query = cls.get_query_owns(ishtaruser)
- if not query:
- return False
- return cls.objects.filter(query).count()
-
- @classmethod
- def _return_get_owns(cls, owns, values, get_short_menu_class,
- label_key='cached_label'):
- if not owns:
- return []
- sorted_values = []
- if hasattr(cls, 'BASKET_MODEL'):
- owns_len = len(owns)
- for idx, item in enumerate(reversed(owns)):
- if get_short_menu_class:
- item = item[0]
- if type(item) == cls.BASKET_MODEL:
- basket = owns.pop(owns_len - idx - 1)
- sorted_values.append(basket)
- sorted_values = list(reversed(sorted_values))
- if not values:
- if not get_short_menu_class:
- return sorted_values + list(
- sorted(owns, key=lambda x: getattr(x, label_key) or ""))
- return sorted_values + list(
- sorted(owns, key=lambda x: getattr(x[0], label_key) or ""))
- if not get_short_menu_class:
- return sorted_values + list(
- sorted(owns, key=lambda x: x[label_key] or ""))
- return sorted_values + list(
- sorted(owns, key=lambda x: x[0][label_key] or ""))
-
- @classmethod
- def get_owns(cls, user, replace_query=None, limit=None, values=None,
- get_short_menu_class=False, menu_filtr=None):
- """
- Get Own items
- """
- if not replace_query:
- replace_query = {}
- if hasattr(user, 'is_authenticated') and not user.is_authenticated():
- returned = cls.objects.filter(pk__isnull=True)
- if values:
- returned = []
- return returned
- if isinstance(user, User):
- try:
- ishtaruser = IshtarUser.objects.get(user_ptr=user)
- except IshtarUser.DoesNotExist:
- returned = cls.objects.filter(pk__isnull=True)
- if values:
- returned = []
- return returned
- elif isinstance(user, IshtarUser):
- ishtaruser = user
- else:
- if values:
- return []
- return cls.objects.filter(pk__isnull=True)
- items = []
- if hasattr(cls, 'BASKET_MODEL'):
- items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
- query = cls.get_query_owns(ishtaruser)
- if not query and not replace_query:
- returned = cls.objects.filter(pk__isnull=True)
- if values:
- returned = []
- return returned
- if query:
- q = cls.objects.filter(query)
- else: # replace_query
- q = cls.objects.filter(replace_query)
- if values:
- q = q.values(*values)
- if limit:
- items += list(q.order_by('-pk')[:limit])
- else:
- items += list(q.order_by(*cls._meta.ordering).all())
- if get_short_menu_class:
- if values:
- if 'id' not in values:
- raise NotImplementedError(
- "Call of get_owns with get_short_menu_class option and"
- " no 'id' in values is not implemented")
- my_items = []
- for i in items:
- if hasattr(cls, 'BASKET_MODEL') and \
- type(i) == cls.BASKET_MODEL:
- dct = dict([(k, getattr(i, k)) for k in values])
- my_items.append(
- (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)))
- else:
- my_items.append((i, cls.get_short_menu_class(i['id'])))
- items = my_items
- else:
- items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
- return items
-
- @classmethod
- def _get_query_owns_dicts(cls, ishtaruser):
- """
- List of query own dict to construct the query.
- Each dict are join with an AND operator, each dict key, values are
- joined with OR operator
- """
- return []
-
- @classmethod
- def _construct_query_own(cls, prefix, dct_list):
- q = None
- for subquery_dict in dct_list:
- subquery = None
- for k in subquery_dict:
- subsubquery = Q(**{prefix + k: subquery_dict[k]})
- if subquery:
- subquery |= subsubquery
- else:
- subquery = subsubquery
- if not subquery:
- continue
- if q:
- q &= subquery
- else:
- q = subquery
- return q
-
-
-class CachedGen(object):
- @classmethod
- def refresh_cache(cls):
- raise NotImplementedError()
-
- @classmethod
- def _add_cache_key_to_refresh(cls, keys):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if type(current_keys) != list:
- current_keys = []
- if keys not in current_keys:
- current_keys.append(keys)
- cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
-
-
-class Cached(CachedGen):
- slug_field = 'txt_idx'
-
- @classmethod
- def refresh_cache(cls):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if not current_keys:
- return
- for keys in current_keys:
- if len(keys) == 2 and keys[0] == '__slug':
- cls.get_cache(keys[1], force=True)
- elif keys[0] == '__get_types':
- default = None
- empty_first = True
- exclude = []
- if len(keys) >= 2:
- default = keys.pop()
- if len(keys) > 1:
- empty_first = bool(keys.pop())
- exclude = keys[1:]
- cls.get_types(
- exclude=exclude, empty_first=empty_first, default=default,
- force=True)
- elif keys[0] == '__get_help':
- cls.get_help(force=True)
-
- @classmethod
- def _add_cache_key_to_refresh(cls, keys):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if type(current_keys) != list:
- current_keys = []
- if keys not in current_keys:
- current_keys.append(keys)
- cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
-
- @classmethod
- def get_cache(cls, slug, force=False):
- cache_key, value = get_cache(cls, ['__slug', slug])
- if not force and value:
- return value
- try:
- k = {cls.slug_field: slug}
- obj = cls.objects.get(**k)
- cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
- return obj
- except cls.DoesNotExist:
- cache.set(cache_key, None, settings.CACHE_TIMEOUT)
- return None
-
-
-@disable_for_loaddata
-def post_save_cache(sender, **kwargs):
- sender.refresh_cache()
-
-
-class GeneralType(Cached, models.Model):
- """
- Abstract class for "types"
- """
- label = models.TextField(_("Label"))
- txt_idx = models.TextField(
- _("Textual ID"), validators=[validate_slug],
- unique=True,
- help_text=_(
- "The slug is the standardized version of the name. It contains "
- "only lowercase letters, numbers and hyphens. Each slug must "
- "be unique."))
- comment = models.TextField(_("Comment"), blank=True, null=True)
- available = models.BooleanField(_("Available"), default=True)
- HELP_TEXT = ""
- objects = TypeManager()
-
- class Meta:
- abstract = True
-
- def __str__(self):
- return self.label
-
- def natural_key(self):
- return (self.txt_idx,)
-
- def history_compress(self):
- return self.txt_idx
-
- @classmethod
- def history_decompress(cls, value, create=False):
- if not value:
- return []
- res = []
- for txt_idx in value:
- try:
- res.append(cls.objects.get(txt_idx=txt_idx))
- except cls.DoesNotExist:
- continue
- return res
-
- @property
- def explicit_label(self):
- return "{} ({})".format(self.label, self._meta.verbose_name)
-
- @classmethod
- def create_default_for_test(cls):
- return [cls.objects.create(label='Test %d' % i) for i in range(5)]
-
- @property
- def short_label(self):
- return self.label
-
- @property
- def name(self):
- return self.label
-
- @classmethod
- def get_or_create(cls, slug, label=''):
- """
- Get or create a new item.
-
- :param slug: textual id
- :param label: label for initialization if the item doesn't exist (not
- mandatory)
-
- :return: instancied item of the base class
- """
-
- item = cls.get_cache(slug)
- if item:
- return item
- if isinstance(slug, list):
- slug = slug[0]
- item, __ = cls.objects.get_or_create(
- txt_idx=slug, defaults={'label': label})
- return item
-
- @classmethod
- def get_or_create_pk(cls, slug):
- """
- Get an id from a slug. Create the associated item if needed.
-
- :param slug: textual id
-
- :return: id of the item (string)
- """
- return str(cls.get_or_create(slug).pk)
-
- @classmethod
- def get_or_create_pks(cls, slugs):
- """
- Get and merge a list of ids from a slug list. Create the associated
- items if needed.
-
- :param slugs: textual ids
-
- :return: string with ids separated by "_"
- """
- items = []
- for slug in slugs:
- items.append(str(cls.get_or_create(slug).pk))
- return "_".join(items)
-
- @classmethod
- def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- keys = ['__get_help']
- keys += ["{}".format(ex) for ex in exclude]
- keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
- cache_key, value = get_cache(cls, keys)
- if value and not force:
- return mark_safe(value)
- help_text = cls.HELP_TEXT
- c_rank = -1
- help_items = "\n"
- for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
- if hasattr(item, '__iter__'):
- pk = item[0]
- item = cls.objects.get(pk=pk)
- item.rank = c_rank + 1
- if hasattr(item, 'parent'):
- c_item = item
- parents = []
- while c_item.parent:
- parents.append(c_item.parent.label)
- c_item = c_item.parent
- parents.reverse()
- parents.append(item.label)
- item.label = " / ".join(parents)
- if not item.comment:
- continue
- if c_rank > item.rank:
- help_items += "</dl>\n"
- elif c_rank < item.rank:
- help_items += "<dl>\n"
- c_rank = item.rank
- help_items += "<dt>%s</dt><dd>%s</dd>" % (
- item.label, "<br/>".join(item.comment.split('\n')))
- c_rank += 1
- if c_rank:
- help_items += c_rank * "</dl>"
- if help_text or help_items != u'\n':
- help_text = help_text + help_items
- else:
- help_text = ""
- cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
- return mark_safe(help_text)
-
- @classmethod
- def _get_initial_types(cls, initial, type_pks, instance=False):
- new_vals = []
- if not initial:
- return []
- if type(initial) not in (list, tuple):
- initial = [initial]
- for value in initial:
- try:
- pk = int(value)
- except (ValueError, TypeError):
- continue
- if pk in type_pks:
- continue
- try:
- extra_type = cls.objects.get(pk=pk)
- if instance:
- new_vals.append(extra_type)
- else:
- new_vals.append((extra_type.pk, str(extra_type)))
- except cls.DoesNotExist:
- continue
- return new_vals
-
- @classmethod
- def get_types(cls, dct=None, instances=False, exclude=None,
- empty_first=True, default=None, initial=None, force=False,
- full_hierarchy=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- types = []
- if not instances and empty_first and not default:
- types = [('', '--')]
- types += cls._pre_get_types(dct, instances, exclude,
- default, force,
- get_full_hierarchy=full_hierarchy)
- if not initial:
- return types
- new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types])
- types += new_vals
- return types
-
- @classmethod
- def _pre_get_types(cls, dct=None, instances=False, exclude=None,
- default=None, force=False, get_full_hierarchy=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- # cache
- cache_key = None
- if not instances:
- keys = ['__get_types']
- keys += ["{}".format(ex) for ex in exclude] + \
- ["{}".format(default)]
- keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
- cache_key, value = get_cache(cls, keys)
- if value and not force:
- return value
- base_dct = dct.copy()
- if hasattr(cls, 'parent'):
- if not cache_key:
- return cls._get_parent_types(
- base_dct, instances, exclude=exclude,
- default=default, get_full_hierarchy=get_full_hierarchy)
- vals = [v for v in cls._get_parent_types(
- base_dct, instances, exclude=exclude,
- default=default, get_full_hierarchy=get_full_hierarchy)]
- cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
- return vals
-
- if not cache_key:
- return cls._get_types(base_dct, instances, exclude=exclude,
- default=default)
- vals = [
- v for v in cls._get_types(base_dct, instances, exclude=exclude,
- default=default)
- ]
- cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
- return vals
-
- @classmethod
- def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- dct['available'] = True
- if default:
- try:
- default = cls.objects.get(txt_idx=default)
- yield (default.pk, _(str(default)))
- except cls.DoesNotExist:
- pass
- items = cls.objects.filter(**dct)
- if default and default != "None":
- if hasattr(default, 'txt_idx'):
- exclude.append(default.txt_idx)
- else:
- exclude.append(default)
- if exclude:
- items = items.exclude(txt_idx__in=exclude)
- for item in items.order_by(*cls._meta.ordering).all():
- if instances:
- item.rank = 0
- yield item
- else:
- yield (item.pk, _(str(item)) if item and str(item) else '')
-
- @classmethod
- def _get_childs_list(cls, dct=None, exclude=None, instances=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- if 'parent' in dct:
- dct.pop('parent')
- childs = cls.objects.filter(**dct)
- if exclude:
- childs = childs.exclude(txt_idx__in=exclude)
- if hasattr(cls, 'order'):
- childs = childs.order_by('order')
- res = {}
- if instances:
- for item in childs.all():
- parent_id = item.parent_id or 0
- if parent_id not in res:
- res[parent_id] = []
- res[parent_id].append(item)
- else:
- for item in childs.values("id", "parent_id", "label").all():
- parent_id = item["parent_id"] or 0
- if item["id"] == item["parent_id"]:
- parent_id = 0
- if parent_id not in res:
- res[parent_id] = []
- res[parent_id].append((item["id"], item["label"]))
- return res
-
- PREFIX = "&#x2502; "
- PREFIX_EMPTY = "&nbsp; "
- PREFIX_MEDIUM = "&#x251C; "
- PREFIX_LAST = "&#x2514; "
- PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
-
- @classmethod
- def _get_childs(cls, item, child_list, prefix=0, instances=False,
- is_last=False, last_of=None, get_full_hierarchy=False):
- if not last_of:
- last_of = []
-
- prefix += 1
- current_child_lst = []
- if item in child_list:
- current_child_lst = child_list[item]
-
- lst = []
- total = len(current_child_lst)
- full_hierarchy_initial = get_full_hierarchy
- for idx, child in enumerate(current_child_lst):
- mylast_of = last_of[:]
- p = ''
- if instances:
- child.rank = prefix
- lst.append(child)
- else:
- if full_hierarchy_initial:
- if isinstance(full_hierarchy_initial, str):
- p = full_hierarchy_initial + " > "
- else:
- p = ""
- else:
- cprefix = prefix
- while cprefix:
- cprefix -= 1
- if not cprefix:
- if (idx + 1) == total:
- p += cls.PREFIX_LAST
- else:
- p += cls.PREFIX_MEDIUM
- elif is_last:
- if mylast_of:
- clast = mylast_of.pop(0)
- if clast:
- p += cls.PREFIX_EMPTY
- else:
- p += cls.PREFIX
- else:
- p += cls.PREFIX_EMPTY
- else:
- p += cls.PREFIX
- lst.append((
- child[0], SafeText(p + str(_(child[1])))
- ))
- clast_of = last_of[:]
- clast_of.append(idx + 1 == total)
- if instances:
- child_id = child.id
- else:
- child_id = child[0]
- if get_full_hierarchy:
- if p:
- if not p.endswith(" > "):
- p += " > "
- get_full_hierarchy = p + child[1]
- else:
- get_full_hierarchy = child[1]
- for sub_child in cls._get_childs(
- child_id, child_list, prefix, instances,
- is_last=((idx + 1) == total), last_of=clast_of,
- get_full_hierarchy=get_full_hierarchy):
- lst.append(sub_child)
- return lst
-
- @classmethod
- def _get_parent_types(cls, dct=None, instances=False, exclude=None,
- default=None, get_full_hierarchy=False):
- if not dct:
- dct = {}
- if not exclude:
- exclude = []
- dct['available'] = True
- child_list = cls._get_childs_list(dct, exclude, instances)
-
- if 0 in child_list:
- for item in child_list[0]:
- if instances:
- item.rank = 0
- item_id = item.pk
- yield item
- else:
- item_id = item[0]
- yield item
- if get_full_hierarchy:
- get_full_hierarchy = item[1]
- for child in cls._get_childs(
- item_id, child_list, instances=instances,
- get_full_hierarchy=get_full_hierarchy):
- yield child
-
- def save(self, *args, **kwargs):
- if not self.id and not self.label:
- txt_idx = self.txt_idx
- if isinstance(txt_idx, list):
- txt_idx = txt_idx[0]
- self.txt_idx = txt_idx
- self.label = " ".join(" ".join(self.txt_idx.split('-'))
- .split('_')).title()
- if not self.txt_idx:
- self.txt_idx = slugify(self.label)[:100]
-
- # clean old keys
- if self.pk:
- old = self.__class__.objects.get(pk=self.pk)
- content_type = ContentType.objects.get_for_model(self.__class__)
- if slugify(self.label) != slugify(old.label):
- ItemKey.objects.filter(
- object_id=self.pk, key=slugify(old.label),
- content_type=content_type).delete()
- if self.txt_idx != old.txt_idx:
- ItemKey.objects.filter(
- object_id=self.pk, key=old.txt_idx,
- content_type=content_type).delete()
-
- obj = super(GeneralType, self).save(*args, **kwargs)
- self.generate_key(force=True)
- return obj
-
- def add_key(self, key, force=False, importer=None, group=None,
- user=None):
- content_type = ContentType.objects.get_for_model(self.__class__)
- if not importer and not force and ItemKey.objects.filter(
- key=key, content_type=content_type).count():
- return
- filtr = {'key': key, 'content_type': content_type}
- if group:
- filtr['group'] = group
- elif user:
- filtr['user'] = user
- else:
- filtr['importer'] = importer
- if force:
- ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
- filtr['object_id'] = self.pk
- ItemKey.objects.get_or_create(**filtr)
-
- def generate_key(self, force=False):
- for key in (slugify(self.label), self.txt_idx):
- self.add_key(key)
-
- def get_keys(self, importer):
- keys = [self.txt_idx]
- content_type = ContentType.objects.get_for_model(self.__class__)
- base_q = Q(content_type=content_type, object_id=self.pk)
- subquery = Q(importer__isnull=True, user__isnull=True,
- group__isnull=True)
- subquery |= Q(user__isnull=True, group__isnull=True,
- importer=importer)
- if importer.user:
- subquery |= Q(user=importer.user, group__isnull=True,
- importer=importer)
- if importer.associated_group:
- subquery |= Q(user__isnull=True, group=importer.associated_group,
- importer=importer)
- q = ItemKey.objects.filter(base_q & subquery)
- for ik in q.exclude(key=self.txt_idx).all():
- keys.append(ik.key)
- return keys
-
- @classmethod
- def generate_keys(cls):
- # content_type = ContentType.objects.get_for_model(cls)
- for item in cls.objects.all():
- item.generate_key()
-
-
def get_general_type_label(model, slug):
obj = model.get_cache(slug)
if not obj:
@@ -1125,23 +302,6 @@ def get_general_type_label(model, slug):
return str(obj)
-class HierarchicalType(GeneralType):
- parent = models.ForeignKey('self', blank=True, null=True,
- on_delete=models.SET_NULL,
- verbose_name=_("Parent"))
-
- class Meta:
- abstract = True
-
- def full_label(self):
- lbls = [self.label]
- item = self
- while item.parent:
- item = item.parent
- lbls.append(item.label)
- return " > ".join(reversed(lbls))
-
-
class TinyUrl(models.Model):
CHAR_MAP = string.ascii_letters + string.digits
CHAR_MAP_LEN = len(CHAR_MAP)
@@ -1183,24 +343,6 @@ class ItemKey(models.Model):
return self.key
-def get_image_path(instance, filename):
- # when using migrations instance is not a real ImageModel instance
- if not hasattr(instance, '_get_image_path'):
- n = datetime.datetime.now()
- return "upload/{}/{:02d}/{:02d}/{}".format(
- n.year, n.month, n.day, filename)
- return instance._get_image_path(filename)
-
-
-class ImageContainerModel(object):
- def _get_image_path(self, filename):
- return "{}/{}".format(self._get_base_image_path(), filename)
-
- def _get_base_image_path(self):
- n = datetime.datetime.now()
- return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day)
-
-
class ImageModel(models.Model, ImageContainerModel):
image = models.ImageField(upload_to=get_image_path, blank=True, null=True,
max_length=255, help_text=max_size_help())
@@ -1286,17 +428,6 @@ class ImageModel(models.Model, ImageContainerModel):
)
-class HistoryError(Exception):
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return repr(self.value)
-
-
-PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid')
-
-
class BulkUpdatedItem(object):
@classmethod
def bulk_recursion(cls, transaction_id, extra_args):
@@ -1456,1071 +587,6 @@ class JsonDataField(models.Model):
_("Content types of the field and of the menu do not match"))
-class JsonData(models.Model, CachedGen):
- data = JSONField(default={}, blank=True)
-
- class Meta:
- abstract = True
-
- def pre_save(self):
- if not self.data:
- self.data = {}
-
- @property
- def json_sections(self):
- sections = []
- try:
- content_type = ContentType.objects.get_for_model(self)
- except ContentType.DoesNotExists:
- return sections
- fields = list(JsonDataField.objects.filter(
- content_type=content_type, display=True, section__isnull=True
- ).all()) # no section fields
-
- fields += list(JsonDataField.objects.filter(
- content_type=content_type, display=True, section__isnull=False
- ).order_by('section__order', 'order').all())
-
- for field in fields:
- value = None
- data = self.data.copy()
- for key in field.key.split('__'):
- if key in data:
- value = copy.copy(data[key])
- data = data[key]
- else:
- value = None
- break
- if value is None:
- continue
- if type(value) in (list, tuple):
- value = " ; ".join([str(v) for v in value])
- section_name = field.section.name if field.section else None
- if not sections or section_name != sections[-1][0]:
- # if section name is identical it is the same
- sections.append((section_name, []))
- sections[-1][1].append((field.name, value))
- return sections
-
- @classmethod
- def refresh_cache(cls):
- __, refreshed = get_cache(cls, ['cache_refreshed'])
- if refreshed and time.time() - refreshed < 1:
- return
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
- if not current_keys:
- return
- for keys in current_keys:
- if keys[0] == '__get_dynamic_choices':
- cls._get_dynamic_choices(keys[1], force=True)
-
- @classmethod
- def _get_dynamic_choices(cls, key, force=False):
- """
- Get choice from existing values
- :param key: data key
- :param force: if set to True do not use cache
- :return: tuple of choices (id, value)
- """
- cache_key, value = get_cache(cls, ['__get_dynamic_choices', key])
- if not force and value:
- return value
- choices = set()
- splitted_key = key[len('data__'):].split('__')
- q = cls.objects.filter(
- data__has_key=key[len('data__'):]).values_list('data', flat=True)
- for value in q.all():
- for k in splitted_key:
- value = value[k]
- choices.add(value)
- choices = [('', '')] + [(v, v) for v in sorted(list(choices))]
- cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT)
- return choices
-
-
-class Imported(models.Model):
- imports = models.ManyToManyField(
- Import, blank=True,
- related_name="imported_%(app_label)s_%(class)s")
-
- class Meta:
- abstract = True
-
-
-class SearchAltName(object):
- def __init__(self, search_key, search_query, extra_query=None,
- distinct_query=False):
- self.search_key = search_key
- self.search_query = search_query
- self.extra_query = extra_query or {}
- self.distinct_query = distinct_query
-
-
-class DynamicRequest(object):
- def __init__(self, label, app_name, model_name, form_key, search_key,
- type_query, search_query):
- self.label = label
- self.form_key = form_key
- self.search_key = search_key
- self.app_name = app_name
- self.model_name = model_name
- self.type_query = type_query
- self.search_query = search_query
-
- def get_all_types(self):
- model = apps.get_app_config(self.app_name).get_model(self.model_name)
- return model.objects.filter(available=True)
-
- def get_form_fields(self):
- fields = {}
- for item in self.get_all_types().all():
- fields[self.form_key + "-" + item.txt_idx] = forms.CharField(
- label=str(self.label) + " " + str(item),
- required=False
- )
- return fields
-
- def get_extra_query(self, slug):
- return {
- self.type_query: slug
- }
-
- def get_alt_names(self):
- alt_names = {}
- for item in self.get_all_types().all():
- alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName(
- self.search_key + "-" + item.txt_idx, self.search_query,
- self.get_extra_query(item.txt_idx), distinct_query=True
- )
- return alt_names
-
-
-class SearchVectorConfig(object):
- def __init__(self, key, language=None, func=None):
- self.key = key
- if language:
- self.language = language
- if language == "local":
- self.language = settings.ISHTAR_SEARCH_LANGUAGE
- else:
- self.language = "simple"
- self.func = func
-
- def format(self, value):
- if value == 'None':
- value = ''
- if not self.func:
- return [value]
- return self.func(value)
-
-
-class FullSearch(models.Model):
- search_vector = SearchVectorField(_("Search vector"), blank=True, null=True,
- help_text=_("Auto filled at save"))
-
- EXTRA_REQUEST_KEYS = {}
- DYNAMIC_REQUESTS = {}
- ALT_NAMES = {}
-
- BASE_SEARCH_VECTORS = []
- PROPERTY_SEARCH_VECTORS = []
- INT_SEARCH_VECTORS = []
- M2M_SEARCH_VECTORS = []
- PARENT_SEARCH_VECTORS = []
- # prevent circular dependency
- PARENT_ONLY_SEARCH_VECTORS = []
-
- class Meta:
- abstract = True
-
- @classmethod
- def general_types(cls):
- for k in get_all_field_names(cls):
- field = cls._meta.get_field(k)
- if not hasattr(field, 'rel') or not field.rel:
- continue
- rel_model = field.rel.to
- if issubclass(rel_model, (GeneralType, HierarchicalType)):
- yield k
-
- @classmethod
- def get_alt_names(cls):
- alt_names = cls.ALT_NAMES.copy()
- for dr_k in cls.DYNAMIC_REQUESTS:
- alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names())
- return alt_names
-
- @classmethod
- def get_query_parameters(cls):
- query_parameters = {}
- for v in cls.get_alt_names().values():
- for language_code, language_lbl in settings.LANGUAGES:
- activate(language_code)
- query_parameters[str(v.search_key)] = v
- deactivate()
- return query_parameters
-
- def _update_search_field(self, search_vector_conf, search_vectors, data):
- for value in search_vector_conf.format(data):
- with connection.cursor() as cursor:
- cursor.execute("SELECT to_tsvector(%s, %s)", [
- search_vector_conf.language, value])
- row = cursor.fetchone()
- search_vectors.append(row[0])
-
- def _update_search_number_field(self, search_vectors, val):
- search_vectors.append("'{}':1".format(val))
-
- def update_search_vector(self, save=True, exclude_parent=False):
- """
- Update the search vector
- :param save: True if you want to save the object immediately
- :return: True if modified
- """
- if not hasattr(self, 'search_vector'):
- return
- if not self.pk:
- # logger.warning("Cannot update search vector before save or "
- # "after deletion.")
- return
- if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \
- and not self.INT_SEARCH_VECTORS \
- and not self.PROPERTY_SEARCH_VECTORS \
- and not self.PARENT_SEARCH_VECTORS:
- logger.warning("No search_vectors defined for {}".format(
- self.__class__))
- return
- if getattr(self, '_search_updated', None):
- return
- self._search_updated = True
-
- old_search = ""
- if self.search_vector:
- old_search = self.search_vector[:]
- search_vectors = []
- base_q = self.__class__.objects.filter(pk=self.pk)
-
- # many to many have to be queried one by one otherwise only one is fetch
- for m2m_search_vector in self.M2M_SEARCH_VECTORS:
- key = m2m_search_vector.key.split('__')[0]
- rel_key = getattr(self, key)
- for item in rel_key.values('pk').all():
- query_dct = {key + "__pk": item['pk']}
- q = copy.copy(base_q).filter(**query_dct)
- q = q.annotate(
- search=SearchVector(
- m2m_search_vector.key,
- config=m2m_search_vector.language)
- ).values('search')
- search_vectors.append(q.all()[0]['search'])
-
- # int/float are not well managed by the SearchVector
- for int_search_vector in self.INT_SEARCH_VECTORS:
- q = base_q.values(int_search_vector.key)
- for val in int_search_vector.format(
- q.all()[0][int_search_vector.key]):
- self._update_search_number_field(search_vectors, val)
-
- if not exclude_parent:
- # copy parent vector fields
- for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
- parent = getattr(self, PARENT_SEARCH_VECTOR)
- if hasattr(parent, 'all'): # m2m
- for p in parent.all():
- search_vectors.append(p.search_vector)
- elif parent:
- search_vectors.append(parent.search_vector)
-
- for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
- parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
- if hasattr(parent, 'all'): # m2m
- for p in parent.all():
- search_vectors.append(
- p.update_search_vector(save=False, exclude_parent=True)
- )
- elif parent:
- search_vectors.append(
- parent.update_search_vector(save=False, exclude_parent=True)
- )
-
- if self.BASE_SEARCH_VECTORS:
- # query "simple" fields
- q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
- res = q.all()[0]
- for base_search_vector in self.BASE_SEARCH_VECTORS:
- data = res[base_search_vector.key]
- data = unidecode(str(data))
- self._update_search_field(base_search_vector,
- search_vectors, data)
-
- if self.PROPERTY_SEARCH_VECTORS:
- for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
- data = getattr(self, property_search_vector.key)
- if callable(data):
- data = data()
- if not data:
- continue
- data = str(data)
- self._update_search_field(property_search_vector,
- search_vectors, data)
-
- if hasattr(self, 'data') and self.data:
- content_type = ContentType.objects.get_for_model(self)
- for json_field in JsonDataField.objects.filter(
- content_type=content_type,
- search_index=True).all():
- data = copy.deepcopy(self.data)
- no_data = False
- for key in json_field.key.split('__'):
- if key not in data:
- no_data = True
- break
- data = data[key]
- if no_data or not data:
- continue
-
- if json_field.value_type == 'B':
- if data is True:
- data = json_field.name
- else:
- continue
- elif json_field.value_type in ('I', 'F'):
- self._update_search_number_field(search_vectors, data)
- continue
- elif json_field.value_type == 'D':
- # only index year
- self._update_search_number_field(search_vectors, data.year)
- continue
- for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
- with connection.cursor() as cursor:
- cursor.execute("SELECT to_tsvector(%s, %s)",
- [lang, data])
- row = cursor.fetchone()
- search_vectors.append(row[0])
- new_search_vector = merge_tsvectors(search_vectors)
- changed = old_search != new_search_vector
- self.search_vector = new_search_vector
- if save and changed:
- self.__class__.objects.filter(pk=self.pk).update(
- search_vector=new_search_vector)
- elif not save:
- return new_search_vector
- return changed
-
-
-class FixAssociated(object):
- ASSOCIATED = {}
-
- def fix_associated(self):
- for key in self.ASSOCIATED:
- item = getattr(self, key)
- if not item:
- continue
- dct = self.ASSOCIATED[key]
- for dct_key in dct:
- subkey, ctype = dct_key
- expected_values = dct[dct_key]
- if not isinstance(expected_values, (list, tuple)):
- expected_values = [expected_values]
- if hasattr(ctype, "txt_idx"):
- try:
- expected_values = [ctype.objects.get(txt_idx=v)
- for v in expected_values]
- except ctype.DoesNotExist:
- # type not yet initialized
- return
- current_vals = getattr(item, subkey)
- is_many = False
- if hasattr(current_vals, "all"):
- is_many = True
- current_vals = current_vals.all()
- else:
- current_vals = [current_vals]
- is_ok = False
- for current_val in current_vals:
- if current_val in expected_values:
- is_ok = True
- break
- if is_ok:
- continue
- # the first value is used
- new_value = expected_values[0]
- if is_many:
- getattr(item, subkey).add(new_value)
- else:
- setattr(item, subkey, new_value)
-
-
-class QRCodeItem(models.Model, ImageContainerModel):
- HAS_QR_CODE = True
- qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True,
- max_length=255)
-
- class Meta:
- abstract = True
-
- @property
- def qrcode_path(self):
- if not self.qrcode:
- self.generate_qrcode()
- if not self.qrcode: # error on qrcode generation
- return ""
- return self.qrcode.path
-
- def generate_qrcode(self, request=None, secure=True, tmpdir=None):
- url = self.get_absolute_url()
- site = Site.objects.get_current()
- if request:
- scheme = request.scheme
- else:
- if secure:
- scheme = "https"
- else:
- scheme = "http"
- url = scheme + "://" + site.domain + url
- tiny_url = TinyUrl()
- tiny_url.link = url
- tiny_url.save()
- short_url = scheme + "://" + site.domain + reverse(
- 'tiny-redirect', args=[tiny_url.get_short_id()])
- qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
- tmpdir_created = False
- if not tmpdir:
- tmpdir = tempfile.mkdtemp("-qrcode")
- tmpdir_created = True
- filename = tmpdir + os.sep + 'qrcode.png'
- qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
- with open(filename, 'rb') as qrfile:
- self.qrcode.save("qrcode.png", File(qrfile))
- self.skip_history_when_saving = True
- self._no_move = True
- self.save()
- if tmpdir_created:
- shutil.rmtree(tmpdir)
-
-
-class DocumentItem(object):
- ALT_NAMES = {
- 'documents__image__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-image"),
- 'documents__image__isnull'),
- 'documents__associated_url__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-url"),
- 'documents__associated_url__isnull'),
- 'documents__associated_file__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-attached-file"),
- 'documents__associated_file__isnull'),
- }
-
- def public_representation(self):
- images = []
- if getattr(self, "main_image", None):
- images.append(self.main_image.public_representation())
- images += [
- image.public_representation()
- for image in self.images_without_main_image.all()
- ]
- return {"images": images}
-
- @property
- def images(self):
- if not hasattr(self, 'documents'):
- return Document.objects.none()
- return self.documents.filter(
- image__isnull=False).exclude(image="").order_by("pk")
-
- @property
- def images_without_main_image(self):
- if not hasattr(self, 'main_image') or not hasattr(self, 'documents'):
- return self.images
- if not self.main_image:
- return self.documents.filter(
- image__isnull=False).exclude(
- image="").order_by("pk")
- return self.documents.filter(
- image__isnull=False).exclude(
- image="").exclude(pk=self.main_image.pk).order_by("pk")
-
- def get_extra_actions(self, request):
- """
- For sheet template: return "Add document / image" action
- """
- # url, base_text, icon, extra_text, extra css class, is a quick action
- try:
- actions = super(DocumentItem, self).get_extra_actions(request)
- except AttributeError:
- actions = []
-
- if not hasattr(self, 'SLUG'):
- return actions
-
- can_add_doc = self.can_do(request, 'add_document')
- if can_add_doc and (
- not hasattr(self, "is_locked") or
- not self.is_locked(request.user)):
- actions += [
- (
- reverse("create-document") + "?{}={}".format(
- self.SLUG, self.pk),
- _("Add document/image"),
- "fa fa-plus",
- _("doc./image"),
- "",
- False
- )
- ]
- return actions
-
-
-class SpatialReferenceSystem(GeneralType):
- order = models.IntegerField(_("Order"), default=10)
- auth_name = models.CharField(
- _("Authority name"), default=u'EPSG', max_length=256)
- srid = models.IntegerField(_("Authority SRID"))
-
- class Meta:
- verbose_name = _("Spatial reference system")
- verbose_name_plural = _("Spatial reference systems")
- ordering = ('label',)
-
-
-post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
-post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
-
-
-class GeoItem(models.Model):
- GEO_SOURCE = (
- ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon"))
- )
-
- # gis
- x = models.FloatField(_(u'X'), blank=True, null=True)
- y = models.FloatField(_(u'Y'), blank=True, null=True)
- z = models.FloatField(_(u'Z'), blank=True, null=True)
- estimated_error_x = models.FloatField(_(u'Estimated error for X'),
- blank=True, null=True)
- estimated_error_y = models.FloatField(_(u'Estimated error for Y'),
- blank=True, null=True)
- estimated_error_z = models.FloatField(_(u'Estimated error for Z'),
- blank=True, null=True)
- spatial_reference_system = models.ForeignKey(
- SpatialReferenceSystem, verbose_name=_("Spatial Reference System"),
- blank=True, null=True)
- point = models.PointField(_("Point"), blank=True, null=True, dim=3)
- point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
- point_source = models.CharField(
- _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True,
- null=True)
- point_source_item = models.CharField(
- _("Point source item"), max_length=100, blank=True, null=True)
- multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True,
- null=True)
- multi_polygon_source = models.CharField(
- _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1,
- blank=True, null=True)
- multi_polygon_source_item = models.CharField(
- _("Multi polygon source item"), max_length=100, blank=True, null=True)
-
- GEO_LABEL = ""
-
- class Meta:
- abstract = True
-
- def get_town_centroid(self):
- raise NotImplementedError
-
- def get_town_polygons(self):
- raise NotImplementedError
-
- @property
- def display_coordinates(self):
- if not self.point_2d:
- return ""
- profile = get_current_profile()
- if not profile.display_srs or not profile.display_srs.srid:
- return self.x, self.y
- point = self.point_2d.transform(profile.display_srs.srid, clone=True)
- return round(point.x, 5), round(point.y, 5)
-
- @property
- def display_spatial_reference_system(self):
- profile = get_current_profile()
- if not profile.display_srs or not profile.display_srs.srid:
- return self.spatial_reference_system
- return profile.display_srs
-
- def get_precise_points(self):
- if self.point_source == 'P' and self.point_2d:
- return self.point_2d, self.point, self.point_source_item
-
- def get_precise_polygons(self):
- if self.multi_polygon_source == 'P' and self.multi_polygon:
- return self.multi_polygon, self.multi_polygon_source_item
-
- def most_precise_geo(self):
- if self.point_source == 'M':
- return 'multi_polygon'
- current_source = str(self.__class__._meta.verbose_name)
- if self.multi_polygon_source_item == current_source \
- and (self.multi_polygon_source == "P" or
- self.point_source_item != current_source):
- return 'multi_polygon'
- if self.point_source_item == current_source\
- and self.point_source == 'P':
- return 'point'
- if self.multi_polygon_source == 'P':
- return 'multi_polygon'
- if self.point_source == 'P':
- return 'point'
- if self.multi_polygon:
- return 'multi_polygon'
- if self.point_2d:
- return 'point'
-
- def geo_point_source(self):
- if not self.point_source:
- return ""
- src = "{} - {}".format(
- dict(self.GEO_SOURCE)[self.point_source],
- self.point_source_item
- )
- return src
-
- def geo_polygon_source(self):
- if not self.multi_polygon_source:
- return ""
- src = "{} - {}".format(
- dict(self.GEO_SOURCE)[self.multi_polygon_source],
- self.multi_polygon_source_item
- )
- return src
-
- def _geojson_serialize(self, geom_attr):
- if not hasattr(self, geom_attr):
- return ""
- cached_label_key = 'cached_label'
- if self.GEO_LABEL:
- cached_label_key = self.GEO_LABEL
- if getattr(self, "CACHED_LABELS", None):
- cached_label_key = self.CACHED_LABELS[-1]
- geojson = serialize(
- 'geojson',
- self.__class__.objects.filter(pk=self.pk),
- geometry_field=geom_attr, fields=(cached_label_key,))
- geojson_dct = json.loads(geojson)
- profile = get_current_profile()
- precision = profile.point_precision
-
- features = geojson_dct.pop('features')
- for idx in range(len(features)):
- feature = features[idx]
- lbl = feature['properties'].pop(cached_label_key)
- feature['properties']['name'] = lbl
- feature['properties']['id'] = self.pk
- if precision is not None:
- geom_type = feature["geometry"].get("type", None)
- if geom_type == "Point":
- feature["geometry"]["coordinates"] = [
- round(coord, precision)
- for coord in feature["geometry"]["coordinates"]
- ]
- geojson_dct['features'] = features
- geojson_dct['link_template'] = simple_link_to_window(self).replace(
- '999999', '<pk>'
- )
- geojson = json.dumps(geojson_dct)
- return geojson
-
- @property
- def point_2d_geojson(self):
- return self._geojson_serialize('point_2d')
-
- @property
- def multi_polygon_geojson(self):
- return self._geojson_serialize('multi_polygon')
-
-
-class TemplateItem:
- @classmethod
- def _label_templates_q(cls):
- model_name = "{}.{}".format(
- cls.__module__, cls.__name__)
- q = Q(associated_model__klass=model_name,
- for_labels=True, available=True)
- alt_model_name = model_name.replace(
- "models_finds", "models").replace(
- "models_treatments", "models")
- if alt_model_name != model_name:
- q |= Q(associated_model__klass=model_name,
- for_labels=True, available=True)
- return DocumentTemplate.objects.filter(q)
-
- @classmethod
- def has_label_templates(cls):
- return cls._label_templates_q().count()
-
- @classmethod
- def label_templates(cls):
- return cls._label_templates_q()
-
- def get_extra_templates(self, request):
- cls = self.__class__
- templates = []
- name = str(cls.__name__)
- module = str(cls.__module__)
- if "archaeological_finds" in module:
- if "models_finds" in name or "models_treatments" in name:
- names = [
- name,
- name.replace("models_finds", "models"
- ).replace("models_treatments", "models")
- ]
- else:
- names = [name, name.replace("models", "models_finds"),
- name.replace("models", "models_treatments")]
- else:
- names = [name]
- model_names = [
- "{}.{}".format(module, name) for name in names
- ]
- q = DocumentTemplate.objects.filter(
- associated_model__klass__in=model_names,
- for_labels=False, available=True)
- for template in q.all():
- urlname = "generate-document"
- templates.append(
- (template.name, reverse(
- urlname, args=[template.slug, self.pk]))
- )
- return templates
-
-
-class StatisticItem:
- STATISTIC_MODALITIES = [] # example: "year", "operation_type__label"
- STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example:
- # OrderedDict([('year', _("Year")),
- # ("operation_type__label", _("Operation type"))])
- STATISTIC_SUM_VARIABLE = OrderedDict(
- (("pk", (_("Number"), 1)),)
- ) # example: "Price", "Volume" - the number is a multiplier
-
-
-class CascasdeUpdate:
- DOWN_MODEL_UPDATE = []
-
- def cascade_update(self):
- for down_model in self.DOWN_MODEL_UPDATE:
- if not settings.USE_BACKGROUND_TASK:
- rel = getattr(self, down_model)
- if hasattr(rel.model, "need_update"):
- rel.update(need_update=True)
- continue
- for item in getattr(self, down_model).all():
- cached_label_changed(item.__class__, instance=item)
- if hasattr(item, "point_2d"):
- post_save_geo(item.__class__, instance=item)
-
-
-def duplicate_item(item, user=None, data=None):
- model = item.__class__
- new = model.objects.get(pk=item.pk)
-
- for field in model._meta.fields:
- # pk is in PRIVATE_FIELDS so: new.pk = None and a new
- # item will be created on save
- if field.name == "uuid":
- new.uuid = uuid.uuid4()
- elif field.name in PRIVATE_FIELDS:
- setattr(new, field.name, None)
- if user:
- new.history_user = user
- if data:
- for k in data:
- setattr(new, k, data[k])
- new.save()
-
- # m2m fields
- m2m = [field.name for field in model._meta.many_to_many
- if field.name not in PRIVATE_FIELDS]
- for field in m2m:
- for val in getattr(item, field).all():
- if val not in getattr(new, field).all():
- getattr(new, field).add(val)
- return new
-
-
-class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
- JsonData, FixAssociated, CascasdeUpdate):
- """
- Historized item with external ID management.
- All historized items are searchable and have a data json field.
- Historized items can be "locked" for edition.
- """
- IS_BASKET = False
- SHOW_URL = None
- EXTERNAL_ID_KEY = ''
- EXTERNAL_ID_DEPENDENCIES = []
- HISTORICAL_M2M = []
-
- history_modifier = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Last editor"), blank=True, null=True)
- history_creator = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Creator"), blank=True, null=True)
- last_modified = models.DateTimeField(auto_now=True)
- history_m2m = JSONField(default={}, blank=True)
- need_update = models.BooleanField(
- verbose_name=_("Need update"), default=False)
- locked = models.BooleanField(
- verbose_name=_("Item locked for edition"), default=False)
- lock_user = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Locked by"), blank=True, null=True)
-
- ALT_NAMES = {
- 'history_creator': SearchAltName(
- pgettext_lazy("key for text search", u"created-by"),
- 'history_creator__ishtaruser__person__cached_label__iexact'
- ),
- 'history_modifier': SearchAltName(
- pgettext_lazy("key for text search", u"modified-by"),
- 'history_modifier__ishtaruser__person__cached_label__iexact'
- ),
- 'modified_before': SearchAltName(
- pgettext_lazy("key for text search", "modified-before"),
- 'last_modified__lte'
- ),
- 'modified_after': SearchAltName(
- pgettext_lazy("key for text search", "modified-after"),
- 'last_modified__gte'
- ),
- }
-
- class Meta:
- abstract = True
-
- @classmethod
- def get_verbose_name(cls):
- return cls._meta.verbose_name
-
- def is_locked(self, user=None):
- if not user:
- return self.locked
- return self.locked and (not self.lock_user or self.lock_user != user)
-
- def merge(self, item, keep_old=False):
- merge_model_objects(self, item, keep_old=keep_old)
-
- def public_representation(self):
- return {}
-
- def duplicate(self, user=None, data=None):
- return duplicate_item(self, user, data)
-
- def update_external_id(self, save=False):
- if not self.EXTERNAL_ID_KEY or (
- self.external_id and
- not getattr(self, 'auto_external_id', False)):
- return
- external_id = get_external_id(self.EXTERNAL_ID_KEY, self)
- if external_id == self.external_id:
- return
- self.auto_external_id = True
- self.external_id = external_id
- self._cached_label_checked = False
- if save:
- self.skip_history_when_saving = True
- self.save()
- return external_id
-
- def get_last_history_date(self):
- q = self.history.values("history_date").order_by('-history_date')
- if not q.count():
- return
- return q.all()[0]['history_date']
-
- def get_previous(self, step=None, date=None, strict=False):
- """
- Get a "step" previous state of the item
- """
- assert step or date
- historized = self.history.all()
- item = None
- if step:
- if len(historized) <= step:
- # silently return the last step if too far in the history
- item = historized[len(historized) - 1]
- else:
- item = historized[step]
- else:
- for step, item in enumerate(historized):
- if item.history_date == date:
- break
- # ended with no match
- if item.history_date != date:
- return
- item._step = step
- if len(historized) != (step + 1):
- item._previous = historized[step + 1].history_date
- else:
- item._previous = None
- if step > 0:
- item._next = historized[step - 1].history_date
- else:
- item._next = None
- item.history_date = historized[step].history_date
- model = self.__class__
- for k in get_all_field_names(model):
- field = model._meta.get_field(k)
- if hasattr(field, 'rel') and field.rel:
- if not hasattr(item, k + '_id'):
- setattr(item, k, getattr(self, k))
- continue
- val = getattr(item, k + '_id')
- if not val:
- setattr(item, k, None)
- continue
- try:
- val = field.rel.to.objects.get(pk=val)
- setattr(item, k, val)
- except ObjectDoesNotExist:
- if strict:
- raise HistoryError("The class %s has no pk %d" % (
- str(field.rel.to), val))
- setattr(item, k, None)
- item.pk = self.pk
- return item
-
- @property
- def last_edition_date(self):
- try:
- return self.history.order_by('-history_date').all()[0].history_date
- except (AttributeError, IndexError):
- return
-
- @property
- def history_creation_date(self):
- try:
- return self.history.order_by('history_date').all()[0].history_date
- except (AttributeError, IndexError):
- return
-
- def rollback(self, date):
- """
- Rollback to a previous state
- """
- to_del, new_item = [], None
- for item in self.history.all():
- if item.history_date == date:
- new_item = item
- break
- to_del.append(item)
- if not new_item:
- raise HistoryError("The date to rollback to doesn't exist.")
- try:
- field_keys = [f.name for f in self._meta.fields]
- for k in field_keys:
- if k != 'id' and hasattr(self, k):
- if not hasattr(new_item, k):
- k = k + "_id"
- setattr(self, k, getattr(new_item, k))
-
- try:
- self.history_modifier = User.objects.get(
- pk=new_item.history_modifier_id)
- except User.ObjectDoesNotExist:
- pass
- self.save()
- saved_m2m = new_item.history_m2m.copy()
- for hist_key in self.HISTORICAL_M2M:
- # after each association m2m is rewrite - force the original
- # to be reset
- new_item.history_m2m = saved_m2m
- values = new_item.m2m_listing(hist_key, create=True) or []
- hist_field = getattr(self, hist_key)
- hist_field.clear()
- for val in values:
- hist_field.add(val)
- # force label regeneration
- self._cached_label_checked = False
- self.save()
- except ObjectDoesNotExist:
- raise HistoryError("The rollback has failed.")
- # clean the obsolete history
- for historized_item in to_del:
- historized_item.delete()
-
- def m2m_listing(self, key):
- return getattr(self, key).all()
-
- def values(self):
- values = {}
- for f in self._meta.fields:
- k = f.name
- if k != 'id':
- values[k] = getattr(self, k)
- return values
-
- def get_absolute_url(self):
- try:
- return reverse('display-item', args=[self.SLUG, self.pk])
- except NoReverseMatch:
- return
-
- def get_show_url(self):
- show_url = self.SHOW_URL
- if not show_url:
- show_url = 'show-' + self.__class__.__name__.lower()
- try:
- return reverse(show_url, args=[self.pk, ''])
- except NoReverseMatch:
- return
-
- @property
- def associated_filename(self):
- if [True for attr in ('get_town_label', 'get_department', 'reference',
- 'short_class_name') if not hasattr(self, attr)]:
- return ''
- items = [slugify(self.get_department()),
- slugify(self.get_town_label()).upper(),
- slugify(self.short_class_name),
- slugify(self.reference),
- slugify(self.name or '').replace('-', '_').capitalize()]
- last_edition_date = self.last_edition_date
- if last_edition_date:
- items.append(last_edition_date.strftime('%Y%m%d'))
- else:
- items.append('00000000')
- return "-".join([str(item) for item in items])
-
- def save(self, *args, **kwargs):
- created = not self.pk
- if not getattr(self, 'skip_history_when_saving', False):
- assert hasattr(self, 'history_modifier')
- if created:
- self.history_creator = self.history_modifier
- # external ID can have related item not available before save
- external_id_updated = kwargs.pop('external_id_updated') \
- if 'external_id_updated' in kwargs else False
- if not created and not external_id_updated:
- self.update_external_id()
- super(BaseHistorizedItem, self).save(*args, **kwargs)
- if created and self.update_external_id():
- # force resave for external ID creation
- self.skip_history_when_saving = True
- self._updated_id = True
- return self.save(external_id_updated=True)
- for dep in self.EXTERNAL_ID_DEPENDENCIES:
- for obj in getattr(self, dep).all():
- obj.update_external_id(save=True)
- self.fix_associated()
- return True
-
-
LOGICAL_TYPES = (
('above', _("Above")),
('below', _("Below")),
@@ -2623,207 +689,10 @@ class SearchQuery(models.Model):
return str(self.label)
-class ShortMenuItem(object):
- """
- Item available in the short menu
- """
- UP_MODEL_QUERY = {}
-
- @classmethod
- def get_short_menu_class(cls, pk):
- return ''
-
- @property
- def short_class_name(self):
- return ""
-
-
-class QuickAction(object):
- """
- Quick action available from tables
- """
-
- def __init__(self, url, icon_class='', text='', target=None, rights=None,
- module=None):
- self.url = url
- self.icon_class = icon_class
- self.text = text
- self.rights = rights
- self.target = target
- self.module = module
- assert self.target in ('one', 'many', None)
-
- def is_available(self, user, session=None, obj=None):
- if self.module and not getattr(get_current_profile(), self.module):
- return False
- if not self.rights: # no restriction
- return True
- if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser:
- return False
- user = user.ishtaruser
-
- for right in self.rights:
- if user.has_perm(right, session=session, obj=obj):
- return True
- return False
-
- @property
- def rendered_icon(self):
- if not self.icon_class:
- return ""
- return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class)
-
- @property
- def base_url(self):
- if self.target is None:
- url = reverse(self.url)
- else:
- # put arbitrary pk for the target
- url = reverse(self.url, args=[0])
- url = url[:-2] # all quick action url have to finish with the
- # pk of the selected item and a "/"
- return url
-
-
-class MainItem(ShortMenuItem):
- """
- Item with quick actions available from tables
- Extra actions are available from sheets
- """
- QUICK_ACTIONS = []
-
- @classmethod
- def get_quick_actions(cls, user, session=None, obj=None):
- """
- Get a list of (url, title, icon, target) actions for an user
- """
- qas = []
- for action in cls.QUICK_ACTIONS:
- if not action.is_available(user, session=session, obj=obj):
- continue
- qas.append([action.base_url,
- mark_safe(action.text),
- mark_safe(action.rendered_icon),
- action.target or ""])
- return qas
-
- @classmethod
- def get_quick_action_by_url(cls, url):
- for action in cls.QUICK_ACTIONS:
- if action.url == url:
- return action
-
- def regenerate_external_id(self):
- if not hasattr(self, "external_id"):
- return
- self.skip_history_when_saving = True
- self._no_move = True
- if hasattr(self, "auto_external_id"):
- self.external_id = None
- self.save()
-
- def get_extra_actions(self, request):
- if not hasattr(self, 'SLUG'):
- return []
-
- actions = []
- if request.user.is_superuser and hasattr(self, "auto_external_id"):
- actions += [
- (
- reverse("regenerate-external-id") + "?{}={}".format(
- self.SLUG, self.pk),
- _("Regenerate ID"),
- "fa fa-key",
- _("regen."),
- "",
- True
- )
- ]
-
- return actions
-
-
-class LightHistorizedItem(BaseHistorizedItem):
- history_date = models.DateTimeField(default=datetime.datetime.now)
-
- class Meta:
- abstract = True
-
- def save(self, *args, **kwargs):
- super(LightHistorizedItem, self).save(*args, **kwargs)
- return self
-
-
-PARSE_FORMULA = re.compile("{([^}]*)}")
-
-
-def _deduplicate(value):
- new_values = []
- for v in value.split(u'-'):
- if v not in new_values:
- new_values.append(v)
- return u'-'.join(new_values)
-
-
-FORMULA_FILTERS = {
- 'upper': lambda x: x.upper(),
- 'lower': lambda x: x.lower(),
- 'capitalize': lambda x: x.capitalize(),
- 'slug': lambda x: slugify(x),
- 'deduplicate': _deduplicate
-}
-
-
-def get_external_id(key, item):
- profile = get_current_profile()
- if not hasattr(profile, key):
- return
- formula = getattr(profile, key)
- dct = {}
- for fkey in PARSE_FORMULA.findall(formula):
- filtered = fkey.split('|')
- initial_key = fkey[:]
- fkey = filtered[0]
- filters = []
- for filtr in filtered[1:]:
- if filtr in FORMULA_FILTERS:
- filters.append(FORMULA_FILTERS[filtr])
- if fkey.startswith('settings__'):
- dct[fkey] = getattr(settings, fkey[len('settings__'):]) or ''
- continue
- obj = item
- for k in fkey.split('__'):
- try:
- obj = getattr(obj, k)
- except ObjectDoesNotExist:
- obj = None
- if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager
- if not obj.count():
- break
- obj = obj.all()[0]
- elif callable(obj):
- obj = obj()
- if obj is None:
- break
- if obj is None:
- dct[initial_key] = ''
- else:
- dct[initial_key] = str(obj)
- for filtr in filters:
- dct[initial_key] = filtr(dct[initial_key])
- values = formula.format(**dct).split('||')
- value = values[0]
- for filtr in values[1:]:
- if filtr not in FORMULA_FILTERS:
- value += '||' + filtr
- continue
- value = FORMULA_FILTERS[filtr](value)
- return value
-
-
class Language(GeneralType):
iso_code = models.CharField(_("ISO code"), null=True, blank=True,
max_length=2)
+
class Meta:
verbose_name = _("Language")
verbose_name_plural = _("Languages")
@@ -3109,10 +978,6 @@ class IshtarSiteProfile(models.Model, Cached):
return obj
-def get_current_profile(force=False):
- return IshtarSiteProfile.get_current_profile(force=force)
-
-
def _profile_mapping():
return get_current_profile().mapping
@@ -3357,141 +1222,6 @@ class StatsCache(models.Model):
verbose_name_plural = _("Caches for stats")
-def update_stats(statscache, item, funcname):
- if not settings.USE_BACKGROUND_TASK:
- current_values = statscache.values
- if not current_values:
- current_values = {}
- value = getattr(item, funcname)()
- current_values[funcname] = value
- statscache.values = current_values
- statscache.updated = datetime.datetime.now()
- statscache.save()
- return current_values
-
- now = datetime.datetime.now()
- app_name = item._meta.app_label
- model_name = item._meta.model_name
- statscache.update_requested = now.isoformat()
- statscache.save()
- _update_stats.delay(app_name, model_name, item.pk, funcname)
- return statscache.values
-
-
-def __get_stats_cache_values(model_name, model_pk):
- q = StatsCache.objects.filter(
- model=model_name, model_pk=model_pk
- )
- nb = q.count()
- if nb >= 1:
- sc = q.all()[0]
- for extra in q.order_by("-id").all()[1:]:
- extra.delete()
- else:
- sc = StatsCache.objects.create(
- model=model_name, model_pk=model_pk
- )
- values = sc.values
- if not values:
- values = {}
- return sc, values
-
-
-@task()
-def _update_stats(app, model, model_pk, funcname):
- model_name = app + "." + model
- model = apps.get_model(app, model)
- try:
- item = model.objects.get(pk=model_pk)
- except model.DoesNotExist:
- return
- value = getattr(item, funcname)()
- sc, current_values = __get_stats_cache_values(model_name, model_pk)
- current_values[funcname] = value
- sc.values = current_values
- sc.update_requested = None
- sc.updated = datetime.datetime.now()
- sc.save()
-
-
-class DashboardFormItem(object):
- """
- Provide methods to manage statistics
- """
-
- def last_stats_update(self):
- model_name = self._meta.app_label + "." + self._meta.model_name
- q = StatsCache.objects.filter(
- model=model_name, model_pk=self.pk).order_by("-updated")
- if not q.count():
- return
- return q.all()[0].updated
-
- def _get_or_set_stats(self, funcname, update=False,
- expected_type=None):
- model_name = self._meta.app_label + "." + self._meta.model_name
- sc, __ = StatsCache.objects.get_or_create(
- model=model_name, model_pk=self.pk
- )
- if not update:
- values = sc.values
- if funcname not in values:
- if expected_type is not None:
- return expected_type()
- return 0
- else:
- values = update_stats(sc, self, funcname)
- if funcname in values:
- values = values[funcname]
- else:
- values = 0
- if expected_type is not None and not isinstance(values, expected_type):
- return expected_type()
- return values
-
- @classmethod
- def get_periods(cls, slice='month', fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
- if fltr:
- q = q.filter(**fltr)
- if slice == 'year':
- return [res[date_var].year for res in list(q.values(date_var)
- .annotate(
- Count("id")).order_by())]
- elif slice == 'month':
- return [(res[date_var].year, res[date_var].month)
- for res in list(q.values(date_var)
- .annotate(Count("id")).order_by())]
- return []
-
- @classmethod
- def get_by_year(cls, year, fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
- if fltr:
- q = q.filter(**fltr)
- return q.filter(
- **{date_var + '__year': year}).order_by('pk').distinct('pk')
-
- @classmethod
- def get_by_month(cls, year, month, fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
- if fltr:
- q = q.filter(**fltr)
- q = q.filter(
- **{date_var + '__year': year, date_var + '__month': month})
- return q.order_by('pk').distinct('pk')
-
- @classmethod
- def get_total_number(cls, fltr=None):
- q = cls.objects
- if fltr:
- q = q.filter(**fltr)
- return q.order_by('pk').distinct('pk').count()
-
-
class Dashboard(object):
def __init__(self, model, slice='year', date_source=None, show_detail=None,
fltr=None):
@@ -3780,254 +1510,6 @@ class DocumentTemplate(models.Model):
return output_name
-class NumberManager(models.Manager):
- def get_by_natural_key(self, number):
- return self.get(number=number)
-
-
-class State(models.Model):
- label = models.CharField(_("Label"), max_length=30)
- number = models.CharField(_("Number"), unique=True, max_length=3)
- objects = NumberManager()
-
- class Meta:
- verbose_name = _("State")
- ordering = ['number']
-
- def __str__(self):
- return self.label
-
- def natural_key(self):
- return (self.number,)
-
-
-class Department(models.Model):
- label = models.CharField(_("Label"), max_length=30)
- number = models.CharField(_("Number"), unique=True, max_length=3)
- state = models.ForeignKey(
- 'State', verbose_name=_("State"), blank=True, null=True,
- on_delete=models.SET_NULL,
- )
- objects = NumberManager()
-
- class Meta:
- verbose_name = _("Department")
- verbose_name_plural = _("Departments")
- ordering = ['number']
-
- def __str__(self):
- return self.label
-
- def natural_key(self):
- return (self.number,)
-
- def history_compress(self):
- return self.number
-
- @classmethod
- def history_decompress(cls, full_value, create=False):
- if not full_value:
- return []
- res = []
- for value in full_value:
- try:
- res.append(cls.objects.get(number=value))
- except cls.DoesNotExist:
- continue
- return res
-
-
-class Arrondissement(models.Model):
- name = models.CharField("Nom", max_length=30)
- department = models.ForeignKey(Department, verbose_name="Département")
-
- def __str__(self):
- return settings.JOINT.join((self.name, str(self.department)))
-
-
-class Canton(models.Model):
- name = models.CharField("Nom", max_length=30)
- arrondissement = models.ForeignKey(Arrondissement,
- verbose_name="Arrondissement")
-
- def __str__(self):
- return settings.JOINT.join(
- (self.name, str(self.arrondissement)))
-
-
-class TownManager(models.GeoManager):
- def get_by_natural_key(self, numero_insee, year):
- return self.get(numero_insee=numero_insee, year=year)
-
-
-class Town(Imported, models.Model):
- name = models.CharField(_("Name"), max_length=100)
- surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
- center = models.PointField(_("Localisation"), srid=settings.SRID,
- blank=True, null=True)
- limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
- numero_insee = models.CharField("Code commune (numéro INSEE)",
- max_length=120)
- departement = models.ForeignKey(
- Department, verbose_name=_("Department"),
- on_delete=models.SET_NULL, null=True, blank=True)
- year = models.IntegerField(
- _("Year of creation"), null=True, blank=True,
- help_text=_("Filling this field is relevant to distinguish old towns "
- "from new towns."))
- children = models.ManyToManyField(
- 'Town', verbose_name=_("Town children"), blank=True,
- related_name='parents')
- cached_label = models.CharField(_("Cached name"), max_length=500,
- null=True, blank=True, db_index=True)
- objects = TownManager()
-
- class Meta:
- verbose_name = _("Town")
- verbose_name_plural = _("Towns")
- if settings.COUNTRY == 'fr':
- ordering = ['numero_insee']
- unique_together = (('numero_insee', 'year'),)
-
- def natural_key(self):
- return (self.numero_insee, self.year)
-
- def history_compress(self):
- values = {'numero_insee': self.numero_insee,
- 'year': self.year or ""}
- return values
-
- def get_values(self, prefix='', no_values=False, filtr=None, **kwargs):
- values = {}
- if not filtr or prefix in filtr or "label" in filtr:
- if prefix:
- values[prefix] = str(self)
- else:
- values['label'] = str(self)
- if not filtr or prefix + "name" in filtr:
- values[prefix + "name"] = self.name
- if not filtr or prefix + "numero_insee" in filtr:
- values[prefix + "numero_insee"] = self.numero_insee
- return values
-
- @classmethod
- def history_decompress(cls, full_value, create=False):
- if not full_value:
- return []
- res = []
- for value in full_value:
- try:
- res.append(
- cls.objects.get(numero_insee=value['numero_insee'],
- year=value['year'] or None))
- except cls.DoesNotExist:
- continue
- return res
-
- def __str__(self):
- return self.cached_label or ""
-
- @property
- def label_with_areas(self):
- label = [self.name]
- if self.numero_insee:
- label.append("({})".format(self.numero_insee))
- for area in self.areas.all():
- label.append(" - ")
- label.append(area.full_label)
- return " ".join(label)
-
- def generate_geo(self, force=False):
- force = self.generate_limit(force=force)
- self.generate_center(force=force)
- self.generate_area(force=force)
-
- def generate_limit(self, force=False):
- if not force and self.limit:
- return
- parents = None
- if not self.parents.count():
- return
- for parent in self.parents.all():
- if not parent.limit:
- return
- if not parents:
- parents = parent.limit
- else:
- parents = parents.union(parent.limit)
- # if union is a simple polygon make it a multi
- if 'MULTI' not in parents.wkt:
- parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")"
- if not parents:
- return
- self.limit = parents
- self.save()
- return True
-
- def generate_center(self, force=False):
- if not force and (self.center or not self.limit):
- return
- self.center = self.limit.centroid
- if not self.center:
- return False
- self.save()
- return True
-
- def generate_area(self, force=False):
- if not force and (self.surface or not self.limit):
- return
- surface = self.limit.transform(settings.SURFACE_SRID,
- clone=True).area
- if surface > 214748364 or not surface:
- return False
- self.surface = surface
- self.save()
- return True
-
- def update_town_code(self):
- if not self.numero_insee or not self.children.count() or not self.year:
- return
- old_num = self.numero_insee[:]
- numero = old_num.split('-')[0]
- self.numero_insee = "{}-{}".format(numero, self.year)
- if self.numero_insee != old_num:
- return True
-
- def _generate_cached_label(self):
- cached_label = self.name
- if settings.COUNTRY == "fr" and self.numero_insee:
- dpt_len = 2
- if self.numero_insee.startswith('97') or \
- self.numero_insee.startswith('98') or \
- self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5',
- '6', '7', '8', '9'):
- dpt_len = 3
- cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
- if self.year and self.children.count():
- cached_label += " ({})".format(self.year)
- return cached_label
-
-
-def post_save_town(sender, **kwargs):
- cached_label_changed(sender, **kwargs)
- town = kwargs['instance']
- town.generate_geo()
- if town.update_town_code():
- town.save()
-
-
-post_save.connect(post_save_town, sender=Town)
-
-
-def town_child_changed(sender, **kwargs):
- town = kwargs['instance']
- if town.update_town_code():
- town.save()
-
-
-m2m_changed.connect(town_child_changed, sender=Town.children.through)
-
-
class Area(HierarchicalType):
towns = models.ManyToManyField(Town, verbose_name=_("Towns"), blank=True,
related_name='areas')
@@ -4057,261 +1539,6 @@ class Area(HierarchicalType):
return " / ".join(label)
-class Address(BaseHistorizedItem):
- FIELDS = (
- "address", "address_complement", "postal_code", "town",
- "precise_town", "country",
- "alt_address", "alt_address_complement", "alt_postal_code", "alt_town",
- "alt_country",
- "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3",
- "raw_phone", "mobile_phone", "email", "alt_address_is_prefered"
- )
- address = models.TextField(_("Address"), null=True, blank=True)
- address_complement = models.TextField(_("Address complement"), null=True,
- blank=True)
- postal_code = models.CharField(_("Postal code"), max_length=10, null=True,
- blank=True)
- town = models.CharField(_("Town (freeform)"), max_length=150, null=True,
- blank=True)
- precise_town = models.ForeignKey(
- Town, verbose_name=_("Town (precise)"), null=True, blank=True)
- country = models.CharField(_("Country"), max_length=30, null=True,
- blank=True)
- alt_address = models.TextField(_("Other address: address"), null=True,
- blank=True)
- alt_address_complement = models.TextField(
- _("Other address: address complement"), null=True, blank=True)
- alt_postal_code = models.CharField(_("Other address: postal code"),
- max_length=10, null=True, blank=True)
- alt_town = models.CharField(_("Other address: town"), max_length=70,
- null=True, blank=True)
- alt_country = models.CharField(_("Other address: country"),
- max_length=30, null=True, blank=True)
- phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True)
- phone_desc = models.CharField(_("Phone description"), max_length=300,
- null=True, blank=True)
- phone2 = models.CharField(_("Phone description 2"), max_length=18,
- null=True, blank=True)
- phone_desc2 = models.CharField(_("Phone description 2"), max_length=300,
- null=True, blank=True)
- phone3 = models.CharField(_("Phone 3"), max_length=18, null=True,
- blank=True)
- phone_desc3 = models.CharField(_("Phone description 3"), max_length=300,
- null=True, blank=True)
- raw_phone = models.TextField(_("Raw phone"), blank=True, null=True)
- mobile_phone = models.CharField(_("Mobile phone"), max_length=18,
- null=True, blank=True)
- email = models.EmailField(
- _("Email"), max_length=300, blank=True, null=True)
- alt_address_is_prefered = models.BooleanField(
- _("Alternative address is prefered"), default=False)
- SUB_ADDRESSES = []
-
- class Meta:
- abstract = True
-
- def get_short_html_items(self):
- items = []
- if self.address:
- items.append(
- """<span class="subadress">{}</span>""".format(self.address))
- if self.address_complement:
- items.append(
- """<span class="subadress-complement">{}</span>""".format(
- self.address_complement))
- if self.postal_code:
- items.append(
- """<span class="postal-code">{}</span>""".format(
- self.postal_code))
- if self.precise_town:
- items.append(
- """<span class="town">{}</span>""".format(
- self.precise_town.name))
- elif self.town:
- items.append(
- """<span class="town">{}</span>""".format(
- self.town))
- if self.country:
- items.append(
- """<span class="country">{}</span>""".format(
- self.country))
- return items
-
- def get_short_html_detail(self):
- html = """<div class="address">"""
- items = self.get_short_html_items()
- if not items:
- items = [
- "<span class='no-address'>{}</span>".format(
- _("No associated address")
- )
- ]
- html += "".join(items)
- html += """</div>"""
- return html
-
- def get_town_centroid(self):
- if self.precise_town:
- return self.precise_town.center, self._meta.verbose_name
- for sub_address in self.SUB_ADDRESSES:
- sub_item = getattr(self, sub_address)
- if sub_item and sub_item.precise_town:
- return sub_item.precise_town.center, sub_item._meta.verbose_name
-
- def get_town_polygons(self):
- if self.precise_town:
- return self.precise_town.limit, self._meta.verbose_name
- for sub_address in self.SUB_ADDRESSES:
- sub_item = getattr(self, sub_address)
- if sub_item and sub_item.precise_town:
- return sub_item.precise_town.limit, sub_item._meta.verbose_name
-
- def get_attribute(self, attr):
- if self.town or self.precise_town:
- return getattr(self, attr)
- for sub_address in self.SUB_ADDRESSES:
- sub_item = getattr(self, sub_address)
- if not sub_item:
- continue
- if sub_item.town or sub_item.precise_town:
- return getattr(sub_item, attr)
- return getattr(self, attr)
-
- def get_address(self):
- return self.get_attribute("address")
-
- def get_address_complement(self):
- return self.get_attribute("address_complement")
-
- def get_postal_code(self):
- return self.get_attribute("postal_code")
-
- def get_town(self):
- return self.get_attribute("town")
-
- def get_precise_town(self):
- return self.get_attribute("precise_town")
-
- def get_country(self):
- return self.get_attribute("country")
-
- def simple_lbl(self):
- return str(self)
-
- def full_address(self):
- lbl = self.simple_lbl()
- if lbl:
- lbl += "\n"
- lbl += self.address_lbl()
- return lbl
-
- def address_lbl(self):
- lbl = u''
- prefix = ''
- if self.alt_address_is_prefered:
- prefix = 'alt_'
- if getattr(self, prefix + 'address'):
- lbl += getattr(self, prefix + 'address')
- if getattr(self, prefix + 'address_complement'):
- if lbl:
- lbl += "\n"
- lbl += getattr(self, prefix + 'address_complement')
- postal_code = getattr(self, prefix + 'postal_code')
- town = getattr(self, prefix + 'town')
- if postal_code or town:
- if lbl:
- lbl += "\n"
- lbl += "{}{}{}".format(
- postal_code or '',
- " " if postal_code and town else '',
- town or '')
- if self.phone:
- if lbl:
- lbl += "\n"
- lbl += "{} {}".format(str(_("Tel: ")), self.phone)
- if self.mobile_phone:
- if lbl:
- lbl += "\n"
- lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone)
- if self.email:
- if lbl:
- lbl += "\n"
- lbl += "{} {}".format(str(_("Email: ")), self.email)
- return lbl
-
-
-class Merge(models.Model):
- merge_key = models.TextField(_("Merge key"), blank=True, null=True)
- merge_candidate = models.ManyToManyField("self", blank=True)
- merge_exclusion = models.ManyToManyField("self", blank=True)
- archived = models.NullBooleanField(default=False,
- blank=True, null=True)
- # 1 for one word similarity, 2 for two word similarity, etc.
- MERGE_CLEMENCY = None
- EMPTY_MERGE_KEY = '--'
- MERGE_ATTRIBUTE = "name"
-
- class Meta:
- abstract = True
-
- def generate_merge_key(self):
- if self.archived:
- return
- merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
- self.merge_key = slugify(merge_attr if merge_attr else '')
- if not self.merge_key:
- self.merge_key = self.EMPTY_MERGE_KEY
- self.merge_key = self.merge_key
-
- def generate_merge_candidate(self):
- if self.archived:
- return
- if not self.merge_key:
- self.generate_merge_key()
- self.save(merge_key_generated=True)
- if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
- return
- q = self.__class__.objects \
- .exclude(pk=self.pk) \
- .exclude(merge_exclusion=self) \
- .exclude(merge_candidate=self) \
- .exclude(archived=True)
- if not self.MERGE_CLEMENCY:
- q = q.filter(merge_key=self.merge_key)
- else:
- subkeys_front = "-".join(
- self.merge_key.split('-')[:self.MERGE_CLEMENCY])
- subkeys_back = "-".join(
- self.merge_key.split('-')[-self.MERGE_CLEMENCY:])
- q = q.filter(Q(merge_key__istartswith=subkeys_front) |
- Q(merge_key__iendswith=subkeys_back))
- for item in q.all():
- self.merge_candidate.add(item)
-
- def save(self, *args, **kwargs):
- # prevent circular save
- merge_key_generated = False
- if 'merge_key_generated' in kwargs:
- merge_key_generated = kwargs.pop('merge_key_generated')
- self.generate_merge_key()
- item = super(Merge, self).save(*args, **kwargs)
- if not merge_key_generated:
- self.merge_candidate.clear()
- self.generate_merge_candidate()
- return item
-
- def archive(self):
- self.archived = True
- self.save()
- self.merge_candidate.clear()
- self.merge_exclusion.clear()
-
- def merge(self, item, keep_old=False, exclude_fields=None):
- merge_model_objects(self, item, keep_old=keep_old,
- exclude_fields=exclude_fields)
- self.generate_merge_candidate()
-
-
class OrganizationType(GeneralType):
class Meta:
verbose_name = _("Organization type")
@@ -5704,15 +2931,12 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
verbose_name=_("Receipt date in documentation"))
item_number = models.IntegerField(_("Number of items"), default=1)
description = models.TextField(_("Description"), blank=True, null=True)
- container = models.ForeignKey(
- "archaeological_warehouse.Container", verbose_name=_("Container"),
- blank=True, null=True, related_name='contained_documents',
- on_delete=models.SET_NULL)
- container_ref = models.ForeignKey(
- "archaeological_warehouse.Container",
- verbose_name=_("Reference container"),
- blank=True, null=True,
- related_name='contained_documents_ref', on_delete=models.SET_NULL)
+ container_id = models.PositiveIntegerField(
+ verbose_name=_("Container ID"), blank=True, null=True)
+ # container = models.ForeignKey("archaeological_warehouse.Container")
+ container_ref_id = models.PositiveIntegerField(
+ verbose_name=_("Container ID"), blank=True, null=True)
+ # container_ref = models.ForeignKey("archaeological_warehouse.Container")
comment = models.TextField(_("Comment"), blank=True, null=True)
additional_information = models.TextField(_("Additional information"),
blank=True, null=True)
@@ -5750,6 +2974,26 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
def natural_key(self):
return (self.external_id,)
+ @property
+ def container(self):
+ if not self.container_id:
+ return
+ Container = apps.get_model("archaeological_warehouse", "Container")
+ try:
+ return Container.objects.get(pk=self.container_id)
+ except Container.DoesNotExist:
+ return
+
+ @property
+ def container_ref(self):
+ if not self.container_ref_id:
+ return
+ Container = apps.get_model("archaeological_warehouse", "Container")
+ try:
+ return Container.objects.get(pk=self.container_ref_id)
+ except Container.DoesNotExist:
+ return
+
"""
@property
def code(self):
@@ -6177,6 +3421,16 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
self.set_index()
if not self.associated_url:
self.associated_url = None
+ container = self.container
+ if self.container_id and not container:
+ self.container_id = None
+ if container and container.pk != self.container_id:
+ self.container_id = container.pk
+ container_ref = self.container_ref
+ if self.container_ref_id and not container_ref:
+ self.container_ref_id = None
+ if container_ref and container_ref.pk != self.container_ref_id:
+ self.container_ref_id = container_ref.pk
super(Document, self).save(*args, **kwargs)
if self.image and not no_path_change and \
@@ -6190,42 +3444,6 @@ class Document(BaseHistorizedItem, QRCodeItem, OwnPerms, ImageModel,
self.save(no_path_change=True)
-def document_attached_changed(sender, **kwargs):
- # associate a default main image
- instance = kwargs.get("instance", None)
- model = kwargs.get("model", None)
- pk_set = kwargs.get("pk_set", None)
- if not instance or not model:
- return
-
- if hasattr(instance, "documents"):
- items = [instance]
- else:
- if not pk_set:
- return
- try:
- items = [model.objects.get(pk=pk) for pk in pk_set]
- except model.DoesNotExist:
- return
-
- for item in items:
- q = item.documents.filter(
- image__isnull=False).exclude(image='')
- if item.main_image:
- if q.filter(pk=item.main_image.pk).count():
- return
- # the association has disappear not the main image anymore
- item.main_image = None
- item.skip_history_when_saving = True
- item.save()
- if not q.count():
- return
- # by default get the lowest pk
- item.main_image = q.order_by('pk').all()[0]
- item.skip_history_when_saving = True
- item.save()
-
-
post_save.connect(cached_label_changed, sender=Document)
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
new file mode 100644
index 000000000..b7685b8b5
--- /dev/null
+++ b/ishtar_common/models_common.py
@@ -0,0 +1,2777 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Generic models and tools for models
+"""
+
+"""
+from ishtar_common.models import GeneralType, get_external_id, \
+ LightHistorizedItem, OwnPerms, Address, post_save_cache, \
+ DashboardFormItem, document_attached_changed, SearchAltName, \
+ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
+ QuickAction, MainItem, Merge
+
+
+"""
+
+import copy
+from collections import OrderedDict
+import datetime
+import json
+import logging
+import os
+import pyqrcode
+import shutil
+import tempfile
+import time
+
+from django import forms
+from django.apps import apps
+from django.conf import settings
+from django.contrib.auth.models import User, Group
+from django.contrib.contenttypes.models import ContentType
+from django.contrib.gis.db import models
+from django.contrib.postgres.fields import JSONField
+from django.contrib.postgres.search import SearchVectorField, SearchVector
+from django.contrib.sites.models import Site
+from django.core.cache import cache
+from django.core.exceptions import ObjectDoesNotExist
+from django.core.files import File
+from django.core.serializers import serialize
+from django.core.urlresolvers import reverse, NoReverseMatch
+from django.core.validators import validate_slug
+from django.db import connection
+from django.db.models import Q, Count
+from django.db.models.signals import post_save, post_delete, m2m_changed
+from django.template.defaultfilters import slugify
+from django.utils.safestring import SafeText, mark_safe
+from django.utils.translation import activate, deactivate
+from ishtar_common.utils import ugettext_lazy as _, \
+ pgettext_lazy, get_image_path
+from simple_history.models import HistoricalRecords as BaseHistoricalRecords
+from simple_history.signals import post_create_historical_record, \
+ pre_create_historical_record
+from unidecode import unidecode
+
+from ishtar_common.model_managers import TypeManager
+from ishtar_common.model_merging import merge_model_objects
+from ishtar_common.models_imports import Import
+from ishtar_common.templatetags.link_to_window import simple_link_to_window
+from ishtar_common.utils import get_cache, disable_for_loaddata, \
+ get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \
+ task, duplicate_item, get_external_id, get_current_profile
+
+"""
+from ishtar_common.models import get_external_id, \
+ LightHistorizedItem, OwnPerms, Address, post_save_cache, \
+ DashboardFormItem, document_attached_changed, SearchAltName, \
+ DynamicRequest, GeoItem, QRCodeItem, SearchVectorConfig, DocumentItem, \
+ QuickAction, MainItem, Merge
+
+
+"""
+
+logger = logging.getLogger(__name__)
+
+
+class CachedGen(object):
+ @classmethod
+ def refresh_cache(cls):
+ raise NotImplementedError()
+
+ @classmethod
+ def _add_cache_key_to_refresh(cls, keys):
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if type(current_keys) != list:
+ current_keys = []
+ if keys not in current_keys:
+ current_keys.append(keys)
+ cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
+
+
+class Cached(CachedGen):
+ slug_field = 'txt_idx'
+
+ @classmethod
+ def refresh_cache(cls):
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if not current_keys:
+ return
+ for keys in current_keys:
+ if len(keys) == 2 and keys[0] == '__slug':
+ cls.get_cache(keys[1], force=True)
+ elif keys[0] == '__get_types':
+ default = None
+ empty_first = True
+ exclude = []
+ if len(keys) >= 2:
+ default = keys.pop()
+ if len(keys) > 1:
+ empty_first = bool(keys.pop())
+ exclude = keys[1:]
+ cls.get_types(
+ exclude=exclude, empty_first=empty_first, default=default,
+ force=True)
+ elif keys[0] == '__get_help':
+ cls.get_help(force=True)
+
+ @classmethod
+ def _add_cache_key_to_refresh(cls, keys):
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if type(current_keys) != list:
+ current_keys = []
+ if keys not in current_keys:
+ current_keys.append(keys)
+ cache.set(cache_ckey, current_keys, settings.CACHE_TIMEOUT)
+
+ @classmethod
+ def get_cache(cls, slug, force=False):
+ cache_key, value = get_cache(cls, ['__slug', slug])
+ if not force and value:
+ return value
+ try:
+ k = {cls.slug_field: slug}
+ obj = cls.objects.get(**k)
+ cache.set(cache_key, obj, settings.CACHE_TIMEOUT)
+ return obj
+ except cls.DoesNotExist:
+ cache.set(cache_key, None, settings.CACHE_TIMEOUT)
+ return None
+
+
+@disable_for_loaddata
+def post_save_cache(sender, **kwargs):
+ sender.refresh_cache()
+
+
+class GeneralType(Cached, models.Model):
+ """
+ Abstract class for "types"
+ """
+ label = models.TextField(_("Label"))
+ txt_idx = models.TextField(
+ _("Textual ID"), validators=[validate_slug],
+ unique=True,
+ help_text=_(
+ "The slug is the standardized version of the name. It contains "
+ "only lowercase letters, numbers and hyphens. Each slug must "
+ "be unique."))
+ comment = models.TextField(_("Comment"), blank=True, null=True)
+ available = models.BooleanField(_("Available"), default=True)
+ HELP_TEXT = ""
+ objects = TypeManager()
+
+ class Meta:
+ abstract = True
+
+ def __str__(self):
+ return self.label
+
+ def natural_key(self):
+ return (self.txt_idx,)
+
+ def history_compress(self):
+ return self.txt_idx
+
+ @classmethod
+ def history_decompress(cls, value, create=False):
+ if not value:
+ return []
+ res = []
+ for txt_idx in value:
+ try:
+ res.append(cls.objects.get(txt_idx=txt_idx))
+ except cls.DoesNotExist:
+ continue
+ return res
+
+ @property
+ def explicit_label(self):
+ return "{} ({})".format(self.label, self._meta.verbose_name)
+
+ @classmethod
+ def create_default_for_test(cls):
+ return [cls.objects.create(label='Test %d' % i) for i in range(5)]
+
+ @property
+ def short_label(self):
+ return self.label
+
+ @property
+ def name(self):
+ return self.label
+
+ @classmethod
+ def get_or_create(cls, slug, label=''):
+ """
+ Get or create a new item.
+
+ :param slug: textual id
+ :param label: label for initialization if the item doesn't exist (not
+ mandatory)
+
+ :return: instancied item of the base class
+ """
+
+ item = cls.get_cache(slug)
+ if item:
+ return item
+ item, created = cls.objects.get_or_create(
+ txt_idx=slug, defaults={'label': label})
+ return item
+
+ @classmethod
+ def get_or_create_pk(cls, slug):
+ """
+ Get an id from a slug. Create the associated item if needed.
+
+ :param slug: textual id
+
+ :return: id of the item (string)
+ """
+ return str(cls.get_or_create(slug).pk)
+
+ @classmethod
+ def get_or_create_pks(cls, slugs):
+ """
+ Get and merge a list of ids from a slug list. Create the associated
+ items if needed.
+
+ :param slugs: textual ids
+
+ :return: string with ids separated by "_"
+ """
+ items = []
+ for slug in slugs:
+ items.append(str(cls.get_or_create(slug).pk))
+ return "_".join(items)
+
+ @classmethod
+ def get_help(cls, dct=None, exclude=None, force=False, full_hierarchy=None):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ keys = ['__get_help']
+ keys += ["{}".format(ex) for ex in exclude]
+ keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
+ cache_key, value = get_cache(cls, keys)
+ if value and not force:
+ return mark_safe(value)
+ help_text = cls.HELP_TEXT
+ c_rank = -1
+ help_items = "\n"
+ for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
+ if hasattr(item, '__iter__'):
+ pk = item[0]
+ item = cls.objects.get(pk=pk)
+ item.rank = c_rank + 1
+ if hasattr(item, 'parent'):
+ c_item = item
+ parents = []
+ while c_item.parent:
+ parents.append(c_item.parent.label)
+ c_item = c_item.parent
+ parents.reverse()
+ parents.append(item.label)
+ item.label = " / ".join(parents)
+ if not item.comment:
+ continue
+ if c_rank > item.rank:
+ help_items += "</dl>\n"
+ elif c_rank < item.rank:
+ help_items += "<dl>\n"
+ c_rank = item.rank
+ help_items += "<dt>%s</dt><dd>%s</dd>" % (
+ item.label, "<br/>".join(item.comment.split('\n')))
+ c_rank += 1
+ if c_rank:
+ help_items += c_rank * "</dl>"
+ if help_text or help_items != u'\n':
+ help_text = help_text + help_items
+ else:
+ help_text = ""
+ cache.set(cache_key, help_text, settings.CACHE_TIMEOUT)
+ return mark_safe(help_text)
+
+ @classmethod
+ def _get_initial_types(cls, initial, type_pks, instance=False):
+ new_vals = []
+ if not initial:
+ return []
+ if type(initial) not in (list, tuple):
+ initial = [initial]
+ for value in initial:
+ try:
+ pk = int(value)
+ except (ValueError, TypeError):
+ continue
+ if pk in type_pks:
+ continue
+ try:
+ extra_type = cls.objects.get(pk=pk)
+ if instance:
+ new_vals.append(extra_type)
+ else:
+ new_vals.append((extra_type.pk, str(extra_type)))
+ except cls.DoesNotExist:
+ continue
+ return new_vals
+
+ @classmethod
+ def get_types(cls, dct=None, instances=False, exclude=None,
+ empty_first=True, default=None, initial=None, force=False,
+ full_hierarchy=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ types = []
+ if not instances and empty_first and not default:
+ types = [('', '--')]
+ types += cls._pre_get_types(dct, instances, exclude,
+ default, force,
+ get_full_hierarchy=full_hierarchy)
+ if not initial:
+ return types
+ new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types])
+ types += new_vals
+ return types
+
+ @classmethod
+ def _pre_get_types(cls, dct=None, instances=False, exclude=None,
+ default=None, force=False, get_full_hierarchy=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ # cache
+ cache_key = None
+ if not instances:
+ keys = ['__get_types']
+ keys += ["{}".format(ex) for ex in exclude] + \
+ ["{}".format(default)]
+ keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
+ cache_key, value = get_cache(cls, keys)
+ if value and not force:
+ return value
+ base_dct = dct.copy()
+ if hasattr(cls, 'parent'):
+ if not cache_key:
+ return cls._get_parent_types(
+ base_dct, instances, exclude=exclude,
+ default=default, get_full_hierarchy=get_full_hierarchy)
+ vals = [v for v in cls._get_parent_types(
+ base_dct, instances, exclude=exclude,
+ default=default, get_full_hierarchy=get_full_hierarchy)]
+ cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
+ return vals
+
+ if not cache_key:
+ return cls._get_types(base_dct, instances, exclude=exclude,
+ default=default)
+ vals = [
+ v for v in cls._get_types(base_dct, instances, exclude=exclude,
+ default=default)
+ ]
+ cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
+ return vals
+
+ @classmethod
+ def _get_types(cls, dct=None, instances=False, exclude=None, default=None):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ dct['available'] = True
+ if default:
+ try:
+ default = cls.objects.get(txt_idx=default)
+ yield (default.pk, _(str(default)))
+ except cls.DoesNotExist:
+ pass
+ items = cls.objects.filter(**dct)
+ if default and default != "None":
+ if hasattr(default, 'txt_idx'):
+ exclude.append(default.txt_idx)
+ else:
+ exclude.append(default)
+ if exclude:
+ items = items.exclude(txt_idx__in=exclude)
+ for item in items.order_by(*cls._meta.ordering).all():
+ if instances:
+ item.rank = 0
+ yield item
+ else:
+ yield (item.pk, _(str(item)) if item and str(item) else '')
+
+ @classmethod
+ def _get_childs_list(cls, dct=None, exclude=None, instances=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ if 'parent' in dct:
+ dct.pop('parent')
+ childs = cls.objects.filter(**dct)
+ if exclude:
+ childs = childs.exclude(txt_idx__in=exclude)
+ if hasattr(cls, 'order'):
+ childs = childs.order_by('order')
+ res = {}
+ if instances:
+ for item in childs.all():
+ parent_id = item.parent_id or 0
+ if parent_id not in res:
+ res[parent_id] = []
+ res[parent_id].append(item)
+ else:
+ for item in childs.values("id", "parent_id", "label").all():
+ parent_id = item["parent_id"] or 0
+ if item["id"] == item["parent_id"]:
+ parent_id = 0
+ if parent_id not in res:
+ res[parent_id] = []
+ res[parent_id].append((item["id"], item["label"]))
+ return res
+
+ PREFIX = "&#x2502; "
+ PREFIX_EMPTY = "&nbsp; "
+ PREFIX_MEDIUM = "&#x251C; "
+ PREFIX_LAST = "&#x2514; "
+ PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
+
+ @classmethod
+ def _get_childs(cls, item, child_list, prefix=0, instances=False,
+ is_last=False, last_of=None, get_full_hierarchy=False):
+ if not last_of:
+ last_of = []
+
+ prefix += 1
+ current_child_lst = []
+ if item in child_list:
+ current_child_lst = child_list[item]
+
+ lst = []
+ total = len(current_child_lst)
+ full_hierarchy_initial = get_full_hierarchy
+ for idx, child in enumerate(current_child_lst):
+ mylast_of = last_of[:]
+ p = ''
+ if instances:
+ child.rank = prefix
+ lst.append(child)
+ else:
+ if full_hierarchy_initial:
+ if isinstance(full_hierarchy_initial, str):
+ p = full_hierarchy_initial + " > "
+ else:
+ p = ""
+ else:
+ cprefix = prefix
+ while cprefix:
+ cprefix -= 1
+ if not cprefix:
+ if (idx + 1) == total:
+ p += cls.PREFIX_LAST
+ else:
+ p += cls.PREFIX_MEDIUM
+ elif is_last:
+ if mylast_of:
+ clast = mylast_of.pop(0)
+ if clast:
+ p += cls.PREFIX_EMPTY
+ else:
+ p += cls.PREFIX
+ else:
+ p += cls.PREFIX_EMPTY
+ else:
+ p += cls.PREFIX
+ lst.append((
+ child[0], SafeText(p + str(_(child[1])))
+ ))
+ clast_of = last_of[:]
+ clast_of.append(idx + 1 == total)
+ if instances:
+ child_id = child.id
+ else:
+ child_id = child[0]
+ if get_full_hierarchy:
+ if p:
+ if not p.endswith(" > "):
+ p += " > "
+ get_full_hierarchy = p + child[1]
+ else:
+ get_full_hierarchy = child[1]
+ for sub_child in cls._get_childs(
+ child_id, child_list, prefix, instances,
+ is_last=((idx + 1) == total), last_of=clast_of,
+ get_full_hierarchy=get_full_hierarchy):
+ lst.append(sub_child)
+ return lst
+
+ @classmethod
+ def _get_parent_types(cls, dct=None, instances=False, exclude=None,
+ default=None, get_full_hierarchy=False):
+ if not dct:
+ dct = {}
+ if not exclude:
+ exclude = []
+ dct['available'] = True
+ child_list = cls._get_childs_list(dct, exclude, instances)
+
+ if 0 in child_list:
+ for item in child_list[0]:
+ if instances:
+ item.rank = 0
+ item_id = item.pk
+ yield item
+ else:
+ item_id = item[0]
+ yield item
+ if get_full_hierarchy:
+ get_full_hierarchy = item[1]
+ for child in cls._get_childs(
+ item_id, child_list, instances=instances,
+ get_full_hierarchy=get_full_hierarchy):
+ yield child
+
+ def save(self, *args, **kwargs):
+ ItemKey = apps.get_model("ishtar_common", "ItemKey")
+ if not self.id and not self.label:
+ txt_idx = self.txt_idx
+ if isinstance(txt_idx, list):
+ txt_idx = txt_idx[0]
+ self.txt_idx = txt_idx
+ self.label = " ".join(" ".join(self.txt_idx.split('-'))
+ .split('_')).title()
+ if not self.txt_idx:
+ self.txt_idx = slugify(self.label)[:100]
+
+ # clean old keys
+ if self.pk:
+ old = self.__class__.objects.get(pk=self.pk)
+ content_type = ContentType.objects.get_for_model(self.__class__)
+ if slugify(self.label) != slugify(old.label):
+ ItemKey.objects.filter(
+ object_id=self.pk, key=slugify(old.label),
+ content_type=content_type).delete()
+ if self.txt_idx != old.txt_idx:
+ ItemKey.objects.filter(
+ object_id=self.pk, key=old.txt_idx,
+ content_type=content_type).delete()
+
+ obj = super(GeneralType, self).save(*args, **kwargs)
+ self.generate_key(force=True)
+ return obj
+
+ def add_key(self, key, force=False, importer=None, group=None,
+ user=None):
+ ItemKey = apps.get_model("ishtar_common", "ItemKey")
+ content_type = ContentType.objects.get_for_model(self.__class__)
+ if not importer and not force and ItemKey.objects.filter(
+ key=key, content_type=content_type).count():
+ return
+ filtr = {'key': key, 'content_type': content_type}
+ if group:
+ filtr['group'] = group
+ elif user:
+ filtr['user'] = user
+ else:
+ filtr['importer'] = importer
+ if force:
+ ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
+ filtr['object_id'] = self.pk
+ ItemKey.objects.get_or_create(**filtr)
+
+ def generate_key(self, force=False):
+ for key in (slugify(self.label), self.txt_idx):
+ self.add_key(key)
+
+ def get_keys(self, importer):
+ ItemKey = apps.get_model("ishtar_common", "ItemKey")
+ keys = [self.txt_idx]
+ content_type = ContentType.objects.get_for_model(self.__class__)
+ base_q = Q(content_type=content_type, object_id=self.pk)
+ subquery = Q(importer__isnull=True, user__isnull=True,
+ group__isnull=True)
+ subquery |= Q(user__isnull=True, group__isnull=True,
+ importer=importer)
+ if importer.user:
+ subquery |= Q(user=importer.user, group__isnull=True,
+ importer=importer)
+ if importer.associated_group:
+ subquery |= Q(user__isnull=True, group=importer.associated_group,
+ importer=importer)
+ q = ItemKey.objects.filter(base_q & subquery)
+ for ik in q.exclude(key=self.txt_idx).all():
+ keys.append(ik.key)
+ return keys
+
+ @classmethod
+ def generate_keys(cls):
+ # content_type = ContentType.objects.get_for_model(cls)
+ for item in cls.objects.all():
+ item.generate_key()
+
+
+class HierarchicalType(GeneralType):
+ parent = models.ForeignKey('self', blank=True, null=True,
+ on_delete=models.SET_NULL,
+ verbose_name=_("Parent"))
+
+ class Meta:
+ abstract = True
+
+ def full_label(self):
+ lbls = [self.label]
+ item = self
+ while item.parent:
+ item = item.parent
+ lbls.append(item.label)
+ return " > ".join(reversed(lbls))
+
+
+class StatisticItem:
+ STATISTIC_MODALITIES = [] # example: "year", "operation_type__label"
+ STATISTIC_MODALITIES_OPTIONS = OrderedDict() # example:
+ # OrderedDict([('year', _("Year")),
+ # ("operation_type__label", _("Operation type"))])
+ STATISTIC_SUM_VARIABLE = OrderedDict(
+ (("pk", (_("Number"), 1)),)
+ ) # example: "Price", "Volume" - the number is a multiplier
+
+
+class TemplateItem:
+ @classmethod
+ def _label_templates_q(cls):
+ model_name = "{}.{}".format(
+ cls.__module__, cls.__name__)
+ q = Q(associated_model__klass=model_name,
+ for_labels=True, available=True)
+ alt_model_name = model_name.replace(
+ "models_finds", "models").replace(
+ "models_treatments", "models")
+ if alt_model_name != model_name:
+ q |= Q(associated_model__klass=model_name,
+ for_labels=True, available=True)
+ DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
+ return DocumentTemplate.objects.filter(q)
+
+ @classmethod
+ def has_label_templates(cls):
+ return cls._label_templates_q().count()
+
+ @classmethod
+ def label_templates(cls):
+ return cls._label_templates_q()
+
+ def get_extra_templates(self, request):
+ cls = self.__class__
+ templates = []
+ name = str(cls.__name__)
+ module = str(cls.__module__)
+ if "archaeological_finds" in module:
+ if "models_finds" in name or "models_treatments" in name:
+ names = [
+ name,
+ name.replace("models_finds", "models"
+ ).replace("models_treatments", "models")
+ ]
+ else:
+ names = [name, name.replace("models", "models_finds"),
+ name.replace("models", "models_treatments")]
+ else:
+ names = [name]
+ model_names = [
+ "{}.{}".format(module, name) for name in names
+ ]
+ DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
+ q = DocumentTemplate.objects.filter(
+ associated_model__klass__in=model_names,
+ for_labels=False, available=True)
+ for template in q.all():
+ urlname = "generate-document"
+ templates.append(
+ (template.name, reverse(
+ urlname, args=[template.slug, self.pk]))
+ )
+ return templates
+
+
+class FullSearch(models.Model):
+ search_vector = SearchVectorField(_("Search vector"), blank=True, null=True,
+ help_text=_("Auto filled at save"))
+
+ EXTRA_REQUEST_KEYS = {}
+ DYNAMIC_REQUESTS = {}
+ ALT_NAMES = {}
+
+ BASE_SEARCH_VECTORS = []
+ PROPERTY_SEARCH_VECTORS = []
+ INT_SEARCH_VECTORS = []
+ M2M_SEARCH_VECTORS = []
+ PARENT_SEARCH_VECTORS = []
+ # prevent circular dependency
+ PARENT_ONLY_SEARCH_VECTORS = []
+
+ class Meta:
+ abstract = True
+
+ @classmethod
+ def general_types(cls):
+ for k in get_all_field_names(cls):
+ field = cls._meta.get_field(k)
+ if not hasattr(field, 'rel') or not field.rel:
+ continue
+ rel_model = field.rel.to
+ if issubclass(rel_model, (GeneralType, HierarchicalType)):
+ yield k
+
+ @classmethod
+ def get_alt_names(cls):
+ alt_names = cls.ALT_NAMES.copy()
+ for dr_k in cls.DYNAMIC_REQUESTS:
+ alt_names.update(cls.DYNAMIC_REQUESTS[dr_k].get_alt_names())
+ return alt_names
+
+ @classmethod
+ def get_query_parameters(cls):
+ query_parameters = {}
+ for v in cls.get_alt_names().values():
+ for language_code, language_lbl in settings.LANGUAGES:
+ activate(language_code)
+ query_parameters[str(v.search_key)] = v
+ deactivate()
+ return query_parameters
+
+ def _update_search_field(self, search_vector_conf, search_vectors, data):
+ for value in search_vector_conf.format(data):
+ with connection.cursor() as cursor:
+ cursor.execute("SELECT to_tsvector(%s, %s)", [
+ search_vector_conf.language, value])
+ row = cursor.fetchone()
+ search_vectors.append(row[0])
+
+ def _update_search_number_field(self, search_vectors, val):
+ search_vectors.append("'{}':1".format(val))
+
+ def update_search_vector(self, save=True, exclude_parent=False):
+ """
+ Update the search vector
+ :param save: True if you want to save the object immediately
+ :return: True if modified
+ """
+ if not hasattr(self, 'search_vector'):
+ return
+ if not self.pk:
+ # logger.warning("Cannot update search vector before save or "
+ # "after deletion.")
+ return
+ if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \
+ and not self.INT_SEARCH_VECTORS \
+ and not self.PROPERTY_SEARCH_VECTORS \
+ and not self.PARENT_SEARCH_VECTORS:
+ logger.warning("No search_vectors defined for {}".format(
+ self.__class__))
+ return
+ if getattr(self, '_search_updated', None):
+ return
+ JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
+ self._search_updated = True
+
+ old_search = ""
+ if self.search_vector:
+ old_search = self.search_vector[:]
+ search_vectors = []
+ base_q = self.__class__.objects.filter(pk=self.pk)
+
+ # many to many have to be queried one by one otherwise only one is fetch
+ for m2m_search_vector in self.M2M_SEARCH_VECTORS:
+ key = m2m_search_vector.key.split('__')[0]
+ rel_key = getattr(self, key)
+ for item in rel_key.values('pk').all():
+ query_dct = {key + "__pk": item['pk']}
+ q = copy.copy(base_q).filter(**query_dct)
+ q = q.annotate(
+ search=SearchVector(
+ m2m_search_vector.key,
+ config=m2m_search_vector.language)
+ ).values('search')
+ search_vectors.append(q.all()[0]['search'])
+
+ # int/float are not well managed by the SearchVector
+ for int_search_vector in self.INT_SEARCH_VECTORS:
+ q = base_q.values(int_search_vector.key)
+ for val in int_search_vector.format(
+ q.all()[0][int_search_vector.key]):
+ self._update_search_number_field(search_vectors, val)
+
+ if not exclude_parent:
+ # copy parent vector fields
+ for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
+ parent = getattr(self, PARENT_SEARCH_VECTOR)
+ if hasattr(parent, 'all'): # m2m
+ for p in parent.all():
+ search_vectors.append(p.search_vector)
+ elif parent:
+ search_vectors.append(parent.search_vector)
+
+ for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
+ parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
+ if hasattr(parent, 'all'): # m2m
+ for p in parent.all():
+ search_vectors.append(
+ p.update_search_vector(save=False, exclude_parent=True)
+ )
+ elif parent:
+ search_vectors.append(
+ parent.update_search_vector(save=False, exclude_parent=True)
+ )
+
+ if self.BASE_SEARCH_VECTORS:
+ # query "simple" fields
+ q = base_q.values(*[sv.key for sv in self.BASE_SEARCH_VECTORS])
+ res = q.all()[0]
+ for base_search_vector in self.BASE_SEARCH_VECTORS:
+ data = res[base_search_vector.key]
+ data = unidecode(str(data))
+ self._update_search_field(base_search_vector,
+ search_vectors, data)
+
+ if self.PROPERTY_SEARCH_VECTORS:
+ for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
+ data = getattr(self, property_search_vector.key)
+ if callable(data):
+ data = data()
+ if not data:
+ continue
+ data = str(data)
+ self._update_search_field(property_search_vector,
+ search_vectors, data)
+
+ if hasattr(self, 'data') and self.data:
+ content_type = ContentType.objects.get_for_model(self)
+ for json_field in JsonDataField.objects.filter(
+ content_type=content_type,
+ search_index=True).all():
+ data = copy.deepcopy(self.data)
+ no_data = False
+ for key in json_field.key.split('__'):
+ if key not in data:
+ no_data = True
+ break
+ data = data[key]
+ if no_data or not data:
+ continue
+
+ if json_field.value_type == 'B':
+ if data is True:
+ data = json_field.name
+ else:
+ continue
+ elif json_field.value_type in ('I', 'F'):
+ self._update_search_number_field(search_vectors, data)
+ continue
+ elif json_field.value_type == 'D':
+ # only index year
+ self._update_search_number_field(search_vectors, data.year)
+ continue
+ for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
+ with connection.cursor() as cursor:
+ cursor.execute("SELECT to_tsvector(%s, %s)",
+ [lang, data])
+ row = cursor.fetchone()
+ search_vectors.append(row[0])
+ new_search_vector = merge_tsvectors(search_vectors)
+ changed = old_search != new_search_vector
+ self.search_vector = new_search_vector
+ if save and changed:
+ self.__class__.objects.filter(pk=self.pk).update(
+ search_vector=new_search_vector)
+ elif not save:
+ return new_search_vector
+ return changed
+
+
+class Imported(models.Model):
+ imports = models.ManyToManyField(
+ Import, blank=True,
+ related_name="imported_%(app_label)s_%(class)s")
+
+ class Meta:
+ abstract = True
+
+
+class JsonData(models.Model, CachedGen):
+ data = JSONField(default={}, blank=True)
+
+ class Meta:
+ abstract = True
+
+ def pre_save(self):
+ if not self.data:
+ self.data = {}
+
+ @property
+ def json_sections(self):
+ sections = []
+ try:
+ content_type = ContentType.objects.get_for_model(self)
+ except ContentType.DoesNotExists:
+ return sections
+ JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
+ fields = list(JsonDataField.objects.filter(
+ content_type=content_type, display=True, section__isnull=True
+ ).all()) # no section fields
+
+ fields += list(JsonDataField.objects.filter(
+ content_type=content_type, display=True, section__isnull=False
+ ).order_by('section__order', 'order').all())
+
+ for field in fields:
+ value = None
+ data = self.data.copy()
+ for key in field.key.split('__'):
+ if key in data:
+ value = copy.copy(data[key])
+ data = data[key]
+ else:
+ value = None
+ break
+ if value is None:
+ continue
+ if type(value) in (list, tuple):
+ value = " ; ".join([str(v) for v in value])
+ section_name = field.section.name if field.section else None
+ if not sections or section_name != sections[-1][0]:
+ # if section name is identical it is the same
+ sections.append((section_name, []))
+ sections[-1][1].append((field.name, value))
+ return sections
+
+ @classmethod
+ def refresh_cache(cls):
+ __, refreshed = get_cache(cls, ['cache_refreshed'])
+ if refreshed and time.time() - refreshed < 1:
+ return
+ cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ if not current_keys:
+ return
+ for keys in current_keys:
+ if keys[0] == '__get_dynamic_choices':
+ cls._get_dynamic_choices(keys[1], force=True)
+
+ @classmethod
+ def _get_dynamic_choices(cls, key, force=False):
+ """
+ Get choice from existing values
+ :param key: data key
+ :param force: if set to True do not use cache
+ :return: tuple of choices (id, value)
+ """
+ cache_key, value = get_cache(cls, ['__get_dynamic_choices', key])
+ if not force and value:
+ return value
+ choices = set()
+ splitted_key = key[len('data__'):].split('__')
+ q = cls.objects.filter(
+ data__has_key=key[len('data__'):]).values_list('data', flat=True)
+ for value in q.all():
+ for k in splitted_key:
+ value = value[k]
+ choices.add(value)
+ choices = [('', '')] + [(v, v) for v in sorted(list(choices))]
+ cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT)
+ return choices
+
+
+class FixAssociated:
+ ASSOCIATED = {}
+
+ def fix_associated(self):
+ for key in self.ASSOCIATED:
+ item = getattr(self, key)
+ if not item:
+ continue
+ dct = self.ASSOCIATED[key]
+ for dct_key in dct:
+ subkey, ctype = dct_key
+ expected_values = dct[dct_key]
+ if not isinstance(expected_values, (list, tuple)):
+ expected_values = [expected_values]
+ if hasattr(ctype, "txt_idx"):
+ try:
+ expected_values = [ctype.objects.get(txt_idx=v)
+ for v in expected_values]
+ except ctype.DoesNotExist:
+ # type not yet initialized
+ return
+ current_vals = getattr(item, subkey)
+ is_many = False
+ if hasattr(current_vals, "all"):
+ is_many = True
+ current_vals = current_vals.all()
+ else:
+ current_vals = [current_vals]
+ is_ok = False
+ for current_val in current_vals:
+ if current_val in expected_values:
+ is_ok = True
+ break
+ if is_ok:
+ continue
+ # the first value is used
+ new_value = expected_values[0]
+ if is_many:
+ getattr(item, subkey).add(new_value)
+ else:
+ setattr(item, subkey, new_value)
+
+
+class CascasdeUpdate:
+ DOWN_MODEL_UPDATE = []
+
+ def cascade_update(self):
+ for down_model in self.DOWN_MODEL_UPDATE:
+ if not settings.USE_BACKGROUND_TASK:
+ rel = getattr(self, down_model)
+ if hasattr(rel.model, "need_update"):
+ rel.update(need_update=True)
+ continue
+ for item in getattr(self, down_model).all():
+ cached_label_changed(item.__class__, instance=item)
+ if hasattr(item, "point_2d"):
+ post_save_geo(item.__class__, instance=item)
+
+
+class SearchAltName(object):
+ def __init__(self, search_key, search_query, extra_query=None,
+ distinct_query=False):
+ self.search_key = search_key
+ self.search_query = search_query
+ self.extra_query = extra_query or {}
+ self.distinct_query = distinct_query
+
+
+class HistoryError(Exception):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+
+class HistoricalRecords(BaseHistoricalRecords):
+ def _save_historic(self, manager, instance, history_date, history_type,
+ history_user, history_change_reason, using, attrs):
+ history_instance = manager.model(
+ history_date=history_date,
+ history_type=history_type,
+ history_user=history_user,
+ history_change_reason=history_change_reason,
+ **attrs
+ )
+
+ pre_create_historical_record.send(
+ sender=manager.model,
+ instance=instance,
+ history_date=history_date,
+ history_user=history_user,
+ history_change_reason=history_change_reason,
+ history_instance=history_instance,
+ using=using,
+ )
+
+ history_instance.save(using=using)
+
+ post_create_historical_record.send(
+ sender=manager.model,
+ instance=instance,
+ history_instance=history_instance,
+ history_date=history_date,
+ history_user=history_user,
+ history_change_reason=history_change_reason,
+ using=using,
+ )
+
+ def create_historical_record(self, instance, history_type, using=None):
+ try:
+ history_modifier = getattr(instance, 'history_modifier', None)
+ assert history_modifier
+ except (User.DoesNotExist, AssertionError):
+ # on batch removing of users, user could have disappeared
+ return
+ history_date = getattr(instance, "_history_date",
+ datetime.datetime.now())
+ history_change_reason = getattr(instance, "changeReason", None)
+ force = getattr(instance, "_force_history", False)
+ manager = getattr(instance, self.manager_name)
+ attrs = {}
+ for field in instance._meta.fields:
+ attrs[field.attname] = getattr(instance, field.attname)
+ q_history = instance.history \
+ .filter(history_modifier_id=history_modifier.pk) \
+ .order_by('-history_date', '-history_id')
+ # instance.skip_history_when_saving = True
+ if not q_history.count():
+ if force:
+ delattr(instance, '_force_history')
+ self._save_historic(
+ manager, instance, history_date, history_type, history_modifier,
+ history_change_reason, using, attrs)
+ return
+ old_instance = q_history.all()[0]
+ # multiple saving by the same user in a very short time are generaly
+ # caused by post_save signals it is not relevant to keep them
+ min_history_date = datetime.datetime.now() \
+ - datetime.timedelta(seconds=5)
+ q = q_history.filter(history_date__isnull=False,
+ history_date__gt=min_history_date) \
+ .order_by('-history_date', '-history_id')
+ if not force and q.count():
+ return
+
+ if force:
+ delattr(instance, '_force_history')
+
+ # record a new version only if data have been changed
+ for field in instance._meta.fields:
+ if getattr(old_instance, field.attname) != attrs[field.attname]:
+ self._save_historic(manager, instance, history_date,
+ history_type, history_modifier,
+ history_change_reason, using, attrs)
+ return
+
+
+class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
+ JsonData, FixAssociated, CascasdeUpdate):
+ """
+ Historized item with external ID management.
+ All historized items are searchable and have a data json field.
+ Historized items can be "locked" for edition.
+ """
+ IS_BASKET = False
+ SHOW_URL = None
+ EXTERNAL_ID_KEY = ''
+ EXTERNAL_ID_DEPENDENCIES = []
+ HISTORICAL_M2M = []
+
+ history_modifier = models.ForeignKey(
+ User, related_name='+', on_delete=models.SET_NULL,
+ verbose_name=_("Last editor"), blank=True, null=True)
+ history_creator = models.ForeignKey(
+ User, related_name='+', on_delete=models.SET_NULL,
+ verbose_name=_("Creator"), blank=True, null=True)
+ last_modified = models.DateTimeField(auto_now=True)
+ history_m2m = JSONField(default={}, blank=True)
+ need_update = models.BooleanField(
+ verbose_name=_("Need update"), default=False)
+ locked = models.BooleanField(
+ verbose_name=_("Item locked for edition"), default=False)
+ lock_user = models.ForeignKey(
+ User, related_name='+', on_delete=models.SET_NULL,
+ verbose_name=_("Locked by"), blank=True, null=True)
+
+ ALT_NAMES = {
+ 'history_creator': SearchAltName(
+ pgettext_lazy("key for text search", u"created-by"),
+ 'history_creator__ishtaruser__person__cached_label__iexact'
+ ),
+ 'history_modifier': SearchAltName(
+ pgettext_lazy("key for text search", u"modified-by"),
+ 'history_modifier__ishtaruser__person__cached_label__iexact'
+ ),
+ 'modified_before': SearchAltName(
+ pgettext_lazy("key for text search", "modified-before"),
+ 'last_modified__lte'
+ ),
+ 'modified_after': SearchAltName(
+ pgettext_lazy("key for text search", "modified-after"),
+ 'last_modified__gte'
+ ),
+ }
+
+ class Meta:
+ abstract = True
+
+ @classmethod
+ def get_verbose_name(cls):
+ return cls._meta.verbose_name
+
+ def is_locked(self, user=None):
+ if not user:
+ return self.locked
+ return self.locked and (not self.lock_user or self.lock_user != user)
+
+ def merge(self, item, keep_old=False):
+ merge_model_objects(self, item, keep_old=keep_old)
+
+ def public_representation(self):
+ return {}
+
+ def duplicate(self, user=None, data=None):
+ return duplicate_item(self, user, data)
+
+ def update_external_id(self, save=False):
+ if not self.EXTERNAL_ID_KEY or (
+ self.external_id and
+ not getattr(self, 'auto_external_id', False)):
+ return
+ external_id = get_external_id(self.EXTERNAL_ID_KEY, self)
+ if external_id == self.external_id:
+ return
+ self.auto_external_id = True
+ self.external_id = external_id
+ self._cached_label_checked = False
+ if save:
+ self.skip_history_when_saving = True
+ self.save()
+ return external_id
+
+ def get_last_history_date(self):
+ q = self.history.values("history_date").order_by('-history_date')
+ if not q.count():
+ return
+ return q.all()[0]['history_date']
+
+ def get_previous(self, step=None, date=None, strict=False):
+ """
+ Get a "step" previous state of the item
+ """
+ assert step or date
+ historized = self.history.all()
+ item = None
+ if step:
+ if len(historized) <= step:
+ # silently return the last step if too far in the history
+ item = historized[len(historized) - 1]
+ else:
+ item = historized[step]
+ else:
+ for step, item in enumerate(historized):
+ if item.history_date == date:
+ break
+ # ended with no match
+ if item.history_date != date:
+ return
+ item._step = step
+ if len(historized) != (step + 1):
+ item._previous = historized[step + 1].history_date
+ else:
+ item._previous = None
+ if step > 0:
+ item._next = historized[step - 1].history_date
+ else:
+ item._next = None
+ item.history_date = historized[step].history_date
+ model = self.__class__
+ for k in get_all_field_names(model):
+ field = model._meta.get_field(k)
+ if hasattr(field, 'rel') and field.rel:
+ if not hasattr(item, k + '_id'):
+ setattr(item, k, getattr(self, k))
+ continue
+ val = getattr(item, k + '_id')
+ if not val:
+ setattr(item, k, None)
+ continue
+ try:
+ val = field.rel.to.objects.get(pk=val)
+ setattr(item, k, val)
+ except ObjectDoesNotExist:
+ if strict:
+ raise HistoryError("The class %s has no pk %d" % (
+ str(field.rel.to), val))
+ setattr(item, k, None)
+ item.pk = self.pk
+ return item
+
+ @property
+ def last_edition_date(self):
+ try:
+ return self.history.order_by('-history_date').all()[0].history_date
+ except (AttributeError, IndexError):
+ return
+
+ @property
+ def history_creation_date(self):
+ try:
+ return self.history.order_by('history_date').all()[0].history_date
+ except (AttributeError, IndexError):
+ return
+
+ def rollback(self, date):
+ """
+ Rollback to a previous state
+ """
+ to_del, new_item = [], None
+ for item in self.history.all():
+ if item.history_date == date:
+ new_item = item
+ break
+ to_del.append(item)
+ if not new_item:
+ raise HistoryError("The date to rollback to doesn't exist.")
+ try:
+ field_keys = [f.name for f in self._meta.fields]
+ for k in field_keys:
+ if k != 'id' and hasattr(self, k):
+ if not hasattr(new_item, k):
+ k = k + "_id"
+ setattr(self, k, getattr(new_item, k))
+
+ try:
+ self.history_modifier = User.objects.get(
+ pk=new_item.history_modifier_id)
+ except User.ObjectDoesNotExist:
+ pass
+ self.save()
+ saved_m2m = new_item.history_m2m.copy()
+ for hist_key in self.HISTORICAL_M2M:
+ # after each association m2m is rewrite - force the original
+ # to be reset
+ new_item.history_m2m = saved_m2m
+ values = new_item.m2m_listing(hist_key, create=True) or []
+ hist_field = getattr(self, hist_key)
+ hist_field.clear()
+ for val in values:
+ hist_field.add(val)
+ # force label regeneration
+ self._cached_label_checked = False
+ self.save()
+ except ObjectDoesNotExist:
+ raise HistoryError("The rollback has failed.")
+ # clean the obsolete history
+ for historized_item in to_del:
+ historized_item.delete()
+
+ def m2m_listing(self, key):
+ return getattr(self, key).all()
+
+ def values(self):
+ values = {}
+ for f in self._meta.fields:
+ k = f.name
+ if k != 'id':
+ values[k] = getattr(self, k)
+ return values
+
+ def get_absolute_url(self):
+ try:
+ return reverse('display-item', args=[self.SLUG, self.pk])
+ except NoReverseMatch:
+ return
+
+ def get_show_url(self):
+ show_url = self.SHOW_URL
+ if not show_url:
+ show_url = 'show-' + self.__class__.__name__.lower()
+ try:
+ return reverse(show_url, args=[self.pk, ''])
+ except NoReverseMatch:
+ return
+
+ @property
+ def associated_filename(self):
+ if [True for attr in ('get_town_label', 'get_department', 'reference',
+ 'short_class_name') if not hasattr(self, attr)]:
+ return ''
+ items = [slugify(self.get_department()),
+ slugify(self.get_town_label()).upper(),
+ slugify(self.short_class_name),
+ slugify(self.reference),
+ slugify(self.name or '').replace('-', '_').capitalize()]
+ last_edition_date = self.last_edition_date
+ if last_edition_date:
+ items.append(last_edition_date.strftime('%Y%m%d'))
+ else:
+ items.append('00000000')
+ return "-".join([str(item) for item in items])
+
+ def save(self, *args, **kwargs):
+ created = not self.pk
+ if not getattr(self, 'skip_history_when_saving', False):
+ assert hasattr(self, 'history_modifier')
+ if created:
+ self.history_creator = self.history_modifier
+ # external ID can have related item not available before save
+ external_id_updated = kwargs.pop('external_id_updated') \
+ if 'external_id_updated' in kwargs else False
+ if not created and not external_id_updated:
+ self.update_external_id()
+ super(BaseHistorizedItem, self).save(*args, **kwargs)
+ if created and self.update_external_id():
+ # force resave for external ID creation
+ self.skip_history_when_saving = True
+ self._updated_id = True
+ return self.save(external_id_updated=True)
+ for dep in self.EXTERNAL_ID_DEPENDENCIES:
+ for obj in getattr(self, dep).all():
+ obj.update_external_id(save=True)
+ self.fix_associated()
+ return True
+
+
+class LightHistorizedItem(BaseHistorizedItem):
+ history_date = models.DateTimeField(default=datetime.datetime.now)
+
+ class Meta:
+ abstract = True
+
+ def save(self, *args, **kwargs):
+ super(LightHistorizedItem, self).save(*args, **kwargs)
+ return self
+
+
+class OwnPerms(object):
+ """
+ Manage special permissions for object's owner
+ """
+
+ @classmethod
+ def get_query_owns(cls, ishtaruser):
+ """
+ Query object to get own items
+ """
+ return None # implement for each object
+
+ def can_view(self, request):
+ if hasattr(self, "LONG_SLUG"):
+ perm = "view_" + self.LONG_SLUG
+ else:
+ perm = "view_" + self.SLUG
+ return self.can_do(request, perm)
+
+ def can_do(self, request, action_name):
+ """
+ Check permission availability for the current object.
+ :param request: request object
+ :param action_name: action name eg: "change_find" - "own" variation is
+ checked
+ :return: boolean
+ """
+ if not getattr(request.user, 'ishtaruser', None):
+ return False
+ splited = action_name.split('_')
+ action_own_name = splited[0] + '_own_' + '_'.join(splited[1:])
+ user = request.user
+ if action_own_name == "view_own_findbasket":
+ action_own_name = "view_own_find"
+ return user.ishtaruser.has_right(action_name, request.session) or \
+ (user.ishtaruser.has_right(action_own_name, request.session)
+ and self.is_own(user.ishtaruser))
+
+ def is_own(self, user, alt_query_own=None):
+ """
+ Check if the current object is owned by the user
+ """
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, IshtarUser):
+ ishtaruser = user
+ elif hasattr(user, 'ishtaruser'):
+ ishtaruser = user.ishtaruser
+ else:
+ return False
+ if not alt_query_own:
+ query = self.get_query_owns(ishtaruser)
+ else:
+ query = getattr(self, alt_query_own)(ishtaruser)
+ if not query:
+ return False
+ query &= Q(pk=self.pk)
+ return self.__class__.objects.filter(query).count()
+
+ @classmethod
+ def has_item_of(cls, user):
+ """
+ Check if the user own some items
+ """
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, IshtarUser):
+ ishtaruser = user
+ elif hasattr(user, 'ishtaruser'):
+ ishtaruser = user.ishtaruser
+ else:
+ return False
+ query = cls.get_query_owns(ishtaruser)
+ if not query:
+ return False
+ return cls.objects.filter(query).count()
+
+ @classmethod
+ def _return_get_owns(cls, owns, values, get_short_menu_class,
+ label_key='cached_label'):
+ if not owns:
+ return []
+ sorted_values = []
+ if hasattr(cls, 'BASKET_MODEL'):
+ owns_len = len(owns)
+ for idx, item in enumerate(reversed(owns)):
+ if get_short_menu_class:
+ item = item[0]
+ if type(item) == cls.BASKET_MODEL:
+ basket = owns.pop(owns_len - idx - 1)
+ sorted_values.append(basket)
+ sorted_values = list(reversed(sorted_values))
+ if not values:
+ if not get_short_menu_class:
+ return sorted_values + list(
+ sorted(owns, key=lambda x: getattr(x, label_key) or ""))
+ return sorted_values + list(
+ sorted(owns, key=lambda x: getattr(x[0], label_key) or ""))
+ if not get_short_menu_class:
+ return sorted_values + list(
+ sorted(owns, key=lambda x: x[label_key] or ""))
+ return sorted_values + list(
+ sorted(owns, key=lambda x: x[0][label_key] or ""))
+
+ @classmethod
+ def get_owns(cls, user, replace_query=None, limit=None, values=None,
+ get_short_menu_class=False, menu_filtr=None):
+ """
+ Get Own items
+ """
+ if not replace_query:
+ replace_query = {}
+ if hasattr(user, 'is_authenticated') and not user.is_authenticated():
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
+ if isinstance(user, User):
+ try:
+ ishtaruser = IshtarUser.objects.get(user_ptr=user)
+ except IshtarUser.DoesNotExist:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ elif isinstance(user, IshtarUser):
+ ishtaruser = user
+ else:
+ if values:
+ return []
+ return cls.objects.filter(pk__isnull=True)
+ items = []
+ if hasattr(cls, 'BASKET_MODEL'):
+ items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
+ query = cls.get_query_owns(ishtaruser)
+ if not query and not replace_query:
+ returned = cls.objects.filter(pk__isnull=True)
+ if values:
+ returned = []
+ return returned
+ if query:
+ q = cls.objects.filter(query)
+ else: # replace_query
+ q = cls.objects.filter(replace_query)
+ if values:
+ q = q.values(*values)
+ if limit:
+ items += list(q.order_by('-pk')[:limit])
+ else:
+ items += list(q.order_by(*cls._meta.ordering).all())
+ if get_short_menu_class:
+ if values:
+ if 'id' not in values:
+ raise NotImplementedError(
+ "Call of get_owns with get_short_menu_class option and"
+ " no 'id' in values is not implemented")
+ my_items = []
+ for i in items:
+ if hasattr(cls, 'BASKET_MODEL') and \
+ type(i) == cls.BASKET_MODEL:
+ dct = dict([(k, getattr(i, k)) for k in values])
+ my_items.append(
+ (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)))
+ else:
+ my_items.append((i, cls.get_short_menu_class(i['id'])))
+ items = my_items
+ else:
+ items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
+ return items
+
+ @classmethod
+ def _get_query_owns_dicts(cls, ishtaruser):
+ """
+ List of query own dict to construct the query.
+ Each dict are join with an AND operator, each dict key, values are
+ joined with OR operator
+ """
+ return []
+
+ @classmethod
+ def _construct_query_own(cls, prefix, dct_list):
+ q = None
+ for subquery_dict in dct_list:
+ subquery = None
+ for k in subquery_dict:
+ subsubquery = Q(**{prefix + k: subquery_dict[k]})
+ if subquery:
+ subquery |= subsubquery
+ else:
+ subquery = subsubquery
+ if not subquery:
+ continue
+ if q:
+ q &= subquery
+ else:
+ q = subquery
+ return q
+
+
+class NumberManager(models.Manager):
+ def get_by_natural_key(self, number):
+ return self.get(number=number)
+
+
+class State(models.Model):
+ label = models.CharField(_("Label"), max_length=30)
+ number = models.CharField(_("Number"), unique=True, max_length=3)
+ objects = NumberManager()
+
+ class Meta:
+ verbose_name = _("State")
+ ordering = ['number']
+
+ def __str__(self):
+ return self.label
+
+ def natural_key(self):
+ return (self.number,)
+
+
+class Department(models.Model):
+ label = models.CharField(_("Label"), max_length=30)
+ number = models.CharField(_("Number"), unique=True, max_length=3)
+ state = models.ForeignKey(
+ 'State', verbose_name=_("State"), blank=True, null=True,
+ on_delete=models.SET_NULL,
+ )
+ objects = NumberManager()
+
+ class Meta:
+ verbose_name = _("Department")
+ verbose_name_plural = _("Departments")
+ ordering = ['number']
+
+ def __str__(self):
+ return self.label
+
+ def natural_key(self):
+ return (self.number,)
+
+ def history_compress(self):
+ return self.number
+
+ @classmethod
+ def history_decompress(cls, full_value, create=False):
+ if not full_value:
+ return []
+ res = []
+ for value in full_value:
+ try:
+ res.append(cls.objects.get(number=value))
+ except cls.DoesNotExist:
+ continue
+ return res
+
+
+class Arrondissement(models.Model):
+ name = models.CharField("Nom", max_length=30)
+ department = models.ForeignKey(Department, verbose_name="Département")
+
+ def __str__(self):
+ return settings.JOINT.join((self.name, str(self.department)))
+
+
+class Canton(models.Model):
+ name = models.CharField("Nom", max_length=30)
+ arrondissement = models.ForeignKey(Arrondissement,
+ verbose_name="Arrondissement")
+
+ def __str__(self):
+ return settings.JOINT.join(
+ (self.name, str(self.arrondissement)))
+
+
+class TownManager(models.GeoManager):
+ def get_by_natural_key(self, numero_insee, year):
+ return self.get(numero_insee=numero_insee, year=year)
+
+
+class Town(Imported, models.Model):
+ name = models.CharField(_("Name"), max_length=100)
+ surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
+ center = models.PointField(_("Localisation"), srid=settings.SRID,
+ blank=True, null=True)
+ limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
+ numero_insee = models.CharField("Code commune (numéro INSEE)",
+ max_length=120)
+ departement = models.ForeignKey(
+ Department, verbose_name=_("Department"),
+ on_delete=models.SET_NULL, null=True, blank=True)
+ year = models.IntegerField(
+ _("Year of creation"), null=True, blank=True,
+ help_text=_("Filling this field is relevant to distinguish old towns "
+ "from new towns."))
+ children = models.ManyToManyField(
+ 'Town', verbose_name=_("Town children"), blank=True,
+ related_name='parents')
+ cached_label = models.CharField(_("Cached name"), max_length=500,
+ null=True, blank=True, db_index=True)
+ objects = TownManager()
+
+ class Meta:
+ verbose_name = _("Town")
+ verbose_name_plural = _("Towns")
+ if settings.COUNTRY == 'fr':
+ ordering = ['numero_insee']
+ unique_together = (('numero_insee', 'year'),)
+
+ def natural_key(self):
+ return (self.numero_insee, self.year)
+
+ def history_compress(self):
+ values = {'numero_insee': self.numero_insee,
+ 'year': self.year or ""}
+ return values
+
+ def get_values(self, prefix='', no_values=False, no_base_finds=True):
+ return {
+ prefix or "label": str(self),
+ prefix + "name": self.name,
+ prefix + "numero_insee": self.numero_insee
+ }
+
+ @classmethod
+ def history_decompress(cls, full_value, create=False):
+ if not full_value:
+ return []
+ res = []
+ for value in full_value:
+ try:
+ res.append(
+ cls.objects.get(numero_insee=value['numero_insee'],
+ year=value['year'] or None))
+ except cls.DoesNotExist:
+ continue
+ return res
+
+ def __str__(self):
+ return self.cached_label or ""
+
+ @property
+ def label_with_areas(self):
+ label = [self.name]
+ if self.numero_insee:
+ label.append("({})".format(self.numero_insee))
+ for area in self.areas.all():
+ label.append(" - ")
+ label.append(area.full_label)
+ return " ".join(label)
+
+ def generate_geo(self, force=False):
+ force = self.generate_limit(force=force)
+ self.generate_center(force=force)
+ self.generate_area(force=force)
+
+ def generate_limit(self, force=False):
+ if not force and self.limit:
+ return
+ parents = None
+ if not self.parents.count():
+ return
+ for parent in self.parents.all():
+ if not parent.limit:
+ return
+ if not parents:
+ parents = parent.limit
+ else:
+ parents = parents.union(parent.limit)
+ # if union is a simple polygon make it a multi
+ if 'MULTI' not in parents.wkt:
+ parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")"
+ if not parents:
+ return
+ self.limit = parents
+ self.save()
+ return True
+
+ def generate_center(self, force=False):
+ if not force and (self.center or not self.limit):
+ return
+ self.center = self.limit.centroid
+ if not self.center:
+ return False
+ self.save()
+ return True
+
+ def generate_area(self, force=False):
+ if not force and (self.surface or not self.limit):
+ return
+ surface = self.limit.transform(settings.SURFACE_SRID,
+ clone=True).area
+ if surface > 214748364 or not surface:
+ return False
+ self.surface = surface
+ self.save()
+ return True
+
+ def update_town_code(self):
+ if not self.numero_insee or not self.children.count() or not self.year:
+ return
+ old_num = self.numero_insee[:]
+ numero = old_num.split('-')[0]
+ self.numero_insee = "{}-{}".format(numero, self.year)
+ if self.numero_insee != old_num:
+ return True
+
+ def _generate_cached_label(self):
+ cached_label = self.name
+ if settings.COUNTRY == "fr" and self.numero_insee:
+ dpt_len = 2
+ if self.numero_insee.startswith('97') or \
+ self.numero_insee.startswith('98') or \
+ self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5',
+ '6', '7', '8', '9'):
+ dpt_len = 3
+ cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
+ if self.year and self.children.count():
+ cached_label += " ({})".format(self.year)
+ return cached_label
+
+
+def post_save_town(sender, **kwargs):
+ cached_label_changed(sender, **kwargs)
+ town = kwargs['instance']
+ town.generate_geo()
+ if town.update_town_code():
+ town.save()
+
+
+post_save.connect(post_save_town, sender=Town)
+
+
+def town_child_changed(sender, **kwargs):
+ town = kwargs['instance']
+ if town.update_town_code():
+ town.save()
+
+
+m2m_changed.connect(town_child_changed, sender=Town.children.through)
+
+
+class Address(BaseHistorizedItem):
+ FIELDS = (
+ "address", "address_complement", "postal_code", "town",
+ "precise_town", "country",
+ "alt_address", "alt_address_complement", "alt_postal_code", "alt_town",
+ "alt_country",
+ "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3",
+ "raw_phone", "mobile_phone", "email", "alt_address_is_prefered"
+ )
+ address = models.TextField(_("Address"), null=True, blank=True)
+ address_complement = models.TextField(_("Address complement"), null=True,
+ blank=True)
+ postal_code = models.CharField(_("Postal code"), max_length=10, null=True,
+ blank=True)
+ town = models.CharField(_("Town (freeform)"), max_length=150, null=True,
+ blank=True)
+ precise_town = models.ForeignKey(
+ Town, verbose_name=_("Town (precise)"), null=True,
+ blank=True)
+ country = models.CharField(_("Country"), max_length=30, null=True,
+ blank=True)
+ alt_address = models.TextField(_("Other address: address"), null=True,
+ blank=True)
+ alt_address_complement = models.TextField(
+ _("Other address: address complement"), null=True, blank=True)
+ alt_postal_code = models.CharField(_("Other address: postal code"),
+ max_length=10, null=True, blank=True)
+ alt_town = models.CharField(_("Other address: town"), max_length=70,
+ null=True, blank=True)
+ alt_country = models.CharField(_("Other address: country"),
+ max_length=30, null=True, blank=True)
+ phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True)
+ phone_desc = models.CharField(_("Phone description"), max_length=300,
+ null=True, blank=True)
+ phone2 = models.CharField(_("Phone description 2"), max_length=18,
+ null=True, blank=True)
+ phone_desc2 = models.CharField(_("Phone description 2"), max_length=300,
+ null=True, blank=True)
+ phone3 = models.CharField(_("Phone 3"), max_length=18, null=True,
+ blank=True)
+ phone_desc3 = models.CharField(_("Phone description 3"), max_length=300,
+ null=True, blank=True)
+ raw_phone = models.TextField(_("Raw phone"), blank=True, null=True)
+ mobile_phone = models.CharField(_("Mobile phone"), max_length=18,
+ null=True, blank=True)
+ email = models.EmailField(
+ _("Email"), max_length=300, blank=True, null=True)
+ alt_address_is_prefered = models.BooleanField(
+ _("Alternative address is prefered"), default=False)
+ history = HistoricalRecords(inherit=True)
+ SUB_ADDRESSES = []
+
+ class Meta:
+ abstract = True
+
+ def get_short_html_items(self):
+ items = []
+ if self.address:
+ items.append(
+ """<span class="subadress">{}</span>""".format(self.address))
+ if self.address_complement:
+ items.append(
+ """<span class="subadress-complement">{}</span>""".format(
+ self.address_complement))
+ if self.postal_code:
+ items.append(
+ """<span class="postal-code">{}</span>""".format(
+ self.postal_code))
+ if self.precise_town:
+ items.append(
+ """<span class="town">{}</span>""".format(
+ self.precise_town.name))
+ elif self.town:
+ items.append(
+ """<span class="town">{}</span>""".format(
+ self.town))
+ if self.country:
+ items.append(
+ """<span class="country">{}</span>""".format(
+ self.country))
+ return items
+
+ def get_short_html_detail(self):
+ html = """<div class="address">"""
+ items = self.get_short_html_items()
+ if not items:
+ items = [
+ "<span class='no-address'>{}</span>".format(
+ _("No associated address")
+ )
+ ]
+ html += "".join(items)
+ html += """</div>"""
+ return html
+
+ def get_town_centroid(self):
+ if self.precise_town:
+ return self.precise_town.center, self._meta.verbose_name
+ for sub_address in self.SUB_ADDRESSES:
+ sub_item = getattr(self, sub_address)
+ if sub_item and sub_item.precise_town:
+ return sub_item.precise_town.center, sub_item._meta.verbose_name
+
+ def get_town_polygons(self):
+ if self.precise_town:
+ return self.precise_town.limit, self._meta.verbose_name
+ for sub_address in self.SUB_ADDRESSES:
+ sub_item = getattr(self, sub_address)
+ if sub_item and sub_item.precise_town:
+ return sub_item.precise_town.limit, sub_item._meta.verbose_name
+
+ def get_attribute(self, attr):
+ if self.town or self.precise_town:
+ return getattr(self, attr)
+ for sub_address in self.SUB_ADDRESSES:
+ sub_item = getattr(self, sub_address)
+ if not sub_item:
+ continue
+ if sub_item.town or sub_item.precise_town:
+ return getattr(sub_item, attr)
+ return getattr(self, attr)
+
+ def get_address(self):
+ return self.get_attribute("address")
+
+ def get_address_complement(self):
+ return self.get_attribute("address_complement")
+
+ def get_postal_code(self):
+ return self.get_attribute("postal_code")
+
+ def get_town(self):
+ return self.get_attribute("town")
+
+ def get_precise_town(self):
+ return self.get_attribute("precise_town")
+
+ def get_country(self):
+ return self.get_attribute("country")
+
+ def simple_lbl(self):
+ return str(self)
+
+ def full_address(self):
+ lbl = self.simple_lbl()
+ if lbl:
+ lbl += "\n"
+ lbl += self.address_lbl()
+ return lbl
+
+ def address_lbl(self):
+ lbl = u''
+ prefix = ''
+ if self.alt_address_is_prefered:
+ prefix = 'alt_'
+ if getattr(self, prefix + 'address'):
+ lbl += getattr(self, prefix + 'address')
+ if getattr(self, prefix + 'address_complement'):
+ if lbl:
+ lbl += "\n"
+ lbl += getattr(self, prefix + 'address_complement')
+ postal_code = getattr(self, prefix + 'postal_code')
+ town = getattr(self, prefix + 'town')
+ if postal_code or town:
+ if lbl:
+ lbl += "\n"
+ lbl += "{}{}{}".format(
+ postal_code or '',
+ " " if postal_code and town else '',
+ town or '')
+ if self.phone:
+ if lbl:
+ lbl += "\n"
+ lbl += "{} {}".format(str(_("Tel: ")), self.phone)
+ if self.mobile_phone:
+ if lbl:
+ lbl += "\n"
+ lbl += "{} {}".format(str(_("Mobile: ")), self.mobile_phone)
+ if self.email:
+ if lbl:
+ lbl += "\n"
+ lbl += "{} {}".format(str(_("Email: ")), self.email)
+ return lbl
+
+
+class Merge(models.Model):
+ merge_key = models.TextField(_("Merge key"), blank=True, null=True)
+ merge_candidate = models.ManyToManyField("self", blank=True)
+ merge_exclusion = models.ManyToManyField("self", blank=True)
+ archived = models.NullBooleanField(default=False,
+ blank=True, null=True)
+ # 1 for one word similarity, 2 for two word similarity, etc.
+ MERGE_CLEMENCY = None
+ EMPTY_MERGE_KEY = '--'
+ MERGE_ATTRIBUTE = "name"
+
+ class Meta:
+ abstract = True
+
+ def generate_merge_key(self):
+ if self.archived:
+ return
+ merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
+ self.merge_key = slugify(merge_attr if merge_attr else '')
+ if not self.merge_key:
+ self.merge_key = self.EMPTY_MERGE_KEY
+ self.merge_key = self.merge_key
+
+ def generate_merge_candidate(self):
+ if self.archived:
+ return
+ if not self.merge_key:
+ self.generate_merge_key()
+ self.save(merge_key_generated=True)
+ if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
+ return
+ q = self.__class__.objects \
+ .exclude(pk=self.pk) \
+ .exclude(merge_exclusion=self) \
+ .exclude(merge_candidate=self) \
+ .exclude(archived=True)
+ if not self.MERGE_CLEMENCY:
+ q = q.filter(merge_key=self.merge_key)
+ else:
+ subkeys_front = "-".join(
+ self.merge_key.split('-')[:self.MERGE_CLEMENCY])
+ subkeys_back = "-".join(
+ self.merge_key.split('-')[-self.MERGE_CLEMENCY:])
+ q = q.filter(Q(merge_key__istartswith=subkeys_front) |
+ Q(merge_key__iendswith=subkeys_back))
+ for item in q.all():
+ self.merge_candidate.add(item)
+
+ def save(self, *args, **kwargs):
+ # prevent circular save
+ merge_key_generated = False
+ if 'merge_key_generated' in kwargs:
+ merge_key_generated = kwargs.pop('merge_key_generated')
+ self.generate_merge_key()
+ item = super(Merge, self).save(*args, **kwargs)
+ if not merge_key_generated:
+ self.merge_candidate.clear()
+ self.generate_merge_candidate()
+ return item
+
+ def archive(self):
+ self.archived = True
+ self.save()
+ self.merge_candidate.clear()
+ self.merge_exclusion.clear()
+
+ def merge(self, item, keep_old=False, exclude_fields=None):
+ merge_model_objects(self, item, keep_old=keep_old,
+ exclude_fields=exclude_fields)
+ self.generate_merge_candidate()
+
+
+
+def __get_stats_cache_values(model_name, model_pk):
+ StatsCache = apps.get_model("ishtar_common", "StatsCache")
+ q = StatsCache.objects.filter(
+ model=model_name, model_pk=model_pk
+ )
+ nb = q.count()
+ if nb >= 1:
+ sc = q.all()[0]
+ for extra in q.order_by("-id").all()[1:]:
+ extra.delete()
+ else:
+ sc = StatsCache.objects.create(
+ model=model_name, model_pk=model_pk
+ )
+ values = sc.values
+ if not values:
+ values = {}
+ return sc, values
+
+
+@task()
+def _update_stats(app, model, model_pk, funcname):
+ model_name = app + "." + model
+ model = apps.get_model(app, model)
+ try:
+ item = model.objects.get(pk=model_pk)
+ except model.DoesNotExist:
+ return
+ value = getattr(item, funcname)()
+ sc, current_values = __get_stats_cache_values(model_name, model_pk)
+ current_values[funcname] = value
+ sc.values = current_values
+ sc.update_requested = None
+ sc.updated = datetime.datetime.now()
+ sc.save()
+
+def update_stats(statscache, item, funcname):
+ if not settings.USE_BACKGROUND_TASK:
+ current_values = statscache.values
+ if not current_values:
+ current_values = {}
+ value = getattr(item, funcname)()
+ current_values[funcname] = value
+ statscache.values = current_values
+ statscache.updated = datetime.datetime.now()
+ statscache.save()
+ return current_values
+
+ now = datetime.datetime.now()
+ app_name = item._meta.app_label
+ model_name = item._meta.model_name
+ statscache.update_requested = now.isoformat()
+ statscache.save()
+ _update_stats.delay(app_name, model_name, item.pk, funcname)
+ return statscache.values
+
+
+class DashboardFormItem:
+ """
+ Provide methods to manage statistics
+ """
+
+ def last_stats_update(self):
+ model_name = self._meta.app_label + "." + self._meta.model_name
+ StatsCache = apps.get_model("ishtar_common", "StatsCache")
+ q = StatsCache.objects.filter(
+ model=model_name, model_pk=self.pk).order_by("-updated")
+ if not q.count():
+ return
+ return q.all()[0].updated
+
+ def _get_or_set_stats(self, funcname, update=False,
+ expected_type=None):
+ model_name = self._meta.app_label + "." + self._meta.model_name
+ StatsCache = apps.get_model("ishtar_common", "StatsCache")
+ sc, __ = StatsCache.objects.get_or_create(
+ model=model_name, model_pk=self.pk
+ )
+ if not update:
+ values = sc.values
+ if funcname not in values:
+ if expected_type is not None:
+ return expected_type()
+ return 0
+ else:
+ values = update_stats(sc, self, funcname)
+ if funcname in values:
+ values = values[funcname]
+ else:
+ values = 0
+ if expected_type is not None and not isinstance(values, expected_type):
+ return expected_type()
+ return values
+
+ @classmethod
+ def get_periods(cls, slice='month', fltr={}, date_source='creation'):
+ date_var = date_source + '_date'
+ q = cls.objects.filter(**{date_var + '__isnull': False})
+ if fltr:
+ q = q.filter(**fltr)
+ if slice == 'year':
+ return [res[date_var].year for res in list(q.values(date_var)
+ .annotate(
+ Count("id")).order_by())]
+ elif slice == 'month':
+ return [(res[date_var].year, res[date_var].month)
+ for res in list(q.values(date_var)
+ .annotate(Count("id")).order_by())]
+ return []
+
+ @classmethod
+ def get_by_year(cls, year, fltr={}, date_source='creation'):
+ date_var = date_source + '_date'
+ q = cls.objects.filter(**{date_var + '__isnull': False})
+ if fltr:
+ q = q.filter(**fltr)
+ return q.filter(
+ **{date_var + '__year': year}).order_by('pk').distinct('pk')
+
+ @classmethod
+ def get_by_month(cls, year, month, fltr={}, date_source='creation'):
+ date_var = date_source + '_date'
+ q = cls.objects.filter(**{date_var + '__isnull': False})
+ if fltr:
+ q = q.filter(**fltr)
+ q = q.filter(
+ **{date_var + '__year': year, date_var + '__month': month})
+ return q.order_by('pk').distinct('pk')
+
+ @classmethod
+ def get_total_number(cls, fltr=None):
+ q = cls.objects
+ if fltr:
+ q = q.filter(**fltr)
+ return q.order_by('pk').distinct('pk').count()
+
+
+class DocumentItem:
+ ALT_NAMES = {
+ 'documents__image__isnull':
+ SearchAltName(
+ pgettext_lazy("key for text search", "has-image"),
+ 'documents__image__isnull'),
+ 'documents__associated_url__isnull':
+ SearchAltName(
+ pgettext_lazy("key for text search", "has-url"),
+ 'documents__associated_url__isnull'),
+ 'documents__associated_file__isnull':
+ SearchAltName(
+ pgettext_lazy("key for text search", "has-attached-file"),
+ 'documents__associated_file__isnull'),
+ }
+
+ def public_representation(self):
+ images = []
+ if getattr(self, "main_image", None):
+ images.append(self.main_image.public_representation())
+ images += [
+ image.public_representation()
+ for image in self.images_without_main_image.all()
+ ]
+ return {"images": images}
+
+ @property
+ def images(self):
+ if not hasattr(self, 'documents'):
+ Document = apps.get_model("ishtar_common", "Document")
+ return Document.objects.none()
+ return self.documents.filter(
+ image__isnull=False).exclude(image="").order_by("pk")
+
+ @property
+ def images_without_main_image(self):
+ if not hasattr(self, 'main_image') or not hasattr(self, 'documents'):
+ return self.images
+ if not self.main_image:
+ return self.documents.filter(
+ image__isnull=False).exclude(
+ image="").order_by("pk")
+ return self.documents.filter(
+ image__isnull=False).exclude(
+ image="").exclude(pk=self.main_image.pk).order_by("pk")
+
+ def get_extra_actions(self, request):
+ """
+ For sheet template: return "Add document / image" action
+ """
+ # url, base_text, icon, extra_text, extra css class, is a quick action
+ try:
+ actions = super(DocumentItem, self).get_extra_actions(request)
+ except AttributeError:
+ actions = []
+
+ if not hasattr(self, 'SLUG'):
+ return actions
+
+ can_add_doc = self.can_do(request, 'add_document')
+ if can_add_doc and (
+ not hasattr(self, "is_locked") or
+ not self.is_locked(request.user)):
+ actions += [
+ (
+ reverse("create-document") + "?{}={}".format(
+ self.SLUG, self.pk),
+ _("Add document/image"),
+ "fa fa-plus",
+ _("doc./image"),
+ "",
+ False
+ )
+ ]
+ return actions
+
+
+def document_attached_changed(sender, **kwargs):
+ # associate a default main image
+ instance = kwargs.get("instance", None)
+ model = kwargs.get("model", None)
+ pk_set = kwargs.get("pk_set", None)
+ if not instance or not model:
+ return
+
+ if hasattr(instance, "documents"):
+ items = [instance]
+ else:
+ if not pk_set:
+ return
+ try:
+ items = [model.objects.get(pk=pk) for pk in pk_set]
+ except model.DoesNotExist:
+ return
+
+ for item in items:
+ q = item.documents.filter(
+ image__isnull=False).exclude(image='')
+ if item.main_image:
+ if q.filter(pk=item.main_image.pk).count():
+ return
+ # the association has disappear not the main image anymore
+ item.main_image = None
+ item.skip_history_when_saving = True
+ item.save()
+ if not q.count():
+ return
+ # by default get the lowest pk
+ item.main_image = q.order_by('pk').all()[0]
+ item.skip_history_when_saving = True
+ item.save()
+
+
+class QuickAction:
+ """
+ Quick action available from tables
+ """
+
+ def __init__(self, url, icon_class='', text='', target=None, rights=None,
+ module=None):
+ self.url = url
+ self.icon_class = icon_class
+ self.text = text
+ self.rights = rights
+ self.target = target
+ self.module = module
+ assert self.target in ('one', 'many', None)
+
+ def is_available(self, user, session=None, obj=None):
+ if self.module and not getattr(get_current_profile(), self.module):
+ return False
+ if not self.rights: # no restriction
+ return True
+ if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser:
+ return False
+ user = user.ishtaruser
+
+ for right in self.rights:
+ if user.has_perm(right, session=session, obj=obj):
+ return True
+ return False
+
+ @property
+ def rendered_icon(self):
+ if not self.icon_class:
+ return ""
+ return "<i class='{}' aria-hidden='true'></i>".format(self.icon_class)
+
+ @property
+ def base_url(self):
+ if self.target is None:
+ url = reverse(self.url)
+ else:
+ # put arbitrary pk for the target
+ url = reverse(self.url, args=[0])
+ url = url[:-2] # all quick action url have to finish with the
+ # pk of the selected item and a "/"
+ return url
+
+
+class DynamicRequest:
+ def __init__(self, label, app_name, model_name, form_key, search_key,
+ type_query, search_query):
+ self.label = label
+ self.form_key = form_key
+ self.search_key = search_key
+ self.app_name = app_name
+ self.model_name = model_name
+ self.type_query = type_query
+ self.search_query = search_query
+
+ def get_all_types(self):
+ model = apps.get_app_config(self.app_name).get_model(self.model_name)
+ return model.objects.filter(available=True)
+
+ def get_form_fields(self):
+ fields = {}
+ for item in self.get_all_types().all():
+ fields[self.form_key + "-" + item.txt_idx] = forms.CharField(
+ label=str(self.label) + " " + str(item),
+ required=False
+ )
+ return fields
+
+ def get_extra_query(self, slug):
+ return {
+ self.type_query: slug
+ }
+
+ def get_alt_names(self):
+ alt_names = {}
+ for item in self.get_all_types().all():
+ alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName(
+ self.search_key + "-" + item.txt_idx, self.search_query,
+ self.get_extra_query(item.txt_idx), distinct_query=True
+ )
+ return alt_names
+
+
+class SpatialReferenceSystem(GeneralType):
+ order = models.IntegerField(_("Order"), default=10)
+ auth_name = models.CharField(
+ _("Authority name"), default=u'EPSG', max_length=256)
+ srid = models.IntegerField(_("Authority SRID"))
+
+ class Meta:
+ verbose_name = _("Spatial reference system")
+ verbose_name_plural = _("Spatial reference systems")
+ ordering = ('label',)
+
+
+post_save.connect(post_save_cache, sender=SpatialReferenceSystem)
+post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
+
+
+class GeoItem(models.Model):
+ GEO_SOURCE = (
+ ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon"))
+ )
+
+ # gis
+ x = models.FloatField(_(u'X'), blank=True, null=True)
+ y = models.FloatField(_(u'Y'), blank=True, null=True)
+ z = models.FloatField(_(u'Z'), blank=True, null=True)
+ estimated_error_x = models.FloatField(_(u'Estimated error for X'),
+ blank=True, null=True)
+ estimated_error_y = models.FloatField(_(u'Estimated error for Y'),
+ blank=True, null=True)
+ estimated_error_z = models.FloatField(_(u'Estimated error for Z'),
+ blank=True, null=True)
+ spatial_reference_system = models.ForeignKey(
+ SpatialReferenceSystem, verbose_name=_("Spatial Reference System"),
+ blank=True, null=True)
+ point = models.PointField(_("Point"), blank=True, null=True, dim=3)
+ point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
+ point_source = models.CharField(
+ _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True,
+ null=True)
+ point_source_item = models.CharField(
+ _("Point source item"), max_length=100, blank=True, null=True)
+ multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True,
+ null=True)
+ multi_polygon_source = models.CharField(
+ _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1,
+ blank=True, null=True)
+ multi_polygon_source_item = models.CharField(
+ _("Multi polygon source item"), max_length=100, blank=True, null=True)
+
+ GEO_LABEL = ""
+
+ class Meta:
+ abstract = True
+
+ def get_town_centroid(self):
+ raise NotImplementedError
+
+ def get_town_polygons(self):
+ raise NotImplementedError
+
+ @property
+ def display_coordinates(self):
+ if not self.point_2d:
+ return ""
+ profile = get_current_profile()
+ if not profile.display_srs or not profile.display_srs.srid:
+ return self.x, self.y
+ point = self.point_2d.transform(profile.display_srs.srid, clone=True)
+ return round(point.x, 5), round(point.y, 5)
+
+ @property
+ def display_spatial_reference_system(self):
+ profile = get_current_profile()
+ if not profile.display_srs or not profile.display_srs.srid:
+ return self.spatial_reference_system
+ return profile.display_srs
+
+ def get_precise_points(self):
+ if self.point_source == 'P' and self.point_2d:
+ return self.point_2d, self.point, self.point_source_item
+
+ def get_precise_polygons(self):
+ if self.multi_polygon_source == 'P' and self.multi_polygon:
+ return self.multi_polygon, self.multi_polygon_source_item
+
+ def most_precise_geo(self):
+ if self.point_source == 'M':
+ return 'multi_polygon'
+ current_source = str(self.__class__._meta.verbose_name)
+ if self.multi_polygon_source_item == current_source \
+ and (self.multi_polygon_source == "P" or
+ self.point_source_item != current_source):
+ return 'multi_polygon'
+ if self.point_source_item == current_source \
+ and self.point_source == 'P':
+ return 'point'
+ if self.multi_polygon_source == 'P':
+ return 'multi_polygon'
+ if self.point_source == 'P':
+ return 'point'
+ if self.multi_polygon:
+ return 'multi_polygon'
+ if self.point_2d:
+ return 'point'
+
+ def geo_point_source(self):
+ if not self.point_source:
+ return ""
+ src = "{} - {}".format(
+ dict(self.GEO_SOURCE)[self.point_source],
+ self.point_source_item
+ )
+ return src
+
+ def geo_polygon_source(self):
+ if not self.multi_polygon_source:
+ return ""
+ src = "{} - {}".format(
+ dict(self.GEO_SOURCE)[self.multi_polygon_source],
+ self.multi_polygon_source_item
+ )
+ return src
+
+ def _geojson_serialize(self, geom_attr):
+ if not hasattr(self, geom_attr):
+ return ""
+ cached_label_key = 'cached_label'
+ if self.GEO_LABEL:
+ cached_label_key = self.GEO_LABEL
+ if getattr(self, "CACHED_LABELS", None):
+ cached_label_key = self.CACHED_LABELS[-1]
+ geojson = serialize(
+ 'geojson',
+ self.__class__.objects.filter(pk=self.pk),
+ geometry_field=geom_attr, fields=(cached_label_key,))
+ geojson_dct = json.loads(geojson)
+ profile = get_current_profile()
+ precision = profile.point_precision
+
+ features = geojson_dct.pop('features')
+ for idx in range(len(features)):
+ feature = features[idx]
+ lbl = feature['properties'].pop(cached_label_key)
+ feature['properties']['name'] = lbl
+ feature['properties']['id'] = self.pk
+ if precision is not None:
+ geom_type = feature["geometry"].get("type", None)
+ if geom_type == "Point":
+ feature["geometry"]["coordinates"] = [
+ round(coord, precision)
+ for coord in feature["geometry"]["coordinates"]
+ ]
+ geojson_dct['features'] = features
+ geojson_dct['link_template'] = simple_link_to_window(self).replace(
+ '999999', '<pk>'
+ )
+ geojson = json.dumps(geojson_dct)
+ return geojson
+
+ @property
+ def point_2d_geojson(self):
+ return self._geojson_serialize('point_2d')
+
+ @property
+ def multi_polygon_geojson(self):
+ return self._geojson_serialize('multi_polygon')
+
+
+class ImageContainerModel:
+ def _get_image_path(self, filename):
+ return "{}/{}".format(self._get_base_image_path(), filename)
+
+ def _get_base_image_path(self):
+ n = datetime.datetime.now()
+ return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day)
+
+
+class QRCodeItem(models.Model, ImageContainerModel):
+ HAS_QR_CODE = True
+ qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True,
+ max_length=255)
+
+ class Meta:
+ abstract = True
+
+ @property
+ def qrcode_path(self):
+ if not self.qrcode:
+ self.generate_qrcode()
+ if not self.qrcode: # error on qrcode generation
+ return ""
+ return self.qrcode.path
+
+ def generate_qrcode(self, request=None, secure=True, tmpdir=None):
+ url = self.get_absolute_url()
+ site = Site.objects.get_current()
+ if request:
+ scheme = request.scheme
+ else:
+ if secure:
+ scheme = "https"
+ else:
+ scheme = "http"
+ url = scheme + "://" + site.domain + url
+ TinyUrl = apps.get_model("ishtar_common", "TinyUrl")
+ tiny_url = TinyUrl()
+ tiny_url.link = url
+ tiny_url.save()
+ short_url = scheme + "://" + site.domain + reverse(
+ 'tiny-redirect', args=[tiny_url.get_short_id()])
+ qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
+ tmpdir_created = False
+ if not tmpdir:
+ tmpdir = tempfile.mkdtemp("-qrcode")
+ tmpdir_created = True
+ filename = tmpdir + os.sep + 'qrcode.png'
+ qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
+ with open(filename, 'rb') as qrfile:
+ self.qrcode.save("qrcode.png", File(qrfile))
+ self.skip_history_when_saving = True
+ self._no_move = True
+ self.save()
+ if tmpdir_created:
+ shutil.rmtree(tmpdir)
+
+
+class SearchVectorConfig:
+ def __init__(self, key, language=None, func=None):
+ self.key = key
+ if language:
+ self.language = language
+ if language == "local":
+ self.language = settings.ISHTAR_SEARCH_LANGUAGE
+ else:
+ self.language = "simple"
+ self.func = func
+
+ def format(self, value):
+ if value == 'None':
+ value = ''
+ if not self.func:
+ return [value]
+ return self.func(value)
+
+
+class ShortMenuItem:
+ """
+ Item available in the short menu
+ """
+ UP_MODEL_QUERY = {}
+
+ @classmethod
+ def get_short_menu_class(cls, pk):
+ return ''
+
+ @property
+ def short_class_name(self):
+ return ""
+
+
+class MainItem(ShortMenuItem):
+ """
+ Item with quick actions available from tables
+ Extra actions are available from sheets
+ """
+ QUICK_ACTIONS = []
+
+ @classmethod
+ def get_quick_actions(cls, user, session=None, obj=None):
+ """
+ Get a list of (url, title, icon, target) actions for an user
+ """
+ qas = []
+ for action in cls.QUICK_ACTIONS:
+ if not action.is_available(user, session=session, obj=obj):
+ continue
+ qas.append([action.base_url,
+ mark_safe(action.text),
+ mark_safe(action.rendered_icon),
+ action.target or ""])
+ return qas
+
+ @classmethod
+ def get_quick_action_by_url(cls, url):
+ for action in cls.QUICK_ACTIONS:
+ if action.url == url:
+ return action
+
+ def regenerate_external_id(self):
+ if not hasattr(self, "external_id"):
+ return
+ self.skip_history_when_saving = True
+ self._no_move = True
+ if hasattr(self, "auto_external_id"):
+ self.external_id = None
+ self.save()
+
+ def get_extra_actions(self, request):
+ if not hasattr(self, 'SLUG'):
+ return []
+
+ actions = []
+ if request.user.is_superuser and hasattr(self, "auto_external_id"):
+ actions += [
+ (
+ reverse("regenerate-external-id") + "?{}={}".format(
+ self.SLUG, self.pk),
+ _("Regenerate ID"),
+ "fa fa-key",
+ _("regen."),
+ "",
+ True
+ )
+ ]
+
+ return actions
diff --git a/ishtar_common/serializers.py b/ishtar_common/serializers.py
index 84108c135..55989adcb 100644
--- a/ishtar_common/serializers.py
+++ b/ishtar_common/serializers.py
@@ -13,6 +13,7 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Group, Permission
from . import models
+from .models_common import State, Department
from archaeological_operations.models import ActType
from ishtar_common.serializers_utils import generic_get_results, \
@@ -102,7 +103,7 @@ def importer_serialization(archive=False, return_empty_types=False,
GEO_MODEL_LIST = [
- models.State, models.Department, models.Town, models.Area
+ State, Department, models.Town, models.Area
]
diff --git a/ishtar_common/tasks.py b/ishtar_common/tasks.py
index 223eded06..96db07b1b 100644
--- a/ishtar_common/tasks.py
+++ b/ishtar_common/tasks.py
@@ -27,7 +27,7 @@ from django.core.files import File
from django.db.models import Q
from django.utils.translation import ugettext_lazy as _
-from ishtar_common.models import Town, Department
+from ishtar_common.models_common import Town, Department
from ishtar_common.utils import task
from ishtar_common.serializers import full_serialization, restore_serialized
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 6d112f2d1..94a636b7b 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -47,7 +47,7 @@ from django.test import TestCase as BaseTestCase
from django.test.client import Client
from django.test.runner import DiscoverRunner
-from ishtar_common import models
+from ishtar_common import models, models_common
from ishtar_common import views
from ishtar_common.apps import admin_site
from ishtar_common.serializers import type_serialization, \
@@ -792,7 +792,7 @@ class SerializationTest(GenericSerializationTest, TestCase):
self.generic_serialization_test(importer_serialization)
def create_geo_default(self):
- s = models.State.objects.create(label="test", number="999")
+ s = models_common.State.objects.create(label="test", number="999")
d = models.Department.objects.create(label="test", number="999",
state=s)
t1 = models.Town.objects.create(
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 41a844026..31459f861 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -37,6 +37,7 @@ import subprocess
import sys
import tempfile
import time
+import uuid
import xmltodict
import zipfile
@@ -48,7 +49,7 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.geos import GEOSGeometry
from django.contrib.sessions.backends.db import SessionStore
from django.core.cache import cache
-from django.core.exceptions import SuspiciousOperation
+from django.core.exceptions import SuspiciousOperation, ObjectDoesNotExist
from django.core.files import File
from django.core.validators import EMPTY_VALUES
from django.core.urlresolvers import reverse
@@ -713,9 +714,6 @@ def _post_save_geo(sender, **kwargs):
"""
Convert raw x, y, z point to real geo field
"""
- from ishtar_common.models import get_current_profile # not clean but utils
- # is loaded before models
-
profile = get_current_profile()
if not profile.mapping:
return
@@ -1758,3 +1756,115 @@ def try_fix_file(filename, make_copy=True, hard=False):
if make_copy:
shutil.copy2(full_file, filename)
return f
+
+
+def get_current_profile(force=False):
+ IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")
+ return IshtarSiteProfile.get_current_profile(force=force)
+
+
+PARSE_FORMULA = re.compile("{([^}]*)}")
+
+
+def _deduplicate(value):
+ new_values = []
+ for v in value.split('-'):
+ if v not in new_values:
+ new_values.append(v)
+ return '-'.join(new_values)
+
+
+FORMULA_FILTERS = {
+ 'upper': lambda x: x.upper(),
+ 'lower': lambda x: x.lower(),
+ 'capitalize': lambda x: x.capitalize(),
+ 'slug': lambda x: slugify(x),
+ 'deduplicate': _deduplicate
+}
+
+
+def get_external_id(key, item):
+ profile = get_current_profile()
+ if not hasattr(profile, key):
+ return
+ formula = getattr(profile, key)
+ dct = {}
+ for fkey in PARSE_FORMULA.findall(formula):
+ filtered = fkey.split('|')
+ initial_key = fkey[:]
+ fkey = filtered[0]
+ filters = []
+ for filtr in filtered[1:]:
+ if filtr in FORMULA_FILTERS:
+ filters.append(FORMULA_FILTERS[filtr])
+ if fkey.startswith('settings__'):
+ dct[fkey] = getattr(settings, fkey[len('settings__'):]) or ''
+ continue
+ obj = item
+ for k in fkey.split('__'):
+ try:
+ obj = getattr(obj, k)
+ except ObjectDoesNotExist:
+ obj = None
+ if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager
+ if not obj.count():
+ break
+ obj = obj.all()[0]
+ elif callable(obj):
+ obj = obj()
+ if obj is None:
+ break
+ if obj is None:
+ dct[initial_key] = ''
+ else:
+ dct[initial_key] = str(obj)
+ for filtr in filters:
+ dct[initial_key] = filtr(dct[initial_key])
+ values = formula.format(**dct).split('||')
+ value = values[0]
+ for filtr in values[1:]:
+ if filtr not in FORMULA_FILTERS:
+ value += '||' + filtr
+ continue
+ value = FORMULA_FILTERS[filtr](value)
+ return value
+
+
+PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid')
+
+
+def duplicate_item(item, user=None, data=None):
+ model = item.__class__
+ new = model.objects.get(pk=item.pk)
+
+ for field in model._meta.fields:
+ # pk is in PRIVATE_FIELDS so: new.pk = None and a new
+ # item will be created on save
+ if field.name == "uuid":
+ new.uuid = uuid.uuid4()
+ elif field.name in PRIVATE_FIELDS:
+ setattr(new, field.name, None)
+ if user:
+ new.history_user = user
+ if data:
+ for k in data:
+ setattr(new, k, data[k])
+ new.save()
+
+ # m2m fields
+ m2m = [field.name for field in model._meta.many_to_many
+ if field.name not in PRIVATE_FIELDS]
+ for field in m2m:
+ for val in getattr(item, field).all():
+ if val not in getattr(new, field).all():
+ getattr(new, field).add(val)
+ return new
+
+
+def get_image_path(instance, filename):
+ # when using migrations instance is not a real ImageModel instance
+ if not hasattr(instance, '_get_image_path'):
+ n = datetime.datetime.now()
+ return "upload/{}/{:02d}/{:02d}/{}".format(
+ n.year, n.month, n.day, filename)
+ return instance._get_image_path(filename)
diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py
index 8b066194f..056385918 100644
--- a/ishtar_common/views_item.py
+++ b/ishtar_common/views_item.py
@@ -33,9 +33,9 @@ from weasyprint import HTML, CSS
from weasyprint.fonts import FontConfiguration
from ishtar_common.utils import check_model_access_control, CSV_OPTIONS, \
- get_all_field_names, Round
+ get_all_field_names, Round, PRIVATE_FIELDS
from ishtar_common.models import HistoryError, get_current_profile, \
- PRIVATE_FIELDS, GeneralType, SearchAltName
+ GeneralType, SearchAltName
from .menus import Menu
from . import models
@@ -507,21 +507,21 @@ def _parse_query_string(string, query_parameters, current_dct, exc_dct,
string = string.strip().lower()
match = RE_FACET.search(string)
- if match or u"=" in string:
+ if match or "=" in string:
queries = []
if match:
for idx, gp in enumerate(match.groups()):
if not idx:
base_term = gp
elif gp:
- queries.append(u'"{}"'.format(gp))
+ queries.append('"{}"'.format(gp))
else:
- splited = string.split(u"=")
+ splited = string.split("=")
if len(splited) == 2:
base_term = splited[0]
queries.append(splited[1])
if queries:
- excluded = base_term.startswith(u"-")
+ excluded = base_term.startswith("-")
if excluded:
base_term = base_term[1:]
if base_term in query_parameters:
@@ -687,7 +687,7 @@ def _search_manage_search_vector(model, dct, exc_dct, distinct_queries,
if search_query:
# remove inside parenthesis
search_query = \
- search_query.replace(u'(', u'').replace(u')', u'').strip()
+ search_query.replace('(', '').replace(')', '').strip()
if search_query:
if 'extras' not in dct:
dct['extras'] = []
@@ -830,10 +830,14 @@ def _manage_facet_search(model, dct, and_reqs):
if not hasattr(model, "general_types"):
return
general_types = model.general_types()
+ hierarchic_fields = HIERARCHIC_FIELDS[:]
for base_k in general_types:
- if base_k in HIERARCHIC_FIELDS: # already managed
+ if base_k in hierarchic_fields: # already managed
continue
- k = base_k + "__pk"
+ if base_k.endswith("_id"):
+ k = base_k
+ else:
+ k = base_k + "__pk"
if k not in dct or not dct[k].startswith('"') \
or not dct[k].startswith('"'):
continue
diff --git a/ishtar_common/wizards.py b/ishtar_common/wizards.py
index 63f7ee30a..9ee0c5896 100644
--- a/ishtar_common/wizards.py
+++ b/ishtar_common/wizards.py
@@ -1983,7 +1983,7 @@ class SourceWizard(Wizard):
DOCUMENT_EXCLUDED = models.Document.RELATED_MODELS + [
"id", "history_creator", "history_modifier", "search_vector", "imports",
- "last_modified"
+ "last_modified", "document"
]