#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ from bs4 import BeautifulSoup import copy import datetime import inspect from importlib import import_module from jinja2 import TemplateSyntaxError, UndefinedError import json import logging import os import re import string import tempfile import time from io import BytesIO from subprocess import Popen, PIPE from PIL import Image from ooopy.OOoPy import OOoPy from ooopy.Transformer import Transformer as OOTransformer import ooopy.Transforms as OOTransforms import uuid import zipfile from urllib.parse import urlencode from xml.etree import ElementTree as ET from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import JSONField from django.contrib.postgres.indexes import GinIndex from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ( ObjectDoesNotExist, ValidationError, MultipleObjectsReturned, ) from django.core.files.base import ContentFile from django.core.files.uploadedfile import 
SimpleUploadedFile from django.core.urlresolvers import reverse from django.db.models import Q, Max, Count from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template import Context, Template from django.template.defaultfilters import slugify from django.utils.functional import lazy from ishtar_common.utils import ( ugettext_lazy as _, ugettext, pgettext_lazy, get_generated_id, get_current_profile, duplicate_item, get_image_path, ) from ishtar_common.utils_secretary import IshtarSecretaryRenderer from ishtar_common.alternative_configs import ( ALTERNATE_CONFIGS, ALTERNATE_CONFIGS_CHOICES, ) from ishtar_common.data_importer import pre_importer_action from ishtar_common.model_managers import ( SlugModelManager, ExternalIdManager, UUIDModelManager, ) from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ( ImporterModel, ImporterType, ImporterDefault, ImporterDefaultValues, ImporterColumn, ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, Import, TargetKeyGroup, ValueFormater, ) from ishtar_common.utils import ( get_cache, create_slug, get_all_field_names, cached_label_changed, generate_relation_graph, max_size_help, ) from ishtar_common.models_common import ( GeneralType, HierarchicalType, BaseHistorizedItem, LightHistorizedItem, FullSearch, SearchAltName, OwnPerms, Cached, Address, post_save_cache, TemplateItem, SpatialReferenceSystem, DashboardFormItem, document_attached_changed, SearchAltName, DynamicRequest, GeoItem, CompleteIdentifierItem, SearchVectorConfig, DocumentItem, QuickAction, MainItem, Merge, ShortMenuItem, Town, ImageContainerModel, StatisticItem, CachedGen, CascasdeUpdate, Department, State, ) __all__ = [ "ImporterModel", "ImporterType", "ImporterDefault", "ImporterDefaultValues", "ImporterColumn", "ImporterDuplicateField", "Regexp", "ImportTarget", "TargetKey", "FormaterType", "Import", "TargetKeyGroup", 
class ValueGetter(object):
    """Mixin exporting the fields of an object as a flat ``{key: value}`` dict.

    Keys are the field names returned by ``get_all_field_names`` prefixed
    with ``prefix`` (or the class level ``_prefix``). Related objects that
    also expose ``get_values`` are flattened recursively with a
    ``<prefix><field_name>_`` prefix.
    """

    _prefix = ""
    COL_LABELS = {}
    # extra attribute names (not model fields) always added to the values
    GET_VALUES_EXTRA = []
    # field names never exported
    GET_VALUES_EXCLUDE_FIELDS = [
        "search_vector",
        "id",
        "multi_polygon",
        "point_2d",
        "point",
        "history_m2m",
    ]
    GET_VALUES_ = [
        "preservation_to_considers",
        "alterations",
        "alteration_causes",
    ]
    # keys whose related manager is rendered as a " ; " joined list of labels
    GET_VALUES_EXTRA_TYPES = [
        "preservation_to_considers",
        "alterations",
        "alteration_causes",
    ]

    def _get_values_documents(self, prefix="", filtr=None):
        """Return the values of the attached documents and of the main image.

        :param prefix: prefix prepended to the "documents" and "main_image"
            keys
        :param filtr: optional collection of keys to keep; when empty or
            None every key is provided
        :return: dict with at most the "documents" and "main_image" keys
        """
        values = {}
        if not hasattr(self, "documents"):
            return values
        if not filtr or prefix + "documents" in filtr:
            values[prefix + "documents"] = [
                doc.get_values(no_values=True) for doc in self.documents.all()
            ]
        if filtr and prefix + "main_image" not in filtr:
            return values
        if (
            hasattr(self, "main_image")
            and self.main_image
            and hasattr(self.main_image, "get_values")
        ):
            values[prefix + "main_image"] = self.main_image.get_values(
                no_values=True
            )
        return values

    def _get_values_update_sub_filter(self, filtr, prefix):
        """Return the keys of ``filtr`` starting with ``prefix``, prefix removed.

        Return None when ``filtr`` is empty or None.
        """
        if not filtr:
            return None
        return [k[len(prefix):] for k in filtr if k.startswith(prefix)]

    def get_values(self, prefix="", no_values=False, filtr=None, **kwargs):
        """Export the fields of the object as a dict of printable values.

        :param prefix: prefix for every key - defaults to ``self._prefix``
        :param no_values: if True global variables are not added to the result
        :param filtr: optional collection of keys; a field is kept only if at
            least one of these keys starts with ``prefix + field_name``
        :param kwargs: ``exclude`` may list prefixed keys to skip; all keyword
            arguments are forwarded to the ``get_values`` of related objects
        :return: dict of values; values that are not a tuple, a list or a
            dict are converted to str and None becomes ""
        """
        if not prefix:
            prefix = self._prefix
        exclude = kwargs.get("exclude", [])
        values = {}
        if (
            hasattr(self, "qrcode")
            and (not filtr or prefix + "qrcode_path" in filtr)
            and prefix + "qrcode_path" not in exclude
        ):
            values[prefix + "qrcode_path"] = self.qrcode_path
        for field_name in get_all_field_names(self):
            try:
                value = getattr(self, field_name)
            except (AttributeError, MultipleObjectsReturned):
                continue
            if (
                field_name in self.GET_VALUES_EXCLUDE_FIELDS
                or prefix + field_name in exclude
            ):
                continue
            if filtr and not any(
                f.startswith(prefix + field_name) for f in filtr
            ):
                continue
            if hasattr(value, "get_values"):
                # flatten the related object with a dedicated prefix
                new_prefix = prefix + field_name + "_"
                values.update(
                    value.get_values(new_prefix, filtr=filtr, **kwargs)
                )
            # a "get_values_for_<field_name>" method overrides the raw value
            if hasattr(self, "get_values_for_" + field_name):
                values[prefix + field_name] = getattr(
                    self, "get_values_for_" + field_name
                )()
            else:
                values[prefix + field_name] = value
        values.update(self._get_values_documents(prefix=prefix, filtr=filtr))
        for extra_field in self.GET_VALUES_EXTRA:
            values[prefix + extra_field] = getattr(self, extra_field) or ""
        # normalization - only existing keys are reassigned so iterating on
        # the dict while updating it is safe
        for key, val in values.items():
            if val is None:
                val = ""
            elif (key in self.GET_VALUES_EXTRA_TYPES or "type" in key) and (
                val.__class__.__name__.split(".")[0] == "ManyRelatedManager"
            ):
                # NOTE(review): keys are prefixed while GET_VALUES_EXTRA_TYPES
                # is not - with a prefix only keys containing "type" match
                val = " ; ".join(str(v) for v in val.all())
            elif not isinstance(val, (tuple, list, dict)):
                val = str(val)
                # NOTE(review): presumably the str of an unrendered related
                # manager ("app.Model.None") - exported as an empty value
                if val.endswith(".None"):
                    val = ""
            values[key] = val
        if (prefix and prefix != self._prefix) or no_values:
            # do not provide global variables for sub-items
            return values
        for global_var in GlobalVar.objects.all():
            values[global_var.slug] = global_var.value or ""
        return values

    @classmethod
    def get_empty_values(cls, prefix=""):
        """Return a dict with an empty string for each prefixed field name."""
        if not prefix:
            prefix = cls._prefix
        return {
            prefix + field_name: "" for field_name in get_all_field_names(cls)
        }
def is_unique(cls, field):
    """Build a validator checking that a value is not already used.

    :param cls: model class queried through its ``objects`` manager
    :param field: name of the field (lookup) that must be unique
    :return: a function taking the value to check and raising a
        ValidationError if an item of ``cls`` already has this value
    """

    def func(value):
        # explicit test instead of an ``assert``: assertions are removed
        # when Python runs with -O and the check would be silently skipped
        if cls.objects.filter(**{field: value}).exists():
            raise ValidationError(_("This item already exists."))

    return func
class ImageModel(models.Model, ImageContainerModel):
    """Abstract model with an image and a thumbnail generated on save.

    On save, a changed image is converted to JPEG, resized to fit
    ``IMAGE_MAX_SIZE`` (when set) and a thumbnail fitting ``THUMB_MAX_SIZE``
    is generated.
    """

    image = models.ImageField(
        upload_to=get_image_path,
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    thumbnail = models.ImageField(
        upload_to=get_image_path,
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE
    THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE
    IMAGE_PREFIX = ""

    class Meta:
        abstract = True

    def has_changed(self, field):
        """Return True if ``field`` differs from the value stored in database.

        An item without primary key is always considered as changed.
        """
        if not self.pk:
            return True
        manager = getattr(self.__class__, "objects")
        old = getattr(manager.get(pk=self.pk), field)
        return getattr(self, field) != old

    def create_thumb(self, image, size):
        """Returns the image resized to fit inside a box of the given size

        The image is resized in place and returned as a JPEG uploaded file.
        """
        # Image.ANTIALIAS has been removed from Pillow 10 - LANCZOS is the
        # same filter under its current name
        image.thumbnail(size, Image.LANCZOS)
        temp = BytesIO()
        image.save(temp, "jpeg")
        temp.seek(0)
        return SimpleUploadedFile("temp", temp.read())

    def save(self, *args, **kwargs):
        """Save the item and regenerate image and thumbnail when needed.

        With a ``force_copy`` keyword argument the item is saved as is,
        without any image processing. If the image cannot be processed
        (IOError, ValueError) both image and thumbnail are emptied.
        """
        if "force_copy" in kwargs:
            kwargs.pop("force_copy")
            super(ImageModel, self).save(*args, **kwargs)
            return
        # manage images
        if not self.has_changed("image"):
            return super(ImageModel, self).save(*args, **kwargs)
        if not self.image:
            self.thumbnail = None
            return super(ImageModel, self).save(*args, **kwargs)

        # generate thumbnail
        # convert to jpg
        filename = os.path.splitext(os.path.split(self.image.name)[-1])[0]
        old_path = self.image.path
        filename = "%s.jpg" % filename

        image = None
        try:
            image = Image.open(self.image.file)
            # convert to RGB
            if image.mode not in ("L", "RGB"):
                image = image.convert("RGB")

            # resize if necessary
            if self.IMAGE_MAX_SIZE:
                self.image.save(
                    filename,
                    self.create_thumb(image, self.IMAGE_MAX_SIZE),
                    save=False,
                )

            # the converted image may have a new path: remove the old file
            if old_path != self.image.path:
                try:
                    os.remove(old_path)
                except OSError:
                    # already clean
                    pass

            # save the thumbnail
            thumb_filename = self._get_thumb_name(filename)
            self.thumbnail.save(
                thumb_filename,
                self.create_thumb(image, self.THUMB_MAX_SIZE),
                save=False,
            )
        except (IOError, ValueError):
            self.thumbnail = None
            self.image = None
        finally:
            if image:
                image.close()
        return super(ImageModel, self).save(*args, **kwargs)

    def _get_thumb_name(self, filename):
        """Insert "-thumb" before the extension of ``filename``."""
        splited = filename.split(".")
        return "{}-thumb.{}".format(".".join(splited[:-1]), splited[-1])
class JsonDataField(models.Model):
    """Declaration of a key of the JSON schema for a content type.

    A field is identified by its natural key: the key and the app label
    and model of its content type.
    """

    name = models.CharField(_("Name"), max_length=200)
    content_type = models.ForeignKey(ContentType)
    key = models.CharField(
        _("Key"),
        help_text=_(
            "Value of the key in the JSON schema. For hierarchical "
            'key use "__" to explain it. For instance for the key '
            "'my_subkey' with data such as {'my_key': {'my_subkey': "
            "'value'}}, its value will be reached with "
            "my_key__my_subkey."
        ),
        max_length=200,
    )
    display = models.BooleanField(_("Display"), default=True)
    value_type = models.CharField(
        _("Type"), choices=JSON_VALUE_TYPES, max_length=10, default="T"
    )
    order = models.IntegerField(_("Order"), default=10)
    search_index = models.BooleanField(
        _("Use in search indexes"), default=False
    )
    section = models.ForeignKey(
        JsonDataSection, on_delete=models.SET_NULL, null=True, blank=True
    )
    custom_forms = models.ManyToManyField(
        "CustomForm", through="CustomFormJsonField", blank=True
    )
    objects = JsonDataFieldManager()

    class Meta:
        verbose_name = _("Json data - Field")
        verbose_name_plural = _("Json data - Fields")
        ordering = ["order", "name"]
        unique_together = ("content_type", "key")

    def natural_key(self):
        """Return the (key, app label, model) tuple identifying the field."""
        content_type = self.content_type
        return (self.key, content_type.app_label, content_type.model)

    def __str__(self):
        label_parts = (self.content_type, self.name)
        return "{} - {}".format(*label_parts)

    def clean(self):
        """Check that the section, if any, has the content type of the field."""
        if self.section and self.section.content_type != self.content_type:
            raise ValidationError(
                _("Content types of the field and of the menu do not match")
            )
class GeneralRecordRelations(object):
    """Mixin for relations between two records.

    Saving a relation also creates, if missing, the mirrored relation
    (records swapped) using the same type for symmetrical relation types
    or the inverse relation type otherwise.
    """

    @classmethod
    def general_types(cls):
        return ["relation_type"]

    def save(self, *args, **kwargs):
        """Save the relation then ensure the mirrored relation exists.

        :return: None when no symmetrical/inverse type is defined, the
            saved relation otherwise
        """
        super(GeneralRecordRelations, self).save(*args, **kwargs)
        relation_type = self.relation_type
        if relation_type.symmetrical:
            mirror_type = relation_type
        else:
            mirror_type = relation_type.inverse_relation
        if not mirror_type:
            # no symmetrical/inverse type is defined: nothing to mirror
            return None
        self.__class__.objects.get_or_create(
            right_record=self.left_record,
            left_record=self.right_record,
            relation_type=mirror_type,
        )
        return self
class Language(GeneralType):
    """General type for a language with an optional ISO code."""

    # at most two characters, may be left empty
    iso_code = models.CharField(
        _("ISO code"), max_length=2, blank=True, null=True
    )

    class Meta:
        verbose_name_plural = _("Languages")
        verbose_name = _("Language")
models.SlugField(_("Slug"), unique=True) active = models.BooleanField(_("Current active"), default=False) experimental_feature = models.BooleanField( _("Activate experimental feature"), default=False ) description = models.TextField(_("Description"), blank=True, default="") warning_name = models.TextField(_("Warning name"), blank=True, default="") warning_message = models.TextField( _("Warning message"), blank=True, default="" ) delete_image_zip_on_archive = models.BooleanField( _("Import - Delete image/document zip on archive"), default=False ) clean_redundant_document_association = models.BooleanField( _("Document - Remove redundant association"), default=False, help_text=_( "For instance, remove operation association of a " "document also associated to a find of this operation. " "Only manage association of operations, context records " "and finds." ), ) calculate_weight_on_full = models.BooleanField( _("Container - calculate weight only when all find has a weight"), default=False, ) config = models.CharField( _("Alternate configuration"), max_length=200, choices=ALTERNATE_CONFIGS_CHOICES, help_text=_( "Choose an alternate configuration for label, " "index management" ), null=True, blank=True, ) files = models.BooleanField(_("Files module"), default=False) archaeological_site = models.BooleanField( _("Archaeological site module"), default=False ) archaeological_site_label = models.CharField( _("Archaeological site type"), max_length=200, choices=SITE_LABELS, default="site", ) context_record = models.BooleanField( _("Context records module"), default=False ) find = models.BooleanField( _("Finds module"), default=False, help_text=_("Need context records module"), ) find_index = models.CharField( _("Find index is based on"), default="O", max_length=2, choices=FIND_INDEX_SOURCE, help_text=_( "To prevent irrelevant indexes, change this parameter " "only if there is no find in the database" ), ) warehouse = models.BooleanField( _("Warehouses module"), default=False, 
help_text=_("Need finds module") ) preservation = models.BooleanField(_("Preservation module"), default=False) mapping = models.BooleanField(_("Mapping module"), default=False) point_precision = models.IntegerField( _("Point precision (search and sheets)"), null=True, blank=True, help_text=_( "Number of digit to round from the decimal point for coordinates " "in WGS84 (latitude, longitude). Empty value means no round." ), ) locate_warehouses = models.BooleanField( _("Locate warehouse and containers"), default=False, help_text=_( "Mapping module must be activated. With many containers and " "background task not activated, activating this option may " "consume many resources." ), ) use_town_for_geo = models.BooleanField( _("Use town to locate when coordinates are missing"), default=True ) relation_graph = models.BooleanField( _("Generate relation graph"), default=False ) underwater = models.BooleanField(_("Underwater module"), default=False) parcel_mandatory = models.BooleanField( _("Parcel are mandatory for context records"), default=True ) homepage = models.TextField( _("Home page"), blank=True, default="", help_text=_( "Homepage of Ishtar - if not defined a default homepage " "will appear. Use the markdown syntax. {random_image} " "can be used to display a random image." 
), ) operation_prefix = models.CharField( _("Main operation code prefix"), default="OA", null=True, blank=True, max_length=20, ) default_operation_prefix = models.CharField( _("Default operation code prefix"), default="OP", null=True, blank=True, max_length=20, ) operation_region_code = models.CharField( _("Operation region code"), null=True, blank=True, max_length=5 ) operation_complete_identifier = models.TextField( _("Operation complete identifier"), default="", blank=True, help_text=_("Formula to manage operation complete identifier."), ) operation_custom_index = models.TextField( _("Operation custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage operation custom index. " "Separate keys with a semicolon." ), ) site_complete_identifier = models.TextField( _("Archaeological site complete identifier"), default="", blank=True, help_text=_( "Formula to manage archaeological site complete" " identifier." ), ) site_custom_index = models.TextField( _("Archaeological site custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage archaeological site custom " "index. Separate keys with a semicolon." ), ) file_external_id = models.TextField( _("File external id"), default="{year}-{numeric_reference}", help_text=_( "Formula to manage file external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) file_complete_identifier = models.TextField( _("Archaeological file complete identifier"), default="", blank=True, help_text=_( "Formula to manage archaeological file complete " "identifier." ), ) file_custom_index = models.TextField( _("Archaeological file custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage archaeological file custom " "index. Separate keys with a semicolon." 
), ) parcel_external_id = models.TextField( _("Parcel external id"), default="{associated_file__external_id}{operation__code_patriarche}-" "{town__numero_insee}-{section}{parcel_number}", help_text=_( "Formula to manage parcel external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) context_record_external_id = models.TextField( _("Context record external id"), default="{parcel__external_id}-{label}", help_text=_( "Formula to manage context record external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) contextrecord_complete_identifier = models.TextField( _("Context record complete identifier"), default="", blank=True, help_text=_("Formula to manage context record complete identifier."), ) contextrecord_custom_index = models.TextField( _("Context record custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage context record custom index. " "Separate keys with a semicolon." ), ) base_find_external_id = models.TextField( _("Base find external id"), default="{context_record__external_id}-{label}", help_text=_( "Formula to manage base find external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) basefind_complete_identifier = models.TextField( _("Base find complete identifier"), default="", blank=True, help_text=_("Formula to manage base find complete identifier."), ) basefind_custom_index = models.TextField( _("Base find custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage base find custom index. " "Separate keys with a semicolon." ), ) find_external_id = models.TextField( _("Find external id"), default="{get_first_base_find__context_record__external_id}-{label}", help_text=_( "Formula to manage find external ID. 
" "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) find_complete_identifier = models.TextField( _("Find complete identifier"), default="", blank=True, help_text=_("Formula to manage find complete identifier."), ) find_custom_index = models.TextField( _("Find custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage find custom index. " "Separate keys with a semicolon." ), ) container_external_id = models.TextField( _("Container external id"), default="{parent_external_id}-{container_type__txt_idx}-" "{reference}", help_text=_( "Formula to manage container external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) container_complete_identifier = models.TextField( _("Container complete identifier"), default="", blank=True, help_text=_("Formula to manage container complete identifier."), ) container_custom_index = models.TextField( _("Container custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage container custom index. " "Separate keys with a semicolon." ), ) warehouse_external_id = models.TextField( _("Warehouse external id"), default="{name|slug}", help_text=_( "Formula to manage warehouse external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) warehouse_complete_identifier = models.TextField( _("Warehouse complete identifier"), default="", blank=True, help_text=_("Formula to manage warehouse complete identifier."), ) warehouse_custom_index = models.TextField( _("Warehouse custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage warehouse custom index. " "Separate keys with a semicolon." 
), ) document_external_id = models.TextField( _("Document external id"), default="{index}", help_text=_( "Formula to manage document external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) document_complete_identifier = models.TextField( _("Document complete identifier"), default="", blank=True, help_text=_("Formula to manage document complete identifier."), ) document_custom_index = models.TextField( _("Document custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage document custom index. " "Separate keys with a semicolon." ), ) person_raw_name = models.TextField( _("Raw name for person"), default="{name|upper} {surname}", help_text=_( "Formula to manage person raw_name. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) find_use_index = models.BooleanField( _("Use auto index for finds"), default=True ) currency = models.CharField( _("Currency"), default="€", choices=CURRENCY, max_length=5 ) account_naming_style = models.CharField( _("Naming style for accounts"), max_length=2, default="NF", choices=ACCOUNT_NAMING_STYLE, ) default_center = models.PointField( _("Maps - default center"), default="SRID=4326;POINT(2.4397 46.5528)" ) default_zoom = models.IntegerField(_("Maps - default zoom"), default=6) display_srs = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System for display"), blank=True, null=True, help_text=_( "Spatial Reference System used for display when no SRS is " "defined" ), ) default_language = models.ForeignKey( Language, verbose_name=_("Default language for documentation"), blank=True, null=True, help_text=_( "If set, by default the selected language will be set for " "localized documents." 
def _profile_mapping():
    """Return the ``mapping`` attribute of the current profile."""
    current = get_current_profile()
    return current.mapping


# lazy wrapper: _profile_mapping is only evaluated when the value is used
profile_mapping = lazy(_profile_mapping)


def cached_site_changed(sender, **kwargs):
    """
    Signal receiver connected below to save and delete of
    IshtarSiteProfile: calls ``get_current_profile(force=True)`` then
    re-initializes the main menu for all users.
    """
    get_current_profile(force=True)
    # function-level import, kept where the original placed it
    from ishtar_common.menus import Menu

    menu = Menu(None)
    menu.init()
    menu.reinit_menu_for_all_user()


post_save.connect(cached_site_changed, sender=IshtarSiteProfile)
post_delete.connect(cached_site_changed, sender=IshtarSiteProfile)
class CustomFormManager(models.Manager):
    """Manager resolving a custom form from its natural key."""

    def get_by_natural_key(self, name, form):
        return self.get(name=name, form=form)


class CustomForm(models.Model):
    """
    Customization record for a registered form, identified by the
    (name, form) pair where ``form`` is a form slug known by
    :meth:`register`.
    """

    name = models.CharField(_("Name"), max_length=250)
    form = models.CharField(_("Form"), max_length=250)
    available = models.BooleanField(_("Available"), default=True)
    enabled = models.BooleanField(
        _("Enable this form"),
        default=True,
        help_text=_(
            "Disable with caution: disabling a form with mandatory "
            "fields may lead to database errors."
        ),
    )
    apply_to_all = models.BooleanField(
        _("Apply to all"),
        default=False,
        help_text=_(
            "Apply this form to all users. If set to True, selecting "
            "user and user type is useless."
        ),
    )
    users = models.ManyToManyField("IshtarUser", blank=True)
    user_types = models.ManyToManyField(
        "PersonType", blank=True, help_text=_("Deprecated - use profile types")
    )
    profile_types = models.ManyToManyField("ProfileType", blank=True)
    objects = CustomFormManager()
    SERIALIZATION_EXCLUDE = ("users",)

    class Meta:
        verbose_name = _("Custom form")
        verbose_name_plural = _("Custom forms")
        ordering = ["name", "form"]
        unique_together = (("name", "form"),)

    def natural_key(self):
        return (self.name, self.form)

    def __str__(self):
        return "{} - {}".format(self.name, self.form)

    def users_lbl(self):
        """Return the attached users as a " ; " separated string."""
        return " ; ".join(str(user) for user in self.users.all())

    users_lbl.short_description = _("Users")

    def user_types_lbl(self):
        """Return the attached user types as a " ; " separated string."""
        return " ; ".join(str(user_type) for user_type in self.user_types.all())

    user_types_lbl.short_description = _("User types")

    @classmethod
    def register(cls):
        """
        Return ``(forms, fields)``: ``forms`` maps a form slug to its form
        class, ``fields`` maps an application name to the list of model
        names derived from the form slugs.

        Both dicts are memoized on the class (``_register`` and
        ``_register_fields``), so the module scan is done once.
        """
        if hasattr(cls, "_register") and hasattr(cls, "_register_fields"):
            return cls._register, cls._register_fields
        # NOTE(review): cls.__class__ is the metaclass, not CustomForm, and
        # the returned cache keys are never used to store a value here -
        # confirm this cache lookup is intended.
        cache_key, value = get_cache(
            cls.__class__, ["dct-forms"], app_label="ishtar_common"
        )
        cache_key_fields, value_fields = get_cache(
            cls.__class__, ["dct-fields"], app_label="ishtar_common"
        )
        if value and value_fields:
            cls._register, cls._register_fields = value, value_fields
            return cls._register, cls._register_fields
        cls._register, cls._register_fields = {}, {}
        # ideally should be improved but only used in admin
        from ishtar_common.admin import ISHTAR_FORMS
        from ishtar_common.forms import CustomForm as CustomFormForm

        for forms_module in ISHTAR_FORMS:
            app_name = forms_module.__package__
            if app_name == "archaeological_files_pdl":
                app_name = "archaeological_files"
            for attr_name in dir(forms_module):
                # not very clean... but do not treat inappropriate items
                if "Form" not in attr_name and "Select" not in attr_name:
                    continue
                candidate = getattr(forms_module, attr_name)
                is_custom_form = (
                    inspect.isclass(candidate)
                    and issubclass(candidate, CustomFormForm)
                    and getattr(candidate, "form_slug", None)
                )
                if not is_custom_form:
                    continue
                model_name = candidate.form_slug.split("-")[0].replace("_", "")
                app_models = cls._register_fields.setdefault(app_name, [])
                if model_name not in app_models:
                    app_models.append(model_name)
                cls._register[candidate.form_slug] = candidate
        return cls._register, cls._register_fields

    def get_form_class(self):
        """Return the form class registered for ``self.form`` or None."""
        forms, __ = self.register()
        if self.form in self._register:
            return forms[self.form]
        return None

    def get_available_json_fields(self):
        """
        Return a list of ``(pk, "name (value type label)")`` tuples for
        the JsonDataField rows attached to the models of the application
        owning the current form. Empty list when the form is unknown.
        """
        forms, fields_by_app = self.register()
        if self.form not in self._register:
            return []
        app_name = forms[self.form].__module__.split(".")[0]
        if app_name == "archaeological_files_pdl":
            app_name = "archaeological_files"
        if app_name not in fields_by_app:
            return []
        choices = []
        for model_name in fields_by_app[app_name]:
            content_types = ContentType.objects.filter(
                app_label=app_name, model=model_name
            )
            if not content_types.count():
                continue
            content_type = content_types.all()[0]
            json_fields = JsonDataField.objects.filter(
                content_type=content_type
            ).all()
            choices.extend(
                (
                    json_field.pk,
                    "{} ({})".format(
                        json_field.name,
                        dict(JSON_VALUE_TYPES)[json_field.value_type],
                    ),
                )
                for json_field in json_fields
            )
        return choices
class ExcludedFieldManager(models.Manager):
    """Manager resolving an excluded field from its natural key."""

    def get_by_natural_key(self, custom_form_name, custom_form_form, field):
        return self.get(
            custom_form__name=custom_form_name,
            custom_form__form=custom_form_form,
            field=field,
        )


class ExcludedField(models.Model):
    """Name of a field excluded from a given custom form."""

    # on_delete made explicit: CASCADE is what the implicit default did,
    # and relying on the implicit value is deprecated
    custom_form = models.ForeignKey(
        CustomForm, related_name="excluded_fields", on_delete=models.CASCADE
    )
    field = models.CharField(_("Field"), max_length=250)
    objects = ExcludedFieldManager()

    class Meta:
        verbose_name = _("Excluded field")
        verbose_name_plural = _("Excluded fields")
        unique_together = ("custom_form", "field")

    def natural_key(self):
        return (self.custom_form.name, self.custom_form.form, self.field)


class CustomFormJsonFieldManager(models.Manager):
    """Manager resolving a custom form JSON field from its natural key."""

    def get_by_natural_key(
        self,
        custom_form_name,
        custom_form_form,
        json_field_key,
        json_field_app_label,
        json_field_model,
    ):
        return self.get(
            custom_form__name=custom_form_name,
            custom_form__form=custom_form_form,
            json_field__key=json_field_key,
            json_field__content_type__app_label=json_field_app_label,
            json_field__content_type__model=json_field_model,
        )


class CustomFormJsonField(models.Model):
    """
    Link between a custom form and a JSON data field, with its own
    label, order and help text.
    """

    # explicit on_delete (same CASCADE behavior as the implicit default)
    custom_form = models.ForeignKey(
        CustomForm, related_name="json_fields", on_delete=models.CASCADE
    )
    json_field = models.ForeignKey(
        JsonDataField,
        related_name="custom_form_details",
        on_delete=models.CASCADE,
    )
    label = models.CharField(_("Label"), max_length=200, blank=True, default="")
    order = models.IntegerField(verbose_name=_("Order"), default=1)
    help_text = models.TextField(_("Help"), blank=True, default="")
    objects = CustomFormJsonFieldManager()

    class Meta:
        verbose_name = _("Custom form - Json data field")
        verbose_name_plural = _("Custom form - Json data fields")
        unique_together = ("custom_form", "json_field")

    def natural_key(self):
        # same five components, in the same order, as get_by_natural_key
        return (
            self.custom_form.name,
            self.custom_form.form,
            self.json_field.key,
            self.json_field.content_type.app_label,
            self.json_field.content_type.model,
        )
class GlobalVar(models.Model, Cached):
    """Named global variable (slug / description / value)."""

    slug = models.SlugField(_("Variable name"), unique=True)
    description = models.TextField(
        _("Description of the variable"), blank=True, default=""
    )
    value = models.TextField(_("Value"), blank=True, default="")
    objects = SlugModelManager()

    class Meta:
        verbose_name = _("Global variable")
        verbose_name_plural = _("Global variables")
        ordering = ["slug"]

    def natural_key(self):
        return (self.slug,)

    def __str__(self):
        return str(self.slug)


def cached_globalvar_changed(sender, **kwargs):
    """
    post_save receiver for GlobalVar: store the saved value in the cache
    under the key returned by ``get_cache`` for the variable slug.
    """
    # .get(): do not raise KeyError if the signal is sent without instance
    var = kwargs.get("instance")
    if not var:
        return
    # only the key is needed, the previously cached value is ignored
    cache_key, __ = get_cache(GlobalVar, var.slug)
    cache.set(cache_key, var.value, settings.CACHE_TIMEOUT)


post_save.connect(cached_globalvar_changed, sender=GlobalVar)


class UserDashboard:
    """Number of users for each person type."""

    def __init__(self):
        types = IshtarUser.objects.values(
            "person__person_types", "person__person_types__label"
        )
        self.types = types.annotate(number=Count("pk")).order_by(
            "person__person_types"
        )


class StatsCache(models.Model):
    """Stored statistic values for one object (model name + primary key)."""

    model = models.CharField(_("Model name"), max_length=200)
    model_pk = models.IntegerField(_("Associated primary key"))
    # default must be a callable: a literal {} would be a single dict
    # object shared by every instance created without explicit values
    values = JSONField(default=dict, blank=True)
    updated = models.DateTimeField(default=datetime.datetime.now)
    update_requested = models.DateTimeField(blank=True, null=True)

    class Meta:
        verbose_name = _("Cache for stats")
        verbose_name_plural = _("Caches for stats")
class Dashboard(object):
    """
    Statistics for a model: last created and last modified items, number
    of items per period (year or month) and descriptive statistics
    (average, variance, standard deviation, median, mode), optionally
    also computed per operation.
    """

    def __init__(
        self, model, slice="year", date_source=None, show_detail=None, fltr=None
    ):
        """
        :param model: model class; this method calls its
            ``get_total_number``, ``get_periods``, ``get_by_year`` /
            ``get_by_month`` class methods and its ``history`` manager
        :param slice: "year" or "month"
        :param date_source: forwarded to the model methods when set
        :param show_detail: with the "month" slice, add one serie per
            entry of ``settings.ISHTAR_DPTS``
        :param fltr: dict of filters forwarded to the model methods
        """
        if not fltr:
            fltr = {}
        # don't provide date_source if it is not relevant
        self.model = model
        self.total_number = model.get_total_number(fltr)
        self.show_detail = show_detail
        history_model = self.model.history.model
        # function-level import as in the original, only hoisted out of
        # the loop below
        from archaeological_finds.models import Find

        # last edited - created
        self.recents, self.lasts = [], []
        for last_lst, modif_type in ((self.lasts, "+"), (self.recents, "~")):
            last_ids = history_model.objects.values("id").annotate(
                hd=Max("history_date")
            )
            last_ids = last_ids.filter(history_type=modif_type)
            if self.model == Find:
                last_ids = last_ids.filter(downstream_treatment_id__isnull=True)
                if modif_type == "+":
                    last_ids = last_ids.filter(
                        upstream_treatment_id__isnull=True
                    )
            last_ids = last_ids.order_by("-hd").distinct().all()[:5]
            for idx in last_ids:
                try:
                    obj = self.model.objects.get(pk=idx["id"])
                except self.model.DoesNotExist:
                    # deleted object are always referenced in history
                    continue
                obj.history_date = idx["hd"]
                last_lst.append(obj)
        # years
        base_kwargs = {"fltr": fltr.copy()}
        if date_source:
            base_kwargs["date_source"] = date_source
        periods_kwargs = copy.deepcopy(base_kwargs)
        periods_kwargs["slice"] = slice
        self.periods = sorted(set(model.get_periods(**periods_kwargs)))
        if not self.total_number or not self.periods:
            return
        # deep copy: the per-department loop below mutates the nested
        # "fltr" dict and must not leak into base_kwargs
        kwargs_num = copy.deepcopy(base_kwargs)
        self.serie_labels = [_("Total")]
        # numbers
        if slice == "year":
            self.values = [("year", "", list(reversed(self.periods)))]
            self.numbers = [
                model.get_by_year(year, **kwargs_num).count()
                for year in self.periods
            ]
            self.values += [
                ("number", _("Number"), list(reversed(self.numbers)))
            ]
        if slice == "month":
            periods = list(reversed(self.periods))
            # (year, month) pairs become "YYYY-MM-01" strings
            self.periods = [
                "%d-%s-01"
                % (p[0], ("0" + str(p[1])) if len(str(p[1])) == 1 else p[1])
                for p in periods
            ]
            self.values = [("month", "", self.periods)]
            if show_detail:
                for dpt, lbl in settings.ISHTAR_DPTS:
                    self.serie_labels.append(str(dpt))
                    idx = "number_" + str(dpt)
                    kwargs_num["fltr"]["towns__numero_insee__startswith"] = str(
                        dpt
                    )
                    numbers = [
                        model.get_by_month(
                            *p.split("-")[:2], **kwargs_num
                        ).count()
                        for p in self.periods
                    ]
                    self.values += [(idx, dpt, list(numbers))]
                # put "Total" at the end
                self.serie_labels.append(self.serie_labels.pop(0))
            kwargs_num = base_kwargs.copy()
            self.numbers = [
                model.get_by_month(*p.split("-")[:2], **kwargs_num).count()
                for p in self.periods
            ]
            self.values += [("number", _("Total"), list(self.numbers))]
        # calculate
        self.average = self.get_average()
        self.variance = self.get_variance()
        self.standard_deviation = self.get_standard_deviation()
        self.median = self.get_median()
        self.mode = self.get_mode()
        # by operation
        if not hasattr(model, "get_by_operation"):
            return
        operations = model.get_operations()
        operation_numbers = [
            model.get_by_operation(op).count() for op in operations
        ]
        # calculate
        self.operation_average = self.get_average(operation_numbers)
        self.operation_variance = self.get_variance(operation_numbers)
        self.operation_standard_deviation = self.get_standard_deviation(
            operation_numbers
        )
        self.operation_median = self.get_median(operation_numbers)
        # operation_numbers must still be in the same order as operations
        # here: get_median works on a sorted copy for that reason
        operation_mode_pk = self.get_mode(
            dict(zip(operations, operation_numbers))
        )
        if operation_mode_pk:
            from archaeological_operations.models import Operation

            self.operation_mode = str(
                Operation.objects.get(pk=operation_mode_pk)
            )

    # In every getter below an empty or missing ``vals`` falls back to
    # the per-period values (``self.numbers``).

    def get_average(self, vals=None):
        """Return the arithmetic mean of ``vals``."""
        if not vals:
            vals = self.numbers[:]
        return sum(vals) / len(vals)

    def get_variance(self, vals=None):
        """Return the mean of the squared deviations from the average."""
        if not vals:
            vals = self.numbers[:]
        avrg = self.get_average(vals)
        return self.get_average([(x - avrg) ** 2 for x in vals])

    def get_standard_deviation(self, vals=None):
        """Return the square root of the variance, rounded to 3 digits."""
        if not vals:
            vals = self.numbers[:]
        return round(self.get_variance(vals) ** 0.5, 3)

    def get_median(self, vals=None):
        """
        Return the median of ``vals``: the middle value for an odd
        length, the mean (float) of the two middle values otherwise.

        The given list is left untouched.
        """
        if not vals:
            vals = self.numbers
        # sorted copy: sorting in place reordered the caller's list
        ordered = sorted(vals)
        middle = len(ordered) // 2
        if len(ordered) % 2 == 1:
            return ordered[middle]
        return (ordered[middle - 1] + ordered[middle]) / 2.0

    def get_mode(self, vals=None):
        """
        Return the first key of the ``vals`` dict holding the maximum
        value. Defaults to the periods mapped to their numbers.
        """
        if not vals:
            vals = dict(zip(self.periods, self.numbers[:]))
        mx = max(vals.values())
        for key, number in vals.items():
            if number == mx:
                return key
class DocumentTemplate(models.Model):
    """
    Document template attached to a model. Used to render a document
    for one object (:meth:`publish`) or sheets of labels for a list of
    objects (:meth:`publish_labels`).
    """

    name = models.CharField(_("Name"), max_length=100)
    slug = models.SlugField(_("Slug"), max_length=100, unique=True)
    # explicit on_delete (same CASCADE behavior as the implicit default)
    associated_model = models.ForeignKey(
        ImporterModel, on_delete=models.CASCADE
    )
    template = models.FileField(
        _("Template"),
        upload_to="templates/%Y/",
        blank=True,
        null=True,
        help_text=max_size_help(),
    )
    label_template = models.FileField(
        _("Base template for labels"),
        upload_to="templates/%Y/",
        blank=True,
        null=True,
        help_text=max_size_help(),
    )
    label_targets = models.TextField(
        _("Labels: targets for labels in the LibreOffice file"),
        blank=True,
        null=True,
        help_text=_(
            "Each target is separated by a semi-colon. The first "
            "target is the name of the object including the data in "
            "base template. Following targets will be filled with the "
            "content of the first target. For instance: "
            '"Cadre1;Cadre2;Cadre3;Cadre4;Cadre5;Cadre6" for a '
            "sheet with 6 labels."
        ),
    )
    available = models.BooleanField(_("Available"), default=True)
    for_labels = models.BooleanField(_("Used for labels"), default=False)
    label_per_page = models.IntegerField(
        _("Number of label per page"),
        blank=True,
        null=True,
        help_text=_("Only relevant for label template"),
    )
    objects = SlugModelManager()
    SERIALIZATION_FILES = ("template",)

    class Meta:
        verbose_name = _("Document template")
        verbose_name_plural = _("Document templates")
        ordering = ["associated_model", "name"]

    def __str__(self):
        return self.name

    def natural_key(self):
        return (self.slug,)

    def clean(self):
        """Require ``label_per_page`` when the template is for labels."""
        if self.for_labels and not self.label_per_page:
            raise ValidationError(
                _(
                    "For label template, you must provide "
                    "number of label per page."
                )
            )

    def generate_label_template(self):
        """
        Build ``template`` from ``label_template``: the content of the
        first target frame is copied into each following target frame
        with "items.0" replaced by "items.1", "items.2"... and the
        resulting file is saved in the ``template`` field.

        Does nothing when the base file is missing or unreadable or when
        the first target frame is not found or empty.
        """
        if not self.label_template.name or not self.label_targets:
            return
        targets = self.label_targets.split(";")
        base_target = targets[0]
        try:
            with zipfile.ZipFile(self.label_template.path) as archive:
                with archive.open("content.xml") as content:
                    soup = BeautifulSoup(content.read(), "xml")
                    base_content = soup.find(
                        "draw:frame", attrs={"draw:name": base_target}
                    )
                    if not base_content:
                        return
                    base_content = base_content.contents
        except (FileNotFoundError, zipfile.BadZipFile, KeyError):
            base_content = None
        if not base_content:
            return
        for idx, target in enumerate(targets[1:]):
            replace_str = "items." + str(idx + 1)
            new_content = []
            for content in base_content:
                content = copy.copy(content)
                for text in content.find_all(text=re.compile("items.0")):
                    fixed_text = text.replace("items.0", replace_str)
                    text.replace_with(fixed_text)
                for image in content.find_all(
                    attrs={"draw:name": re.compile("items.0")}
                ):
                    image["draw:name"] = image["draw:name"].replace(
                        "items.0", replace_str
                    )
                new_content.append(content)
            next_target = soup.find("draw:frame", attrs={"draw:name": target})
            if next_target:
                next_target.contents = new_content
        with tempfile.TemporaryDirectory() as tmp:
            sp = self.label_template.name.split(os.sep)[-1].split(".")
            if len(sp) == 1:  # no extension?
                sp.append("odt")
            sp[-2] += "-label"
            new_filename = ".".join(sp)
            new_file = os.path.join(tmp, new_filename)
            # copy every member except content.xml...
            with zipfile.ZipFile(new_file, "w") as zip_out:
                with zipfile.ZipFile(self.label_template.path, "r") as zip_in:
                    zip_out.comment = zip_in.comment
                    for item in zip_in.infolist():
                        if item.filename != "content.xml":
                            zip_out.writestr(item, zip_in.read(item.filename))
            # ...then append the modified content.xml
            with zipfile.ZipFile(
                new_file, mode="a", compression=zipfile.ZIP_DEFLATED
            ) as zf:
                zf.writestr("content.xml", str(soup))
            media_dir = "templates/{}/".format(datetime.date.today().year)
            full_media_dir = os.path.join(settings.MEDIA_ROOT, media_dir)
            # makedirs: also creates "templates/" itself when it is missing
            os.makedirs(full_media_dir, exist_ok=True)
            media_file = new_filename
            idx = 0
            # look for an existing file in the directory the file is
            # uploaded to (it was previously searched in MEDIA_ROOT itself)
            while os.path.exists(os.path.join(full_media_dir, media_file)):
                idx += 1
                sp = media_file.split(".")
                sub_sp = sp[-2].split("-label")
                sub_sp[-1] += str(idx)
                sp[-2] = "-label".join(sub_sp)
                media_file = ".".join(sp)
            with open(new_file, "rb") as file:
                with ContentFile(file.read()) as file_content:
                    self.template.save(media_file, file_content)
                    self.save()

    def save(self, *args, **kwargs):
        """
        Generate the slug from the name if empty, save, then generate
        the label template when a base label template and targets are
        set and no template is available yet.
        """
        if not self.slug:
            self.slug = create_slug(DocumentTemplate, self.name)
        super(DocumentTemplate, self).save(*args, **kwargs)
        if (
            self.label_template.name
            and self.label_targets
            and not self.template
        ):
            self.generate_label_template()

    @classmethod
    def get_tuples(cls, dct=None, empty_first=True):
        """
        Yield ``(pk, label)`` for available templates matching the
        ``dct`` filters, preceded by an empty choice if ``empty_first``.
        """
        if not dct:
            dct = {}
        dct["available"] = True
        if empty_first:
            yield "", "----------"
        items = cls.objects.filter(**dct)
        for item in items.distinct().order_by(*cls._meta.ordering).all():
            yield item.pk, _(str(item))

    def get_baselink_for_labels(self):
        return reverse("generate-labels", args=[self.slug])

    def _exclude_filter(self, value):
        """
        value is excluded from values to fetch?
        """
        if not value or value in ("in", "not", "el") or value.startswith("|"):
            return True
        try:
            int(value)
            return True
        except ValueError:
            # not a single int
            pass
        return False

    def get_filter(self, template, regexp_list=None):
        """
        Extract from the ``content.xml`` of the template the keys
        matched by ``regexp_list`` (first group of each regexp).

        :param template: path or file object of the zipped template
        :param regexp_list: regular expressions with a leading group
        :return: list of unique keys, each dotted key being given both
            by component and by "_" joined prefix; None if no regexp is
            provided
        """
        if not regexp_list:
            return None
        # context managers: archive and member were previously never closed
        with zipfile.ZipFile(template) as archive:
            with archive.open("content.xml") as content:
                full_content = content.read().decode("utf-8")
        filtr = []
        for regexp in regexp_list:
            for match in re.finditer(regexp, full_content):
                key = match.groups()[0]
                if key not in filtr:
                    filtr.append(key)
        new_filter = []
        OPERATORS = [
            "==",
            "not",
            "in",
            ">",
            "<",
            "!=",
            ">",
            "<",
            ">=",
            "<=",
            "or",
            ">=",
            "<=",
        ]
        # conditions ("a.b == c") are split in words, operators dropped
        for fil in filtr:
            if not fil:
                continue
            new_filter += [
                f for f in fil.split(" ") if f and f not in OPERATORS
            ]
        filtr = new_filter
        new_filter = []
        for fil in filtr:
            keys = fil.strip().split("|")[0].split(".")
            new_filter += [k for k in keys if not self._exclude_filter(k)]
            prefix = ""
            for k in keys:
                if self._exclude_filter(k):
                    continue
                if prefix:
                    prefix += "_"
                if prefix + k in new_filter:
                    continue
                new_filter.append(prefix + k)
                prefix += k
        return list(set(new_filter))

    ITEM_RE = r"([A-Za-z0-9_.]*)(?:[\[\]0-9-:])*(?:\|[^}]+)*"
    BASE_RE = [
        # {{ key1.key2 }}
        r"{{ *" + ITEM_RE + " *}}",
        # {% for item in key1.key2 %}
        r"{% *for +[A-Za-z0-9_]+ +in +" + ITEM_RE + r" *%}",
        # {% if ** %}
        r"{% (?:el)*if ([^}]*)%}",
    ]

    def publish(self, c_object):
        """
        Render the template with the values of ``c_object`` and return
        the path of the generated file (created in a new temporary
        directory).

        :raises TemplateSyntaxError: for any rendering error
        """
        tempdir = tempfile.mkdtemp("-ishtardocs")
        output_name = (
            tempdir
            + os.path.sep
            + slugify(self.name.replace(" ", "_").lower())
            + "-"
            + datetime.date.today().strftime("%Y-%m-%d")
            + "."
            + self.template.name.split(".")[-1]
        )
        filtr = self.get_filter(self.template, self.BASE_RE)
        # "VALUES" used in the template: fetch everything (empty filter)
        if "VALUES" in filtr:
            filtr = []
        values = c_object.get_values(filtr=filtr)
        if not filtr or "VALUES" in filtr:
            values["VALUES"] = json.dumps(
                values,
                indent=4,
                sort_keys=True,
                skipkeys=True,
                ensure_ascii=False,
                separators=("", " : "),
            ).replace(" " * 4, "\t")
        engine = IshtarSecretaryRenderer()
        try:
            result = engine.render(self.template, **values)
        except TemplateSyntaxError as e:
            raise TemplateSyntaxError(str(e), e.lineno)
        except UndefinedError as e:
            raise TemplateSyntaxError(str(e), 0)
        except Exception as e:
            # deliberate: callers only have to handle TemplateSyntaxError
            raise TemplateSyntaxError(str(e), 0)
        with open(output_name, "wb") as output:
            output.write(result)
        return output_name

    LABEL_ITEM_RE = r"items\.\d\.([A-Za-z0-9_.]*)(?:[\[\]0-9-:])*(?:\|[^}]+)*"
    LABEL_RE = [
        # {{items.4.key}}
        r"{{ *" + LABEL_ITEM_RE + r" *}}",
        # {% if ** %}
        r"{% (?:el)*if ([^}]*)%}",
        # {% for item in items.42.another_keys %}
        r"{% *for +[A-Za-z0-9_]+ +in +" + LABEL_ITEM_RE + r" *%}",
    ]

    def publish_labels(self, objects):
        """
        Render one file per page of ``label_per_page`` objects, merge
        the pages in a single document and return its path. Return None
        if ``objects`` is empty.

        :raises TemplateSyntaxError: on template syntax error
        """
        if not objects:
            return
        tempdir = tempfile.mkdtemp("-ishtarlabels")
        main_output_name = (
            tempdir
            + os.path.sep
            + slugify(self.name.replace(" ", "_").lower())
            + "-"
            + datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S")
        )
        suffix = "." + self.template.name.split(".")[-1]
        len_objects = len(objects)
        names = []
        filtr = self.get_filter(self.template, self.LABEL_RE)
        for idx in range(int(len(objects) / self.label_per_page) + 1):
            if idx * self.label_per_page >= len_objects:
                break
            values = {"items": []}
            for subidx in range(self.label_per_page):
                c_idx = idx * self.label_per_page + subidx
                if c_idx >= len_objects:
                    break
                obj = objects[c_idx]
                values["items"].append(obj.get_values(filtr=filtr))
            engine = IshtarSecretaryRenderer()
            try:
                result = engine.render(self.template, **values)
            except TemplateSyntaxError as e:
                raise TemplateSyntaxError(str(e), e.lineno)
            output_name = main_output_name + "-" + str(idx) + suffix
            names.append(output_name)
            with open(output_name, "wb") as output:
                output.write(result)
        output_name = main_output_name + suffix
        # the first page is the base document, other pages are appended
        o = OOoPy(infile=names[0], outfile=output_name)
        if len(names) > 1:
            t = OOTransformer(
                o.mimetype,
                OOTransforms.get_meta(o.mimetype),
                OOTransforms.Concatenate(*(names[1:])),
                OOTransforms.renumber_all(o.mimetype),
                OOTransforms.set_meta(o.mimetype),
                OOTransforms.Fix_OOo_Tag(),
                OOTransforms.Manifest_Append(),
            )
            t.transform(o)
        o.close()
        return output_name
class Area(HierarchicalType):
    """Group of towns, optionally nested under a parent area."""

    towns = models.ManyToManyField(
        Town, verbose_name=_("Towns"), blank=True, related_name="areas"
    )
    reference = models.CharField(
        _("Reference"), max_length=200, blank=True, null=True
    )
    parent = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        verbose_name=_("Parent"),
        help_text=_("Only four level of parent are managed."),
        related_name="children",
        on_delete=models.SET_NULL,
    )

    class Meta:
        verbose_name = _("Area")
        verbose_name_plural = _("Areas")
        ordering = ("label",)

    def __str__(self):
        # the reference, when set, is appended between parentheses
        if self.reference:
            return "{} ({})".format(self.label, self.reference)
        return self.label

    @property
    def full_label(self):
        """Own label followed by the labels of the parents (" / ")."""
        own_label = str(self)
        # an area referencing itself as parent is not followed
        if not self.parent or self.parent.pk == self.pk:
            return own_label
        return " / ".join([own_label, self.parent.full_label])


# (key, label) choices for grammatical genders
GENDER = (
    ("M", _("Male")),
    ("F", _("Female")),
    ("N", _("Neutral")),
)
def documentation_get_gender_values():
    """Return the GENDER choices as ' "key": label' items joined by " ;"."""
    return " ;".join(
        ' "{}": {}'.format(key, label) for key, label in GENDER
    )


class BaseGenderedType(ValueGetter):
    """Add ``grammatical_gender`` to the values returned by get_values."""

    def get_values(self, prefix="", **kwargs):
        values = super().get_values(prefix=prefix, **kwargs)
        assert hasattr(self, "grammatical_gender")
        values[prefix + "grammatical_gender"] = self.grammatical_gender
        return values


class GenderedType(BaseGenderedType, GeneralType):
    """Abstract general type with a grammatical gender."""

    grammatical_gender = models.CharField(
        _("Grammatical gender"),
        max_length=1,
        choices=GENDER,
        blank=True,
        default="",
        help_text=documentation_get_gender_values,
    )

    class Meta:
        abstract = True

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        parts = [
            super().get_documentation_string(),
            ", **grammatical_gender** {} -".format(_("Grammatical gender")),
            documentation_get_gender_values(),
        ]
        return "".join(parts)


class OrganizationType(GenderedType):
    class Meta:
        verbose_name = _("Organization type")
        verbose_name_plural = _("Organization types")
        ordering = ("label",)


def get_orga_planning_service_label():
    """Label of the "planning_service" type or an error message."""
    label = None
    if apps.ready:
        label = get_general_type_label(OrganizationType, "planning_service")
    return label or _("Error: planning_service type is missing")


def get_orga_general_contractor_label():
    """Label of the "general_contractor" type or an error message."""
    label = None
    if apps.ready:
        label = get_general_type_label(OrganizationType, "general_contractor")
    return label or _("Error: general_contractor type is missing")


post_save.connect(post_save_cache, sender=OrganizationType)
post_delete.connect(post_save_cache, sender=OrganizationType)
organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, str)
organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, str)


class Organization(
    Address, Merge, OwnPerms, BaseGenderedType, ValueGetter, MainItem
):
    """Organization: named, typed entity with an address."""

    TABLE_COLS = ("name", "organization_type", "town")
    SLUG = "organization"
    SHOW_URL = "show-organization"
    DELETE_URL = "delete-organization"

    # search parameters
    EXTRA_REQUEST_KEYS = {}
    BASE_SEARCH_VECTORS = [
        SearchVectorConfig("name"),
        SearchVectorConfig("town"),
    ]

    # alternative names of fields for searches
    ALT_NAMES = {
        "name": SearchAltName(
            pgettext_lazy("key for text search", "name"), "name__iexact"
        ),
        "organization_type": SearchAltName(
            pgettext_lazy("key for text search", "type"),
            "organization_type__label__iexact",
        ),
    }
    QA_EDIT = QuickAction(
        url="organization-qa-bulk-update",
        icon_class="fa fa-pencil",
        text=_("Bulk update"),
        target="many",
        rights=["change_organization"],
    )
    QUICK_ACTIONS = [QA_EDIT]

    objects = UUIDModelManager()

    # fields
    uuid = models.UUIDField(default=uuid.uuid4)
    name = models.CharField(_("Name"), max_length=500)
    organization_type = models.ForeignKey(
        OrganizationType, verbose_name=_("Type")
    )
    url = models.URLField(verbose_name=_("Web address"), blank=True, null=True)
    grammatical_gender = models.CharField(
        _("Grammatical gender"),
        max_length=1,
        choices=GENDER,
        blank=True,
        default="",
        help_text=documentation_get_gender_values,
    )
    cached_label = models.TextField(
        _("Cached name"), blank=True, default="", db_index=True
    )

    DOWN_MODEL_UPDATE = ["members"]

    class Meta:
        verbose_name = _("Organization")
        verbose_name_plural = _("Organizations")
        permissions = (
            ("view_organization", "Can view all Organizations"),
            ("view_own_organization", "Can view own Organization"),
            ("add_own_organization", "Can add own Organization"),
            ("change_own_organization", "Can change own Organization"),
            ("delete_own_organization", "Can delete own Organization"),
        )
        indexes = [
            GinIndex(fields=["data"]),
        ]

    def simple_lbl(self):
        """Return the name or, if empty, "<type> - <town>"."""
        if not self.name:
            return "{} - {}".format(self.organization_type, self.town or "")
        return self.name

    def natural_key(self):
        return (self.uuid,)

    def __str__(self):
        return self.cached_label or ""

    def _generate_cached_label(self):
        """
        Return the name or, if empty, the non-empty values of type,
        address and town joined by " - ".
        """
        if self.name:
            return self.name
        items = []
        for attr in ("organization_type", "address", "town"):
            value = getattr(self, attr)
            if value:
                items.append(str(value))
        if not items:
            items.append(str(_("unknown organization")))
        return " - ".join(items)

    def generate_merge_key(self):
        """Set ``merge_key`` from the slugified name, town and address."""
        key = slugify(self.name or "") or self.EMPTY_MERGE_KEY
        for extra in (self.town, self.address):
            if extra:
                key += "-" + slugify(extra)
        self.merge_key = key

    @property
    def associated_filename(self):
        """Slug made of the type and the name (empty values skipped)."""
        parts = (self.organization_type, self.name)
        return slugify("-".join(str(part) for part in parts if part))

    @classmethod
    @pre_importer_action
    def import_get_publisher_type(cls, context, value):
        """
        Importer pre-action: when the context has a name, set the
        "publisher" organization type in the context (if this type
        exists).
        """
        if not context["name"]:
            return
        publisher_types = OrganizationType.objects.filter(txt_idx="publisher")
        if not publisher_types.count():
            return
        context["organization_type"] = publisher_types.all()[0]


post_save.connect(cached_label_changed, sender=Organization)
class PersonType(GeneralType):
    class Meta:
        verbose_name = _("Person type")
        verbose_name_plural = _("Person types")
        ordering = ("label",)


post_save.connect(post_save_cache, sender=PersonType)
post_delete.connect(post_save_cache, sender=PersonType)
person_type_pk_lazy = lazy(PersonType.get_or_create_pk, str)
person_type_pks_lazy = lazy(PersonType.get_or_create_pks, str)


def get_sra_agent_head_scientist_label():
    """
    Return "<head_scientist label>/<sra_agent label>" built from the
    available person type labels, or an error message when the apps are
    not ready or when both types are missing.
    """
    if apps.ready:
        # "or ''": a missing first label must not break the concatenation
        # below (NOTE(review): get_general_type_label is assumed to
        # return a falsy value when the type is missing - confirm)
        lbl = get_general_type_label(PersonType, "head_scientist") or ""
        if lbl:
            lbl += str(_("/"))
        lbl2 = get_general_type_label(PersonType, "sra_agent")
        if lbl2:
            lbl += lbl2
        if lbl:
            return lbl
    return _("Error: sra_agent and head_scientist types are missing")


def get_sra_agent_label():
    """Label of the "sra_agent" person type or an error message."""
    if apps.ready:
        lbl = get_general_type_label(PersonType, "sra_agent")
        if lbl:
            return lbl
    return _("Error: sra_agent type is missing")


def get_general_contractor_label():
    """Label of the "general_contractor" person type or an error message."""
    if apps.ready:
        lbl = get_general_type_label(PersonType, "general_contractor")
        if lbl:
            return lbl
    return _("Error: general_contractor type is missing")


def get_responsible_planning_service_label():
    """
    Label of the "responsible_planning_service" person type or an error
    message.
    """
    if apps.ready:
        lbl = get_general_type_label(PersonType, "responsible_planning_service")
        if lbl:
            return lbl
    return _("Error: responsible_planning_service type is missing")
def get_publisher_label():
    """
    Return the labels of the organization types listed in
    ``settings.ISHTAR_SLUGS["document-publisher"]`` joined by " ; ", or
    an error message when none is available.
    """
    if apps.ready:
        candidates = (
            get_general_type_label(OrganizationType, key)
            for key in settings.ISHTAR_SLUGS["document-publisher"]
        )
        labels = [label for label in candidates if label]
        if labels:
            return " ; ".join(labels)
    return _("Error: publisher type is missing")


def get_operator_label():
    """Label of the "operator" organization type or an error message."""
    label = None
    if apps.ready:
        label = get_general_type_label(OrganizationType, "operator")
    return label or _("Error: operator type is missing")


class TitleType(GenderedType):
    """Gendered title type with an optional long form."""

    long_title = models.TextField(_("Long title"), default="", blank=True)

    class Meta:
        verbose_name = _("Title type")
        verbose_name_plural = _("Title types")
        ordering = ("label",)

    @classmethod
    def get_documentation_string(cls):
        """
        Used for automatic documentation generation
        """
        inherited = super().get_documentation_string()
        return inherited + ", **long_title** {}".format(_("Long title"))

    def get_values(self, prefix="", **kwargs):
        """Add ``long_title`` to the inherited values."""
        values = super().get_values(prefix=prefix, **kwargs)
        values[prefix + "long_title"] = self.long_title
        return values


post_save.connect(post_save_cache, sender=TitleType)
post_delete.connect(post_save_cache, sender=TitleType)
"ishtaruser__isnull": "ishtaruser__isnull", "attached_to": "attached_to", } COL_LABELS = {"attached_to": _("Organization")} # alternative names of fields for searches ALT_NAMES = { "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "surname": SearchAltName( pgettext_lazy("key for text search", "surname"), "surname__iexact" ), "email": SearchAltName( pgettext_lazy("key for text search", "email"), "email__iexact" ), "person_types": SearchAltName( pgettext_lazy("key for text search", "type"), "person_types__label__iexact", ), "attached_to": SearchAltName( pgettext_lazy("key for text search", "organization"), "attached_to__cached_label__iexact", ), "ishtaruser__isnull": SearchAltName( pgettext_lazy("key for text search", "has-account"), "ishtaruser__isnull", ), } QA_EDIT = QuickAction( url="person-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_person"], ) QUICK_ACTIONS = [QA_EDIT] objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) old_title = models.CharField( _("Title"), max_length=100, choices=TYPE, blank=True, null=True ) title = models.ForeignKey( TitleType, verbose_name=_("Title"), on_delete=models.SET_NULL, blank=True, null=True, ) salutation = models.CharField( _("Salutation"), max_length=200, blank=True, null=True ) surname = models.CharField( _("Surname"), max_length=50, blank=True, null=True, help_text=_( "Attention, historical and unfortunate residue in the " "code of an initial translation error." 
@property
def current_profile(self):
    """
    Return the profile flagged as current for this person.

    When no profile is flagged as current, the first profile of the person
    is flagged, saved and returned.

    :return: the current profile, or None when the person has no profile
    """
    # first() fetches a single row (or None) instead of counting then
    # indexing the queryset
    profile = self.profiles.filter(current=True).first()
    if profile is not None:
        return profile
    profile = self.profiles.first()
    if profile is None:
        return
    # arbitrary set the first one as the current
    profile.current = True
    profile.save()
    return profile
def fancy_str(self):
    """
    Return a display label: names of the person then the organization.

    The surname and the name are used when available, otherwise the raw
    name. When the person is attached to an organization, "-" and the
    organization are appended.

    :return: the label as a string
    """
    names = [
        str(getattr(self, attr))
        for attr in ("surname", "name")
        if getattr(self, attr)
    ]
    # The fallback must be tested on the names alone: once wrapped between
    # the leading and trailing items the list is never empty, which made
    # the raw name unreachable.
    if not names and self.raw_name:
        names = [self.raw_name]
    # NOTE(review): the empty leading/trailing items look like placeholders
    # for markup around the name - confirm before changing them
    values = [""] + names + [""]
    if self.attached_to:
        values += ["-", str(self.attached_to)]
    return " ".join(values)
def full_label(self):
    """
    Return the complete label of the person.

    The label is made of the title label, the salutation, the surname and
    the name (empty values are skipped). When none of them is set the raw
    name is used instead. The organization the person is attached to, if
    any, is appended prefixed by "- ".

    :return: the label as a string (may be empty)
    """
    parts = [self.title.label] if self.title else []
    for field in ("salutation", "surname", "name"):
        content = getattr(self, field)
        if content:
            parts.append(str(content))
    if not parts:
        if self.raw_name:
            parts = [self.raw_name]
    organization = self.attached_to
    if organization:
        parts.append("- " + str(organization))
    return " ".join(parts)
def save(self, *args, **kwargs):
    """
    Save the person, keep "raw_name" up to date and refresh related items.

    After the regular save, "raw_name" is compared with the value returned
    by get_generated_id("person_raw_name", self); when it differs the new
    value is set and the person is saved again. Items reachable through
    "responsible_town_planning_service" and "general_contractor" (when
    these attributes exist) are then saved to force the update of their
    own raw values.

    :param args: positional arguments passed to the parent save
    :param kwargs: keyword arguments passed to the parent save
    """
    super(Person, self).save(*args, **kwargs)
    raw_name = get_generated_id("person_raw_name", self)
    if raw_name and self.raw_name != raw_name:
        self.raw_name = raw_name
        self.save()
        # the nested save has already refreshed every related item: stop
        # here in order not to save all of them a second time
        return
    if hasattr(self, "responsible_town_planning_service"):
        for fle in self.responsible_town_planning_service.all():
            fle.save()  # force update of raw_town_planning_service
    if hasattr(self, "general_contractor"):
        for fle in self.general_contractor.all():
            fle.save()  # force update of raw_general_contractor
def duplicate(self, **kwargs):
    """
    Duplicate the profile with its areas.

    The current instance itself is turned into the copy: its primary key
    is reset so that saving it creates a new profile. " - duplicate"
    (translated) is appended to the name as long as a profile with the
    same name, profile type and person exists.

    :param kwargs: attributes to set on the copy; "name" is used as the
        base name of the copy
    :return: the new profile
    """
    # areas have to be fetched before the primary key is reset
    areas = list(self.areas.all())
    new_item = self
    new_item.pk = None
    name = self.name
    for key, value in kwargs.items():
        if key == "name":
            name = value
        setattr(new_item, key, value)
    # (name, profile_type, person) are unique together; exists() avoids
    # counting rows only to test their presence
    while UserProfile.objects.filter(
        name=name, profile_type=self.profile_type, person=self.person
    ).exists():
        name += str(_(" - duplicate"))
    new_item.name = name
    # NOTE(review): "current" is copied as is, so the copy of a current
    # profile is saved as current too - confirm this is intended
    new_item.save()
    for area in areas:
        new_item.areas.add(area)
    return new_item
post_save_userprofile(sender, **kwargs): if not kwargs.get("instance"): return instance = kwargs.get("instance") try: instance.person.ishtaruser.show_field_number(update=True) except IshtarUser.DoesNotExist: return post_save.connect(post_save_userprofile, sender=UserProfile) class IshtarUser(FullSearch): SLUG = "ishtaruser" TABLE_COLS = ( "username", "person__name", "person__surname", "person__email", "person__person_types_list", "person__attached_to__name", ) BASE_SEARCH_VECTORS = [ SearchVectorConfig("user_ptr__username"), SearchVectorConfig("person__name"), SearchVectorConfig("person__surname"), SearchVectorConfig("person__email"), SearchVectorConfig("person__town"), SearchVectorConfig("person__attached_to__name"), ] CACHED_LABELS = [] # needed to force search vector update # search parameters EXTRA_REQUEST_KEYS = { "person__person_types_list": "person__person_types__label" } COL_LABELS = { "person__attached_to__name": _("Organization"), "username": _("Username"), } # alternative names of fields for searches ALT_NAMES = { "username": SearchAltName( pgettext_lazy("key for text search", "username"), "user_ptr__username__iexact", ), "name": SearchAltName( pgettext_lazy("key for text search", "name"), "person__name__iexact" ), "surname": SearchAltName( pgettext_lazy("key for text search", "surname"), "person__surname__iexact", ), "email": SearchAltName( pgettext_lazy("key for text search", "email"), "person__email__iexact", ), "person_types": SearchAltName( pgettext_lazy("key for text search", "type"), "person__person_types__label__iexact", ), "attached_to": SearchAltName( pgettext_lazy("key for text search", "organization"), "person__attached_to__cached_label__iexact", ), } # fields user_ptr = models.OneToOneField( User, primary_key=True, related_name="ishtaruser" ) person = models.OneToOneField( Person, verbose_name=_("Person"), related_name="ishtaruser" ) advanced_shortcut_menu = models.BooleanField( _("Advanced shortcut menu"), default=False ) class Meta: 
def has_perm(self, perm, model=None, session=None, obj=None):
    """
    Check a permission by delegating to the rights of the related person.

    :param perm: name of the permission
    :param model: not used by this method
    :param session: session forwarded to the person right check
    :param obj: object forwarded to the person right check (it used to be
        dropped and replaced by None)
    :return: the result of Person.has_right
    """
    return self.person.has_right(perm, session=session, obj=obj)
@classmethod
def BASE_REQUEST(cls, request):
    """
    Build the base filter restricting baskets to the ones of the user.

    :param request: current request; its "user" attribute is read
    :return: a Q object matching baskets owned by, shared with or shared
        in write mode with the Ishtar user of the request; a Q object
        filtering on pk=None when the request has no user or when the
        user has no Ishtar user
    """
    user = request.user
    ishtaruser = getattr(user, "ishtaruser", None) if user else None
    if not ishtaruser:
        return Q(pk=None)
    owned = Q(user=ishtaruser)
    readable = Q(shared_with=ishtaruser)
    writable = Q(shared_write_with=ishtaruser)
    return owned | readable | writable
def duplicate(self, label=None, ishtaruser=None):
    """
    Duplicate the basket. Items in basket are copied but not shared users

    The current instance itself is turned into the copy: its primary key
    is reset and a new uuid is generated, as the uuid is the natural key
    of a basket. " - duplicate" (translated) is appended to the label as
    long as a basket with the same label and owner exists.

    :param label: if provided use the name
    :param ishtaruser: if provided an alternate user is used
    :return: the new basket
    """
    through = self.items.through
    basket_pk = "{}_id".format(self.SLUG)
    item_pk = "{}_id".format(self.items.model.SLUG)
    # item ids have to be collected before the primary key is reset
    q = through.objects.filter(**{basket_pk: self.pk})
    items = [
        r[item_pk] for r in q.values("pk", item_pk).order_by("pk").all()
    ]
    new_item = self
    new_item.pk = None
    # natural_key() is (uuid,): the copy must not share it with the
    # original basket
    new_item.uuid = uuid.uuid4()
    if ishtaruser:
        new_item.user = ishtaruser
    if not label:
        label = new_item.label
    # (label, user) are unique together
    while self.__class__.objects.filter(
        label=label, user=new_item.user
    ).exists():
        label += str(_(" - duplicate"))
    new_item.label = label
    new_item.save()
    for item in items:
        through.objects.create(**{basket_pk: new_item.pk, item_pk: item})
    return new_item
def author_post_save(sender, **kwargs):
    """
    Post save handler for authors: refresh the label and merge duplicates.

    Nothing is done when no instance is provided. Otherwise the cached
    label is refreshed, then every other author with the same person and
    the same author type is merged into the first one returned by the
    query.

    :param sender: model class sending the signal
    :param kwargs: signal arguments; "instance" is the saved author
    """
    instance = kwargs.get("instance")
    if not instance:
        return
    cached_label_changed(sender, **kwargs)
    same_authors = Author.objects.filter(
        person=instance.person, author_type=instance.author_type
    )
    if same_authors.count() <= 1:
        return
    kept, *extras = list(same_authors.all())
    for extra in extras:
        kept.merge(extra)
class DocumentTag(GeneralType):
    """
    Tag that can be associated to documents.
    """

    SLUG = "documenttag"

    class Meta:
        verbose_name = _("Document tag")
        verbose_name_plural = _("Document tags")
        ordering = ("label",)


post_save.connect(post_save_cache, sender=LicenseType)
post_delete.connect(post_save_cache, sender=LicenseType)
# Document tags were the only type of this module not connected to
# post_save_cache.
# NOTE(review): confirm the omission was not deliberate
post_save.connect(post_save_cache, sender=DocumentTag)
post_delete.connect(post_save_cache, sender=DocumentTag)
"containers", "treatments", "treatment_files", ] SLUG = "document" LINK_SPLIT = "<||>" GET_VALUES_EXCLUDE_FIELDS = ValueGetter.GET_VALUES_EXCLUDE_FIELDS + [ "warehouses", "operations", "treatments", "files", "treatment_files", "administrativeacts", "id", "associated_links", "source_type_id", "history_creator_id", "containers", "sites", "main_image_warehouses", "main_image_operations", "main_image_treatments", "main_image_files", "main_image_treatment_files", "main_image_id", "main_image_associated_links", "main_image_source_type_id", "main_image_history_creator_id", "main_image_containers", "main_image_sites", ] _TABLE_COLS = [ "title", "source_type", "cache_related_label", "authors__cached_label", "associated_url", ] COL_LINK = ["associated_url"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("title"), SearchVectorConfig("source_type__label"), SearchVectorConfig("external_id"), SearchVectorConfig("reference"), SearchVectorConfig("internal_reference"), SearchVectorConfig("description", "local"), SearchVectorConfig("comment", "local"), SearchVectorConfig("additional_information", "local"), ] BASE_SEARCH_VECTORS += [ SearchVectorConfig("treatment_files__name"), SearchVectorConfig("treatments__cached_label"), SearchVectorConfig("finds__cached_label"), SearchVectorConfig("context_records__cached_label"), SearchVectorConfig("operations__cached_label"), SearchVectorConfig("sites__cached_label"), SearchVectorConfig("warehouses__name"), SearchVectorConfig("containers__cached_label"), SearchVectorConfig("files__cached_label"), ] PARENT_SEARCH_VECTORS = [ "authors", ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("tags__label"), ] BOOL_FIELDS = ["duplicate"] COL_LABELS = { "authors__cached_label": _("Authors"), "complete_identifier": _("Identifier"), } CACHED_LABELS = ["cache_related_label"] CACHED_COMPLETE_ID = "" EXTRA_REQUEST_KEYS = { "operations": "operations__pk", "context_records": "context_records__pk", "context_records__operation": "context_records__operation__pk", "finds": 
"finds__pk", "finds__base_finds__context_record": "finds__base_finds__context_record__pk", "finds__base_finds__context_record__operation": "finds__base_finds__context_record__operation__pk", "authors__cached_label": "authors__cached_label", "complete_identifier": "complete_identifier", "authors__person__pk": "authors__person__pk", "container_id": "container_id", "publisher__pk": "publisher__pk", } # alternative names of fields for searches ALT_NAMES = { "authors": SearchAltName( pgettext_lazy("key for text search", "author"), "authors__cached_label__iexact", ), "publisher": SearchAltName( pgettext_lazy("key for text search", "publisher"), "publisher__name__iexact", ), "publishing_year": SearchAltName( pgettext_lazy("key for text search", "publishing-year"), "publishing_year", ), "title": SearchAltName( pgettext_lazy("key for text search", "title"), "title__iexact" ), "source_type": SearchAltName( pgettext_lazy("key for text search", "type"), "source_type__label__iexact", ), "reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "reference__iexact", ), "internal_reference": SearchAltName( pgettext_lazy("key for text search", "internal-reference"), "internal_reference__iexact", ), "description": SearchAltName( pgettext_lazy("key for text search", "description"), "description__iexact", ), "tag": SearchAltName( pgettext_lazy("key for text search", "tag"), "tags__label__iexact" ), "format": SearchAltName( pgettext_lazy("key for text search", "format"), "format_type__label__iexact", ), "support": SearchAltName( pgettext_lazy("key for text search", "medium"), "support_type__label__iexact", ), "language": SearchAltName( pgettext_lazy("key for text search", "language"), "language__label__iexact", ), "licenses": SearchAltName( pgettext_lazy("key for text search", "license"), "licenses__label__iexact", ), "scale": SearchAltName( pgettext_lazy("key for text search", "scale"), "scale__iexact" ), "associated_url": SearchAltName( pgettext_lazy("key for 
text search", "url"), "associated_url__iexact", ), "isbn": SearchAltName( pgettext_lazy("key for text search", "isbn"), "isbn__iexact" ), "issn": SearchAltName( pgettext_lazy("key for text search", "issn"), "issn__iexact" ), "source": SearchAltName( pgettext_lazy("key for text search", "source"), "source__title__iexact", ), "source_free_input": SearchAltName( pgettext_lazy("key for text search", "source-free-input"), "source_free_input__iexact", ), "warehouse_container": SearchAltName( pgettext_lazy("key for text search", "warehouse-container"), "container__cached_label__iexact", ), "warehouse_container_ref": SearchAltName( pgettext_lazy( "key for text search", "warehouse-container-reference" ), "container_ref__cached_label__iexact", ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), "comment__iexact" ), "additional_information": SearchAltName( pgettext_lazy("key for text search", "additional-information"), "additional_information__iexact", ), "duplicate": SearchAltName( pgettext_lazy("key for text search", "has-duplicate"), "duplicate" ), "operation": SearchAltName( pgettext_lazy("key for text search", "operation"), "operations__cached_label__iexact", ), "context_record": SearchAltName( pgettext_lazy("key for text search", "context-record"), "context_records__cached_label__iexact", ), "find_basket": SearchAltName( pgettext_lazy("key for text search", "basket-finds"), "finds__basket__label__iexact", ), "find": SearchAltName( pgettext_lazy("key for text search", "find"), "finds__cached_label__iexact", ), "find__denomination": SearchAltName( pgettext_lazy("key for text search", "find-denomination"), "finds__denomination__iexact", ), "file": SearchAltName( pgettext_lazy("key for text search", "file"), "files__cached_label__iexact", ), "containers": SearchAltName( pgettext_lazy("key for text search", "container"), "containers__cached_label__iexact", ), "site": SearchAltName( pgettext_lazy("key for text search", "site"), 
"sites__cached_label__iexact", ), "warehouse": SearchAltName( pgettext_lazy("key for text search", "warehouse"), "warehouses__name__iexact", ), "image__isnull": SearchAltName( pgettext_lazy("key for text search", "has-image"), "image__isnull" ), "associated_file__isnull": SearchAltName( pgettext_lazy("key for text search", "has-file"), "associated_file__isnull", ), "receipt_date__before": SearchAltName( pgettext_lazy("key for text search", "receipt-date-before"), "receipt_date__lte", ), "receipt_date__after": SearchAltName( pgettext_lazy("key for text search", "receipt-date-after"), "receipt_date__gte", ), "receipt_date_in_documentation__before": SearchAltName( pgettext_lazy( "key for text search", "receipt-in-documentation-date-before" ), "receipt_date_in_documentation__lte", ), "receipt_date_in_documentation__after": SearchAltName( pgettext_lazy( "key for text search", "receipt-in-documentation-date-after" ), "receipt_date_in_documentation__gte", ), "creation_date__before": SearchAltName( pgettext_lazy("key for text search", "creation-date-before"), "creation_date__lte", ), "creation_date__after": SearchAltName( pgettext_lazy("key for text search", "creation-date-after"), "creation_date__gte", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) # search parameters REVERSED_BOOL_FIELDS = ["image__isnull", "associated_file__isnull"] DATED_FIELDS = [ "receipt_date__lte", "receipt_date__gte", "receipt_date_in_documentation__lte", "receipt_date_in_documentation__gte", "creation_date__lte", "creation_date__gte", ] objects = ExternalIdManager() RELATIVE_SESSION_NAMES = [ ("find", "finds__pk"), ("contextrecord", "context_records__pk"), ("operation", "operations__pk"), ("site", "sites__pk"), ("file", "files__pk"), ("warehouse", "warehouses__pk"), ("treatment", "treatments__pk"), ("treatmentfile", "treatment_files__pk"), ("administrativeact", "administrativeacts__pk"), ] UP_MODEL_QUERY = { "operation": ( pgettext_lazy("key for text search", "operation"), "cached_label", ), 
"contextrecord": ( pgettext_lazy("key for text search", "context-record"), "cached_label", ), "file": (pgettext_lazy("key for text search", "file"), "cached_label"), "find": (pgettext_lazy("key for text search", "find"), "cached_label"), "site": (pgettext_lazy("key for text search", "site"), "cached_label"), "warehouse": ( pgettext_lazy("key for text search", "warehouse"), "cached_label", ), "treatment": ( pgettext_lazy("key for text search", "treatment"), "cached_label", ), "treatmentfile": ( pgettext_lazy("key for text search", "treatment-file"), "cached_label", ), } QA_EDIT = QuickAction( url="document-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_document", "change_own_document"], ) QUICK_ACTIONS = [ QA_EDIT, QuickAction( url="document-qa-duplicate", icon_class="fa fa-clone", text=_("Duplicate"), target="one", rights=["change_document", "change_own_document"], ), QuickAction( url="document-qa-packaging", icon_class="fa fa-gift", text=_("Packaging"), target="many", rights=["change_document", "change_own_document"], module="warehouse", ), ] SERIALIZATION_FILES = ["image", "thumbnail", "associated_file"] title = models.TextField(_("Title"), blank=True, default="") associated_file = models.FileField( verbose_name=_("Associated file"), upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help(), ) index = models.IntegerField(verbose_name=_("Index"), blank=True, null=True) external_id = models.TextField(_("External ID"), blank=True, default="") reference = models.TextField(_("Ref."), blank=True, default="") internal_reference = models.TextField( _("Internal ref."), blank=True, default="" ) source_type = models.ForeignKey( SourceType, verbose_name=_("Type"), on_delete=models.SET_NULL, null=True, blank=True, ) publisher = models.ForeignKey( Organization, verbose_name=_("Publisher"), blank=True, null=True, related_name="publish", ) publishing_year = models.PositiveIntegerField( _("Year of 
publication"), blank=True, null=True ) licenses = models.ManyToManyField( LicenseType, verbose_name=_("License"), blank=True ) tags = models.ManyToManyField( DocumentTag, verbose_name=_("Tags"), blank=True ) language = models.ForeignKey( Language, verbose_name=_("Language"), blank=True, null=True ) issn = models.CharField(_("ISSN"), blank=True, null=True, max_length=10) isbn = models.CharField(_("ISBN"), blank=True, null=True, max_length=17) source = models.ForeignKey( "Document", verbose_name=_("Source"), blank=True, null=True, related_name="children", ) source_free_input = models.CharField( verbose_name=_("Source - free input"), blank=True, null=True, max_length=500, ) source_page_range = models.CharField( verbose_name=_("Source - page range"), blank=True, null=True, max_length=500, ) support_type = models.ForeignKey( SupportType, verbose_name=_("Medium"), on_delete=models.SET_NULL, blank=True, null=True, ) format_type = models.ForeignKey( Format, verbose_name=_("Format"), on_delete=models.SET_NULL, blank=True, null=True, ) scale = models.CharField(_("Scale"), max_length=30, null=True, blank=True) authors = models.ManyToManyField( Author, verbose_name=_("Authors"), related_name="documents" ) authors_raw = models.CharField( verbose_name=_("Authors (raw)"), blank=True, null=True, max_length=250 ) associated_url = models.URLField( blank=True, null=True, max_length=1000, verbose_name=_("Numerical ressource (web address)"), ) receipt_date = models.DateField( blank=True, null=True, verbose_name=_("Receipt date") ) creation_date = models.DateField( blank=True, null=True, verbose_name=_("Creation date") ) receipt_date_in_documentation = models.DateField( blank=True, null=True, verbose_name=_("Receipt date in documentation") ) item_number = models.IntegerField(_("Number of items"), default=1) description = models.TextField(_("Description"), blank=True, default="") container_id = models.PositiveIntegerField( verbose_name=_("Container ID"), blank=True, null=True ) # 
container = models.ForeignKey("archaeological_warehouse.Container") container_ref_id = models.PositiveIntegerField( verbose_name=_("Container ID"), blank=True, null=True ) # container_ref = models.ForeignKey("archaeological_warehouse.Container") comment = models.TextField(_("Comment"), blank=True, default="") additional_information = models.TextField( _("Additional information"), blank=True, default="" ) duplicate = models.NullBooleanField( _("Has a duplicate"), blank=True, null=True ) associated_links = models.TextField( _("Symbolic links"), blank=True, default="" ) cache_related_label = models.TextField( _("Related"), blank=True, default="", db_index=True, help_text=_("Cached value - do not edit"), ) class Meta: verbose_name = _("Document") verbose_name_plural = _("Documents") ordering = ("title",) permissions = ( ("view_document", ugettext("Can view all Documents")), ("view_own_document", ugettext("Can view own Document")), ("add_own_document", ugettext("Can add own Document")), ("change_own_document", ugettext("Can change own Document")), ("delete_own_document", ugettext("Can delete own Document")), ) indexes = [ GinIndex(fields=["data"]), ] def __str__(self): return self.title @classmethod def TABLE_COLS(cls): cols = cls._TABLE_COLS[:] profile = get_current_profile() if profile.document_complete_identifier: cols = ["complete_identifier"] + cols return cols @property def operation_codes(self): Operation = apps.get_model("archaeological_operations", "Operation") return "|".join( sorted( [ Operation.objects.get(pk=ope_id).code_patriarche for ope_id in self.get_related_operation_ids() ] ) ) def get_related_operation_ids(self): operations = list(self.operations.values_list("id", flat=True).all()) operations += list( self.context_records.values_list("operation_id", flat=True).all() ) operations += list( self.finds.values_list( "base_finds__context_record__operation_id", flat=True ).all() ) return list(set(operations)) def get_index_operation(self): operations = 
self.get_related_operation_ids() if len(operations) != 1: return current_operation = operations[0] q = ( Document.objects.exclude(pk=self.pk) .filter( Q(operations__id=current_operation) | Q(context_records__operation_id=current_operation) | Q( finds__base_finds__context_record__operation_id=current_operation ) ) .order_by("-custom_index") ) current_index = None for doc in q.all(): if not doc.custom_index: continue if len(doc.get_related_operation_ids()) != 1: continue current_index = doc.custom_index break # ordered by "-custom_index" so max current index is reached if not current_index: return 1 else: return current_index + 1 def natural_key(self): return (self.external_id,) def sheet_header(self): headers = [] if self.complete_identifier: headers.append(self.complete_identifier) if self.title: headers.append(self.title) return " - ".join(headers) @property def has_iframe(self): return self.format_type and self.format_type.iframe_template def get_iframe(self): if not self.has_iframe: return "" return Template(self.format_type.iframe_template).render( Context({"document": self}) ) @property def container(self): if not self.container_id: return Container = apps.get_model("archaeological_warehouse", "Container") try: return Container.objects.get(pk=self.container_id) except Container.DoesNotExist: return @property def container_ref(self): if not self.container_ref_id: return Container = apps.get_model("archaeological_warehouse", "Container") try: return Container.objects.get(pk=self.container_ref_id) except Container.DoesNotExist: return @property def pdf_attached(self): if not self.associated_file and ( not self.source or not self.source.associated_file ): return extra = "" if self.associated_file: url = self.associated_file.url else: url = self.source.associated_file.url if self.source_page_range: extra = "#page=" extra += self.source_page_range.split("-")[0].split(";")[0] if not url.lower().endswith(".pdf"): return return url + extra """ @property def code(self): 
if not self.index: return "{}-".format(self.operation.code_patriarche or '') return "{}-{:04d}".format(self.operation.code_patriarche or '', self.index) """ def duplicate_item(self, user=None, data=None): return duplicate_item(self, user, data) @property def source_type_html(self): source_types = [] source_type = self.source_type while source_type: source_types.append(str(source_type)) source_type = source_type.parent return " / ".join(reversed(source_types)) def public_representation(self): site = Site.objects.get_current() scheme = "https" if settings.ISHTAR_SECURE else "http" base_url = scheme + "://" + site.domain + "/" image = None if self.image: image = self.image.url if not image.startswith("http"): if not image.startswith("/"): image = "/" + image image = base_url + image thumbnail = None if self.thumbnail: thumbnail = self.thumbnail.url if not thumbnail.startswith("http"): if not thumbnail.startswith("/"): thumbnail = "/" + thumbnail thumbnail = base_url + thumbnail return { "title": self.title, "reference": self.reference, "type": self.source_type and str(self.source_type), "authors": [a.public_representation() for a in self.authors.all()], "image": image, "thumbnail": thumbnail, } def get_extra_actions(self, request): """ For sheet template """ # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(Document, self).get_extra_actions(request) # is_locked = self.is_locked(request.user) can_edit_document = self.can_do(request, "change_document") if not can_edit_document: return actions actions += [ ( reverse("document-qa-duplicate", args=[self.pk]), _("Duplicate"), "fa fa-clone", "", "", True, ), ] if get_current_profile().warehouse: actions.append( ( reverse("document-qa-packaging", args=[self.pk]), _("Packaging"), "fa fa-gift", "", "", True, ) ) return actions @property def thumbnail_path(self): if not self.thumbnail: return "" return self.thumbnail.path @property def image_path(self): if not self.image: return "" return 
self.image.path def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super(Document, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs ) if not filtr or prefix + "image_path" in filtr: values[prefix + "image_path"] = self.image_path if not filtr or prefix + "thumbnail_path" in filtr: values[prefix + "thumbnail_path"] = self.thumbnail_path return values @property def images_without_main_image(self): return [] @property def associated_file_name(self): if not self.associated_file: return "" return os.path.basename(self.associated_file.name) @property def images(self): # mimic a queryset pointing to himself return Document.objects.filter(pk=self.pk, image__isnull=False).exclude( image="" ) @property def main_image(self): if self.images.count(): return self.images.all()[0] @property def has_related(self): return any(getattr(self, rel).count() for rel in self.RELATED_MODELS) @classmethod def get_query_owns(cls, ishtaruser): query_own_list = [] for rel_model in cls.RELATED_MODELS: klass = getattr(cls, rel_model).rel.related_model q_own_dct = klass._get_query_owns_dicts(ishtaruser) if q_own_dct: query_own_list.append((rel_model + "__", q_own_dct)) q = None for prefix, owns in query_own_list: subq = cls._construct_query_own(prefix, owns) if subq: if not q: q = subq else: q |= subq q |= cls._construct_query_own( "", [{"history_creator": ishtaruser.user_ptr}] ) return q def get_associated_operation(self): raise NotImplementedError() @property def associated_filename(self): values = [ str(getattr(self, attr)) for attr in ("source_type", "title") if getattr(self, attr) ] return slugify("-".join(values)) def _get_base_image_paths(self): if self.pk: # m2m not available if not created... 
for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] yield item._get_base_image_path() def _get_base_image_path(self): for path in self._get_base_image_paths(): if path: return path n = datetime.datetime.now() return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) def _get_available_filename(self, path, test_link=None): """ Get a filename not used If name already used - generate a name with schema: [base]-[current_number + 1].[suffix] :param path: base path :param test_link: test if an existing path match with this link :return: if test_link is not None, (new_path, link_match) otherwise the new_path """ file_split = path.split(".") suffix, base = "", "" if len(file_split) > 1: base = ".".join(file_split[0:-1]) suffix = file_split[-1] else: base = path base_split = base.split("-") current_nb = 0 if len(base_split) > 1: try: current_nb = int(base_split[-1]) base = "-".join(base_split[0:-1]) + "-" except ValueError: pass while os.path.exists(path): if ( test_link and os.path.islink(path) and os.readlink(path) == test_link ): return path, True current_nb += 1 path = "{}-{}.{}".format(base, current_nb, suffix) if test_link: return path, False return path def _move_image(self): """ Move to the relevant path and create appropriate symbolic links :return: list of associated links """ if getattr(self, "_no_move", False): return reference_path = self.image.path filename = os.path.basename(reference_path) links = [] for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] base_path = item._get_base_image_path() new_path = base_path + "/" + filename # create a link new_path = settings.MEDIA_ROOT + new_path if not os.path.exists(os.path.dirname(new_path)): os.makedirs(os.path.dirname(new_path)) new_path, match = self._get_available_filename( new_path, test_link=reference_path ) links.append(new_path) if match: # the current link is correct continue try: 
os.symlink(reference_path, new_path) except FileExistsError: pass return links def related_label(self): items = [] for rel_attr in reversed(self.RELATED_MODELS): for item in getattr(self, rel_attr).all(): items.append(str(item)) return " ; ".join(items) def _generate_cache_related_label(self): return self.related_label()[:1000] @classmethod def get_next_index(cls): q = ( cls.objects.values("index") .filter(index__isnull=False) .order_by("-index") ) if not q.count(): return 1 cid = q.all()[0]["index"] if not cid: cid = 0 return cid + 1 @classmethod def get_import_defaults(cls): return {"index": cls.get_next_index()} def set_index(self): if self.index: return self.index = self.get_next_index() @classmethod @pre_importer_action def import_get_next_index(cls, context, value): context["index"] = cls.get_next_index() @property def dublin_core_identifier(self): identifier = None if self.isbn: identifier = self.isbn elif self.issn: identifier = self.issn elif self.associated_url: identifier = self.associated_url elif Site.objects.count(): identifier = "http://{}{}".format( Site.objects.all()[0].domain, self.get_absolute_url() ) return identifier def dublin_core_tags(self): if not self.title: return "" tags = [ ( "link", { "rel": "schema.DC", "href": "http://purl.org/dc/elements/1.1/", }, ), ( "link", {"rel": "schema.DCTERMS", "href": "http://purl.org/dc/terms/"}, ), ] title = {"name": "DC.title", "content": self.title} tags.append(("meta", title)) if self.creation_date: date = { "name": "DC.date", "scheme": "DCTERMS.W3CDTF", "content": self.creation_date.strftime("%Y-%m-%d"), } tags.append(("meta", date)) if self.tags.count(): content = ", ".join(str(t) for t in self.tags.all()) tg = {"name": "DC.subject", "content": content} tags.append(("meta", tg)) if self.description: tags.append( ( "meta", {"name": "DC.description", "content": self.description}, ) ) if self.publisher: tags.append( ( "meta", {"name": "DC.publisher", "content": self.publisher.name}, ) ) if 
self.authors.count(): content = ", ".join( str(t.person.raw_name) for t in self.authors.all() ) tags.append(("meta", {"name": "DC.creator", "content": content})) if self.source_type: tags.append( ("meta", {"name": "DC.type", "content": str(self.source_type)}) ) if self.format_type: tags.append( ( "meta", {"name": "DC.format", "content": str(self.format_type)}, ) ) identifier = self.dublin_core_identifier if identifier: tags.append( ("meta", {"name": "DC.identifier", "content": identifier}) ) if self.language: lang = self.language.iso_code tags.append(("meta", {"name": "DC.language", "content": lang})) if self.licenses.count(): licences = ", ".join(str(l) for l in self.licenses.all()) tags.append(("meta", {"name": "DC.rights", "content": licences})) src = None if self.source: src = self.source.dublin_core_identifier if src: tags.append(("meta", {"name": "DC.relation", "content": src})) tags.append(("meta", {"name": "DC.source", "content": src})) elif self.source_free_input: tags.append( ( "meta", {"name": "DC.source", "content": self.source_free_input}, ) ) html = "" for tag, attrs in tags: et = ET.Element(tag, attrib=attrs) html += ET.tostring(et, encoding="unicode", method="html") return html @property def extra_meta(self): return self.dublin_core_tags() def coins_tag(self): if not self.title: return info = [ ("url_ver", "Z39.88-2004"), ("ctx_ver", "Z39.88-2004"), ("rft_val_fmt", "info:ofi/fmt:kev:mtx:dc"), ("rft.title", self.title), ("rft.btitle", self.title), ] if self.associated_url: info.append(("rft.identifier", self.associated_url)) elif Site.objects.count(): info.append( ( "rft.identifier", "http://{}{}".format( Site.objects.all()[0].domain, self.get_absolute_url() ), ) ) for author in self.authors.all(): person = author.person if not person.raw_name: continue if person.first_name and person.name: info.append(("rft.aulast", person.name)) info.append(("rft.aufirst", person.first_name)) info.append( ("rft.au", "{}+{}".format(person.first_name, person.name)) ) 
else: info.append(("rft.au", person.raw_name)) if self.source_type: info.append(("rft.type", self.source_type.coins_type)) if self.source_type.coins_genre: info.append(("rft.genre", self.source_type.coins_genre)) if self.description: info.append(("rft.description", self.description)) if self.publisher: info.append(("rft.publisher", self.publisher.name)) if self.creation_date: if self.creation_date.day == 1 and self.creation_date.month == 1: info.append(("rft.date", self.creation_date.year)) else: info.append( ("rft.date", self.creation_date.strftime("%Y-%m-%d")) ) if self.source and self.source.title: info.append(("rft.source", self.source.title)) elif self.source_free_input: info.append(("rft.source", self.source_free_input)) if self.issn: info.append(("rft.issn", self.issn)) if self.isbn: info.append(("rft.isbn", self.isbn)) if self.licenses.count(): licenses = ";".join(str(l) for l in self.licenses.all()) info.append(("rft.rights", licenses)) if self.language: info.append(("rft.language", self.language.iso_code)) return ''.format(urlencode(info)) def save(self, *args, **kwargs): no_path_change = "no_path_change" in kwargs and kwargs.pop( "no_path_change" ) self.set_index() if not self.associated_url: self.associated_url = None container = self.container if self.container_id and not container: self.container_id = None if container and container.pk != self.container_id: self.container_id = container.pk container_ref = self.container_ref if self.container_ref_id and not container_ref: self.container_ref_id = None if container_ref and container_ref.pk != self.container_ref_id: self.container_ref_id = container_ref.pk super(Document, self).save(*args, **kwargs) if ( self.image and not no_path_change and not getattr(self, "_no_path_change", False) ): links = self._move_image() if not links: return links = self.LINK_SPLIT.join(links) if links != self.associated_links: self.associated_links = links self.save(no_path_change=True) post_save.connect(cached_label_changed, 
sender=Document) class OperationType(GeneralType): order = models.IntegerField(_("Order"), default=1) preventive = models.BooleanField(_("Is preventive"), default=True) judiciary = models.BooleanField(_("Is judiciary"), default=False) class Meta: verbose_name = _("Operation type") verbose_name_plural = _("Operation types") ordering = ["judiciary", "-preventive", "order", "label"] @classmethod def get_types( cls, dct=None, instances=False, exclude=None, empty_first=True, default=None, initial=None, ): dct = dct or {} exclude = exclude or [] initial = initial or [] tuples = [] dct["available"] = True if not instances and empty_first and not default: tuples.append(("", "--")) if default and not instances: try: default = cls.objects.get(txt_idx=default) tuples.append((default.pk, _(str(default)))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default and not instances: exclude.append(default.txt_idx) if exclude: items = items.exclude(txt_idx__in=exclude) current_preventive, current_judiciary, current_lst = None, None, None item_list = list(items.order_by(*cls._meta.ordering).all()) new_vals = cls._get_initial_types( initial, [i.pk for i in item_list], instance=True ) item_list += new_vals for item in item_list: item.rank = 0 if instances: return item_list for item in item_list: if ( not current_lst or item.preventive != current_preventive or item.judiciary != current_judiciary ): if current_lst: tuples.append(current_lst) if item.judiciary: gp_lbl = _("Judiciary") elif item.preventive: gp_lbl = _("Preventive") else: gp_lbl = _("Research") current_lst = [gp_lbl, []] current_preventive = item.preventive current_judiciary = item.judiciary current_lst[1].append((item.pk, _(str(item)))) if current_lst: tuples.append(current_lst) return tuples @classmethod def is_preventive(cls, ope_type_id, key=""): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False if not key: return op_type.preventive return key == op_type.txt_idx 
@classmethod def is_judiciary(cls, ope_type_id): try: op_type = cls.objects.get(pk=ope_type_id) except cls.DoesNotExist: return False return op_type.judiciary post_save.connect(post_save_cache, sender=OperationType) post_delete.connect(post_save_cache, sender=OperationType) class AdministrationScript(models.Model): path = models.CharField(_("Filename"), max_length=30) name = models.TextField(_("Name"), blank=True, default="") class Meta: verbose_name = _("Administration script") verbose_name_plural = _("Administration scripts") ordering = ["name"] def __str__(self): return str(self.name) SCRIPT_STATE = ( ("S", _("Scheduled")), ("P", _("In progress")), ("FE", _("Finished with errors")), ("F", _("Finished")), ) SCRIPT_STATE_DCT = dict(SCRIPT_STATE) class AdministrationTask(models.Model): script = models.ForeignKey(AdministrationScript) state = models.CharField( _("State"), max_length=2, choices=SCRIPT_STATE, default="S" ) creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) result = models.TextField(_("Result"), blank=True, default="") class Meta: verbose_name = _("Administration task") verbose_name_plural = _("Administration tasks") ordering = ["script"] def __str__(self): state = _("Unknown") if self.state in SCRIPT_STATE_DCT: state = str(SCRIPT_STATE_DCT[self.state]) return "{} - {} - {}".format(self.script, self.creation_date, state) def execute(self): if self.state != "S": return self.launch_date = datetime.datetime.now() script_dir = settings.ISHTAR_SCRIPT_DIR if not script_dir: self.result = str( _( "ISHTAR_SCRIPT_DIR is not set in your " "local_settings. Contact your administrator." ) ) self.state = "FE" self.finished_date = datetime.datetime.now() self.save() return if ".." in script_dir: self.result = str( _( "Your ISHTAR_SCRIPT_DIR is containing " 'dots "..". 
As it can refer to relative ' "paths, it can be a security issue and this is " "not allowed. Only put a full path." ) ) self.state = "FE" self.finished_date = datetime.datetime.now() self.save() return if not os.path.isdir(script_dir): self.result = str( _('Your ISHTAR_SCRIPT_DIR: "{}" is not a valid directory.') ).format(script_dir) self.state = "FE" self.finished_date = datetime.datetime.now() self.save() return script_name = None # only script inside the script directory can be executed for name in os.listdir(script_dir): if name == self.script.path: if os.path.isfile(os.path.join(script_dir, name)): script_name = os.path.join(script_dir, name) break if not script_name: self.result = str( _( 'Script "{}" is not available in your script directory. ' "Check your configuration." ) ).format(self.script.path) self.state = "FE" self.finished_date = datetime.datetime.now() self.save() return self.state = "P" self.save() self.finished_date = datetime.datetime.now() try: session = Popen([script_name], stdout=PIPE, stderr=PIPE) stdout, stderr = session.communicate() except OSError as e: self.state = "FE" self.result = 'Error executing "{}" script: {}'.format( self.script.path, e ) self.save() return self.finished_date = datetime.datetime.now() if stderr: self.state = "FE" self.result = "Error: {}".format(stderr.decode("utf-8")) else: self.state = "F" self.result = "{}".format(stdout.decode("utf-8")) self.save() ITEM_TYPES = ( ("O", _("Operation")), ("S", _("Archaeological site")), ("CR", _("Context record")), ("F", _("Find")), ("W", _("Warehouse")), ) EXPORT_STATE = (("C", _("Created")),) + SCRIPT_STATE class ExportTask(models.Model): filter_type = models.CharField( _("Filter on"), max_length=2, choices=ITEM_TYPES, null=True, blank=True ) filter_text = models.TextField( _("Filter query"), blank=True, default="", help_text=_( "Textual query on this item (try it on the main " "interface)" ), ) geo = models.BooleanField( _("Export geographic data"), default=True, 
help_text=_( "Geographic data can represent large volume of " "information. Geographic data can be excluded from the " "export" ), ) state = models.CharField( _("State"), max_length=2, choices=EXPORT_STATE, default="C" ) put_locks = models.BooleanField( _("Put locks on associated items"), default=False ) lock_user = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Lock user"), blank=True, null=True, help_text=_( "Owner of the lock if item are locked. Warning: if no " "user is provided the locks can be remove by any user " "with the permission to edit." ), ) export_types = models.BooleanField(_("Export types"), default=True) export_conf = models.BooleanField(_("Export configuration"), default=True) export_importers = models.BooleanField(_("Export importers"), default=True) export_geo = models.BooleanField(_("Export towns, areas..."), default=True) export_dir = models.BooleanField(_("Export directory"), default=True) export_docs = models.BooleanField(_("Export documents"), default=True) export_items = models.BooleanField(_("Export main items"), default=True) creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) result = models.FileField( _("Result"), null=True, blank=True, upload_to="exports/%Y/%m/" ) result_info = models.TextField( _("Result information"), blank=True, default="" ) class Meta: verbose_name = _("Archive - Export") verbose_name_plural = _("Archive - Exports") ordering = ["creation_date"] def __str__(self): state = _("Unknown") if self.state in SCRIPT_STATE_DCT: state = str(SCRIPT_STATE_DCT[self.state]) return "Export - {} - {}".format(self.creation_date, state) @property def label(self): fltr = _("Whole database") if self.filter_type and self.filter_text: dct = dict(ITEM_TYPES) if self.filter_type in dct: fltr = '{} "{}"'.format(dct[self.filter_type], self.filter_text) return "{} - 
{}".format(fltr, self.creation_date) def clean(self): if (self.filter_text and not self.filter_type) or ( self.filter_type and not self.filter_text ): raise ValidationError( _("To filter filter type and filter text must be filled.") ) class ImportTask(models.Model): creation_date = models.DateTimeField(default=datetime.datetime.now) launch_date = models.DateTimeField(null=True, blank=True) finished_date = models.DateTimeField(null=True, blank=True) import_user = models.ForeignKey( User, related_name="+", on_delete=models.SET_NULL, verbose_name=_("Import user"), blank=True, null=True, help_text=_( 'If set the "Import user" will be the editor for last ' "version. If the field is left empty no history will be " "recorded." ), ) state = models.CharField( _("State"), max_length=2, choices=EXPORT_STATE, default="C" ) delete_before = models.BooleanField( _("Delete before adding"), default=False, help_text=_("Delete existing items before adding"), ) releasing_locks = models.BooleanField( _("Releasing locks on associated items"), default=False ) source = models.FileField(_("Source"), upload_to="imports/%Y/%m/") class Meta: verbose_name = _("Archive - Import") verbose_name_plural = _("Archive - Imports") ordering = ["creation_date"] def __str__(self): state = _("Unknown") if self.state in SCRIPT_STATE_DCT: state = str(SCRIPT_STATE_DCT[self.state]) return "Import - {} - {}".format(self.creation_date, state)