#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ from ipware import get_client_ip import sys from bs4 import BeautifulSoup import bleach import copy import datetime import inspect from importlib import import_module from jinja2 import TemplateSyntaxError, UndefinedError import json import logging import os import re import string import tempfile from io import BytesIO # nosec: only script inside the script directory can be executed # script directory is not web available from subprocess import Popen, PIPE # nosec from PIL import Image from markdown import markdown from ooopy.OOoPy import OOoPy from ooopy.Transformer import Transformer as OOTransformer import ooopy.Transforms as OOTransforms import uuid import zipfile from urllib.parse import urlencode # nosec: ElementTree used to create XML not for parsing from xml.etree import ElementTree as ET # nosec from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Group from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.gis.db.models.aggregates import Union from django.contrib.gis.geos.polygon import Polygon from django.contrib.gis.geos.collections import MultiPolygon from 
django.contrib.postgres.fields import JSONField, ArrayField from django.contrib.postgres.indexes import GinIndex from django.contrib.sites.models import Site from django.core.cache import cache from django.core.exceptions import ( ObjectDoesNotExist, ValidationError, MultipleObjectsReturned, ) from django.core.files.base import ContentFile from django.core.files.uploadedfile import SimpleUploadedFile from django.db import connection from django.db.models import Q, Max, Count from django.db.models.signals import post_save, post_delete, m2m_changed from django.db.utils import DatabaseError from django.template import Context, Template from django.template.defaultfilters import slugify from django.urls import reverse from django.utils.functional import lazy from django.utils.safestring import mark_safe from ishtar_common.data_importer import post_importer_action from ishtar_common.utils import ( ugettext_lazy as _, ugettext, pgettext_lazy, format_date, get_generated_id, get_current_profile, duplicate_item, get_image_path, serialize_args_for_tasks, task, generate_pdf_preview, revoke_old_task, ) from ishtar_common.utils_secretary import IshtarSecretaryRenderer from ishtar_common.alternative_configs import ( ALTERNATE_CONFIGS, ALTERNATE_CONFIGS_CHOICES, ) from ishtar_common.data_importer import pre_importer_action from ishtar_common.model_managers import ( SlugModelManager, ExternalIdManager, UUIDModelManager, ) from ishtar_common.model_merging import merge_model_objects from ishtar_common.models_imports import ( ImportLineError, ImporterModel, ImporterType, ImporterGroup, ImporterGroupImporter, ImporterDefault, ImporterDefaultValues, ImporterColumn, ImporterDuplicateField, Regexp, ImportTarget, TargetKey, FormaterType, Import, ImportGroup, TargetKeyGroup, ValueFormater, ItemKey, ImportColumnValue, ) from ishtar_common.utils import ( get_cache, create_slug, get_all_field_names, cached_label_changed, generate_relation_graph, max_size_help, ) from 
from ishtar_common.models_common import (
    GeneralType, HierarchicalType, OrderedHierarchicalType, OrderedType,
    BaseHistorizedItem, LightHistorizedItem, HistoricalRecords, FullSearch,
    SearchAltName, OwnPerms, Cached, Imported, Address, post_save_cache,
    TemplateItem, SpatialReferenceSystem, DashboardFormItem,
    document_attached_changed, SearchAltName, DynamicRequest, GeoItem,
    GeoDataType, GeoOriginType, GeoProviderType, GeoVectorData, GeoBufferType,
    CompleteIdentifierItem, SearchVectorConfig, DocumentItem, QuickAction,
    MainItem, Merge, ShortMenuItem, Town, ImageContainerModel, StatisticItem,
    CachedGen, Department, State,
)

# Public names of this module, including names re-exported from the
# sub-modules imported above.
__all__ = [
    "ImportColumnValue", "ImporterModel", "ImporterType", "ImporterGroup",
    "ImportGroup", "ImportLineError", "ImporterGroupImporter",
    "ImporterDefault", "ImporterDefaultValues", "ImporterColumn",
    "ImporterDuplicateField", "Imported", "Regexp", "ImportTarget", "ItemKey",
    "OrderedHierarchicalType", "OrderedType", "TargetKey", "FormaterType",
    "Import", "TargetKeyGroup", "ValueFormater", "Organization", "Person",
    "valid_id", "Town", "SpatialReferenceSystem", "OrganizationType",
    "Document", "GeneralType", "get_generated_id", "get_current_profile",
    "LightHistorizedItem", "OwnPerms", "Address", "post_save_cache",
    "DashboardFormItem", "ShortMenuItem", "document_attached_changed",
    "SearchAltName", "DynamicRequest", "GeoItem", "SearchVectorConfig",
    "DocumentItem", "CachedGen", "StatisticItem", "Department", "State",
    "CompleteIdentifierItem", "GeoVectorData", "GeoDataType", "GeoOriginType",
    "GeoProviderType", "GeoBufferType",
]

logger = logging.getLogger(__name__)


def post_save_user(sender, **kwargs):
    """``post_save`` receiver for ``User``.

    On creation, asks ``IshtarUser`` to build its companion record; a
    ``DatabaseError`` raised while doing so is ignored on purpose.
    """
    user = kwargs["instance"]
    if kwargs["created"]:
        try:
            IshtarUser.create_from_user(user)
        except DatabaseError:  # manage when db is not synced
            pass
    # NOTE(review): the collapsed source does not show whether this call sits
    # inside the ``created`` branch; it is kept at function level - confirm.
    IshtarUser.set_superuser(user)


post_save.connect(post_save_user, sender=User)


class ValueGetter:
    """Mixin flattening an object's fields into a ``{key: value}`` dict.

    Class attributes tune the export:

    * ``_prefix``: default key prefix used when none is given.
    * ``GET_VALUES_EXTRA``: extra attribute names exported after the fields.
    * ``GET_VALUES_EXCLUDE_FIELDS``: field names never exported.
    * ``GET_VALUES_M2M``: keys whose related manager is joined into a string.
    """

    _prefix = ""
    COL_LABELS = {}
    GET_VALUES_EXTRA = []
    GET_VALUES_EXCLUDE_FIELDS = [
        "search_vector",
        "id",
        "multi_polygon",
        "point_2d",
        "point",
        "history_m2m",
    ]
    GET_VALUES_M2M = []

    def _get_values_documents(self, prefix="", filtr=None):
        """Return values of attached documents, images and main image.

        :param prefix: string prepended to every produced key
        :param filtr: optional collection of keys; when given, only the keys
            it contains are produced
        :return: dict, empty when the object has no ``documents`` attribute
        """
        values = {}
        if not hasattr(self, "documents"):
            return values
        if not filtr or prefix + "documents" in filtr:
            values[prefix + "documents"] = [
                doc.get_values(no_values=True) for doc in self.documents.all()
            ]
        # "images" gets the same guard as "documents": an object exposing
        # only documents must not raise AttributeError here
        if hasattr(self, "images") and (not filtr or prefix + "images" in filtr):
            values[prefix + "images"] = [
                image.get_values(no_values=True) for image in self.images.all()
            ]
        if filtr and prefix + "main_image" not in filtr:
            return values
        if (
            hasattr(self, "main_image")
            and self.main_image
            and hasattr(self.main_image, "get_values")
        ):
            values[prefix + "main_image"] = self.main_image.get_values(no_values=True)
        return values

    def _get_values_update_sub_filter(self, filtr, prefix):
        """Strip ``prefix`` from the filter keys starting with it.

        :return: list of shortened keys, or ``None`` when ``filtr`` is empty
        """
        if not filtr:
            return
        return [k[len(prefix):] for k in filtr if k.startswith(prefix)]

    def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs):
        """Hook for subclasses adding computed values; returns ``{}`` here."""
        return {}

    def get_values(self, prefix="", no_values=False, filtr=None, **kwargs):
        """Return the flattened values of this object.

        :param prefix: key prefix; falls back to ``self._prefix`` when empty
        :param no_values: when true the global variables are not appended
        :param filtr: optional list of keys; a field is kept only when at
            least one filter key starts with ``prefix + field_name``
        :param kwargs: forwarded to nested ``get_values`` calls and to
            ``get_extra_values``; ``exclude`` lists keys to leave out
        :return: dict of values; the result is cached for 10 seconds
        """
        if not prefix:
            prefix = self._prefix
        extra_args = ["getvalues", str(self.pk), prefix, "1" if no_values else "0"]
        if filtr:
            extra_args += filtr
        for k in kwargs:
            extra_args += [k, str(kwargs[k])]
        cache_key, values = get_cache(self.__class__, extra_args=extra_args)
        if values:
            return values

        exclude = kwargs.get("exclude", [])
        values = {}
        if (
            hasattr(self, "qrcode")
            and (not filtr or prefix + "qrcode_path" in filtr)
            and prefix + "qrcode_path" not in exclude
        ):
            values[prefix + "qrcode_path"] = self.qrcode_path

        for field_name in get_all_field_names(self):
            try:
                value = getattr(self, field_name)
            except (AttributeError, MultipleObjectsReturned):
                continue
            key = prefix + field_name
            if field_name in self.GET_VALUES_EXCLUDE_FIELDS or key in exclude:
                continue
            if filtr and not any(f.startswith(key) for f in filtr):
                continue
            if hasattr(value, "get_values"):
                # related object: merge its own values under "<key>_"
                values.update(value.get_values(key + "_", filtr=filtr, **kwargs))
            if hasattr(self, "get_values_for_" + field_name):
                values[key] = getattr(self, "get_values_for_" + field_name)()
            else:
                values[key] = value

        values.update(self._get_values_documents(prefix=prefix, filtr=filtr))

        for extra_field in self.GET_VALUES_EXTRA:
            key = prefix + extra_field
            if filtr and not any(f.startswith(key) for f in filtr):
                continue
            values[key] = getattr(self, extra_field) or ""

        # normalise: None -> "", m2m managers -> joined labels, scalars -> str
        for key, val in values.items():
            if val is None:
                val = ""
            elif (key in self.GET_VALUES_M2M or "type" in key) and (
                val.__class__.__name__.split(".")[0] == "ManyRelatedManager"
            ):
                val = " ; ".join(str(v) for v in val.all())
            elif not isinstance(val, (tuple, list, dict)):
                val = str(val)
                if val.endswith(".None"):
                    val = ""
            values[key] = val

        values.update(
            self.get_extra_values(
                prefix=prefix, no_values=no_values, filtr=filtr, **kwargs
            )
        )
        # sub-items and "no_values" calls stop here: no global variables
        if (prefix and prefix != self._prefix) or no_values:
            cache.set(cache_key, values, 10)
            return values
        for global_var in GlobalVar.objects.all():
            values[global_var.slug] = global_var.value or ""
        cache.set(cache_key, values, 10)
        return values

    @classmethod
    def get_empty_values(cls, prefix=""):
        """Return a dict mapping every prefixed field name to ``""``."""
        if not prefix:
            prefix = cls._prefix
        return {prefix + field_name: "" for field_name in get_all_field_names(cls)}
class HistoryModel(models.Model):
    """Abstract base giving historical records access to stored m2m data."""

    class Meta:
        abstract = True

    def m2m_listing(self, key, create=False):
        """Rebuild the related items stored in ``history_m2m[key]``.

        The live model is looked up in the ``models`` module of this class,
        under this class name minus its first ``len("Historical")``
        characters; the model targeted by its ``key`` relation then
        decompresses the stored data.

        :param key: name of the many-to-many attribute
        :param create: forwarded to ``history_decompress``
        :return: result of ``history_decompress``, or ``None`` when
            ``history_m2m`` is empty or has no entry for ``key``
        """
        if not self.history_m2m or key not in self.history_m2m:
            return
        module_path = self.__class__.__module__
        if not module_path.endswith(".models"):
            module_path += ".models"
        models_module = import_module(module_path)
        live_name = self.__class__.__name__[len("Historical"):]
        live_model = getattr(models_module, live_name)
        descriptor = getattr(live_model, key)
        relation = (
            descriptor.rel if hasattr(descriptor, "rel") else descriptor.remote_field
        )
        return relation.model.history_decompress(self.history_m2m[key], create=create)
def valid_id(cls):
    """Build a validator checking that a primary key exists for ``cls``.

    :param cls: model class exposing an ``objects`` manager
    :return: ``func(value)`` raising ``ValidationError`` when no object has
        ``value`` as primary key
    """

    def func(value):
        try:
            cls.objects.get(pk=value)
        except ObjectDoesNotExist:
            raise ValidationError(_("Not a valid item."))

    return func


def valid_ids(cls):
    """Build a validator checking that every given primary key exists.

    The returned ``func(value)`` accepts a single key, a comma separated
    string of keys, or a list/tuple of keys.

    :param cls: model class exposing an ``objects`` manager
    :return: validator raising ``ValidationError`` on the first unknown key
    """

    def func(value):
        # only strings can be comma separated: an int must not reach "in"
        if isinstance(value, str) and "," in value:
            value = value.split(",")
        if not isinstance(value, (list, tuple)):
            value = [value]
        for v in value:
            try:
                cls.objects.get(pk=v)
            except ObjectDoesNotExist:
                raise ValidationError(_("A selected item is not a valid item."))

    return func


def is_unique(cls, field):
    """Build a validator rejecting values already stored in ``field``.

    :param cls: model class exposing an ``objects`` manager
    :param field: name of the field (or lookup) to test
    :return: ``func(value)`` raising ``ValidationError`` when a row matches
    """

    def func(value):
        query = {field: value}
        # exists() is enough: the number of matches is irrelevant
        if cls.objects.filter(**query).exists():
            raise ValidationError(_("This item already exists."))

    return func


def get_general_type_label(model, slug):
    """Return ``str()`` of ``model.get_cache(slug)``, or ``""`` if falsy."""
    obj = model.get_cache(slug)
    if not obj:
        return ""
    return str(obj)
class TinyUrl(models.Model):
    """Stored link whose numeric id can be written with ``CHAR_MAP`` digits."""

    CHAR_MAP = string.ascii_letters + string.digits
    CHAR_MAP_LEN = len(CHAR_MAP)
    link = models.URLField()

    @classmethod
    def index_to_char(cls, seq):
        """Map a sequence of ``CHAR_MAP`` indexes to the matching string."""
        return "".join(cls.CHAR_MAP[x] for x in seq)

    def get_short_id(self):
        """Return ``self.id`` written in base ``CHAR_MAP_LEN``.

        An id of ``0`` yields an empty string.
        """
        c_id = self.id
        digits = []
        while c_id > 0:
            digits.append(c_id % self.CHAR_MAP_LEN)
            c_id //= self.CHAR_MAP_LEN
        digits.reverse()
        return self.index_to_char(digits)

    @classmethod
    def decode_id(cls, value):
        """Inverse of ``get_short_id``: convert a short id back to an int.

        :raises ValueError: when a character is not part of ``CHAR_MAP``
        """
        i = 0
        for c in value:
            i = i * cls.CHAR_MAP_LEN + cls.CHAR_MAP.index(c)
        return i


class ImageModel(models.Model, ImageContainerModel):
    """Abstract model with an image and a thumbnail generated on save."""

    image = models.ImageField(
        upload_to=get_image_path,
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    thumbnail = models.ImageField(
        upload_to=get_image_path,
        blank=True,
        null=True,
        max_length=255,
        help_text=max_size_help(),
    )
    DUPLICATE_EXCLUDE = ["image", "thumbnail"]
    IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE
    THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE
    IMAGE_PREFIX = ""

    class Meta:
        abstract = True

    def has_changed(self, field):
        """Tell whether ``field`` differs from the value stored in database.

        Always true for an object without primary key.
        """
        if not self.pk:
            return True
        manager = getattr(self.__class__, "objects")
        old = getattr(manager.get(pk=self.pk), field)
        return getattr(self, field) != old

    def create_thumb(self, image, size):
        """Returns the image resized to fit inside a box of the given size

        ``image`` is resized in place and re-encoded as JPEG.

        :param image: PIL image
        :param size: ``(width, height)`` bounding box
        :return: ``SimpleUploadedFile`` holding the JPEG data
        """
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter
        image.thumbnail(size, Image.LANCZOS)
        temp = BytesIO()
        image.save(temp, "jpeg")
        temp.seek(0)
        return SimpleUploadedFile("temp", temp.read())

    def save(self, *args, **kwargs):
        """Save the object, regenerating image and thumbnail when needed.

        * ``force_copy`` in ``kwargs``: plain save, no image processing (the
          key is removed before calling the parent ``save``);
        * image unchanged: plain save;
        * image removed: the thumbnail is cleared too;
        * otherwise the image is converted to JPEG, resized to
          ``IMAGE_MAX_SIZE`` when set, and a thumbnail is generated. On
          ``IOError``/``ValueError`` both image and thumbnail are cleared.
        """
        if "force_copy" in kwargs:
            kwargs.pop("force_copy")
            super(ImageModel, self).save(*args, **kwargs)
            return
        # manage images
        if not self.has_changed("image"):
            return super(ImageModel, self).save(*args, **kwargs)
        if not self.image:
            self.thumbnail = None
            return super(ImageModel, self).save(*args, **kwargs)

        # generate thumbnail - convert to jpg
        filename = os.path.splitext(os.path.split(self.image.name)[-1])[0]
        old_path = self.image.path
        filename = "%s.jpg" % filename
        image = None
        try:
            image = Image.open(self.image.file)
            # convert to RGB
            if image.mode not in ("L", "RGB"):
                image = image.convert("RGB")

            # resize if necessary
            if self.IMAGE_MAX_SIZE:
                self.image.save(
                    filename,
                    self.create_thumb(image, self.IMAGE_MAX_SIZE),
                    save=False,
                )

            # NOTE(review): nesting of this clean-up relative to the resize
            # above is not visible in the collapsed source - confirm.
            if old_path != self.image.path:
                try:
                    os.remove(old_path)
                except OSError:
                    # already clean
                    pass

            # save the thumbnail
            thumb_filename = self._get_thumb_name(filename)
            self.thumbnail.save(
                thumb_filename,
                self.create_thumb(image, self.THUMB_MAX_SIZE),
                save=False,
            )
        except (IOError, ValueError):
            self.thumbnail = None
            self.image = None
        finally:
            # the object is saved whatever happened to the image
            returned = super().save(*args, **kwargs)
            if image:
                image.close()
        # returning after (not inside) "finally" keeps unexpected exceptions
        # from being silently discarded
        return returned

    def _get_thumb_name(self, filename):
        """Insert ``-thumb`` before the last extension of ``filename``."""
        splited = filename.split(".")
        return "{}-thumb.{}".format(".".join(splited[:-1]), splited[-1])
class RelationItem(models.Model):
    """
    Items with relation between them
    """

    MAIN_UP_MODEL_QUERY = ""
    # Generated graph files: full graph, then "above" and "below" variants,
    # each stored as SVG, PNG and DOT.
    relation_image = models.FileField(
        _("Generated relation image (SVG)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_bitmap_image = models.FileField(
        _("Generated relation image (PNG)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_dot = models.FileField(
        _("Generated relation image (DOT)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_image_above = models.FileField(
        _("Generated above relation image (SVG)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_dot_above = models.FileField(
        _("Generated above relation image (DOT)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_bitmap_image_above = models.FileField(
        _("Generated above relation image (PNG)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_image_below = models.FileField(
        _("Generated below relation image (SVG)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_dot_below = models.FileField(
        _("Generated below relation image (DOT)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )
    relation_bitmap_image_below = models.FileField(
        _("Generated below relation image (PNG)"),
        null=True,
        blank=True,
        upload_to=get_image_path,
        help_text=max_size_help(),
    )

    class Meta:
        abstract = True

    def generate_relation_image(
        self,
        highlight_current=True,
        render_above=True,
        render_below=True,
        full=False,
    ):
        """Delegate to ``generate_relation_graph`` with the same options."""
        generate_relation_graph(
            self,
            highlight_current=highlight_current,
            render_above=render_above,
            render_below=render_below,
            full=full,
        )


class JsonDataSectionManager(models.Manager):
    """Manager resolving sections from their natural key."""

    def get_by_natural_key(self, name, app_label, model):
        """Return the section matching ``(name, app_label, model)``."""
        return self.get(
            name=name,
            content_type__app_label=app_label,
            content_type__model=model,
        )


class JsonDataSection(models.Model):
    """Named, ordered section attached to a content type."""

    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    name = models.CharField(_("Name"), max_length=200)
    order = models.IntegerField(_("Order"), default=10)
    objects = JsonDataSectionManager()

    class Meta:
        verbose_name = _("Custom data - Section")
        verbose_name_plural = _("Custom data - Sections")
        ordering = ["order", "name"]
        unique_together = ("name", "content_type")

    ADMIN_SECTION = _("Custom data / custom forms")

    def natural_key(self):
        """Return ``(name, app_label, model)``, mirror of the manager lookup."""
        return (self.name, self.content_type.app_label, self.content_type.model)

    def __str__(self):
        return "{} - {}".format(self.content_type, self.name)


# (code, label) choices for ``JsonDataField.value_type``
JSON_VALUE_TYPES = (
    ("T", _("Text")),
    ("LT", _("Long text")),
    ("I", _("Integer")),
    ("B", _("Boolean")),
    ("F", _("Float")),
    ("D", _("Date")),
    ("C", _("Choices")),
    ("MC", _("Multi-choices")),
)


class JsonDataFieldManager(models.Manager):
    """Manager resolving fields from their natural key."""

    def get_by_natural_key(self, key, app_label, model):
        """Return the field matching ``(key, app_label, model)``."""
        return self.get(
            key=key,
            content_type__app_label=app_label,
            content_type__model=model,
        )
class JsonDataField(models.Model):
    """Description of one custom JSON key attached to a content type."""

    name = models.CharField(_("Name"), max_length=200)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    key = models.CharField(
        _("Key"),
        max_length=200,
        help_text=_(
            "Value of the key in the JSON schema. For hierarchical "
            'key use "__" to explain it. For instance for the key '
            "'my_subkey' with data such as {'my_key': {'my_subkey': "
            "'value'}}, its value will be reached with "
            "my_key__my_subkey."
        ),
    )
    display = models.BooleanField(_("Display"), default=True)
    value_type = models.CharField(
        _("Type"), default="T", max_length=10, choices=JSON_VALUE_TYPES
    )
    order = models.IntegerField(_("Order"), default=10)
    search_index = models.BooleanField(_("Use in search indexes"), default=False)
    section = models.ForeignKey(
        JsonDataSection,
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        related_name="json_data_field",
    )
    custom_forms = models.ManyToManyField(
        "CustomForm", blank=True, through="CustomFormJsonField"
    )
    objects = JsonDataFieldManager()

    class Meta:
        verbose_name = _("Custom data - Field")
        verbose_name_plural = _("Custom data - Fields")
        ordering = ["order", "name"]
        unique_together = ("content_type", "key")

    ADMIN_SECTION = _("Custom data / custom forms")

    def natural_key(self):
        """Return ``(key, app_label, model)``, mirror of the manager lookup."""
        content_type = self.content_type
        return (self.key, content_type.app_label, content_type.model)

    def __str__(self):
        return "{} - {}".format(self.content_type, self.name)

    def format_value(self, value):
        """Format ``value`` for display.

        Only non-empty values of date fields (type ``"D"``) are changed, by
        ``format_date``; anything else is returned untouched.
        """
        is_blank = value is None or value == ""
        if is_blank or self.value_type != "D":
            return value
        return format_date(value)

    def clean(self):
        """Reject a section bound to another content type than the field."""
        section = self.section
        if section and section.content_type != self.content_type:
            raise ValidationError(
                _("Content types of the field and of the menu do not match")
            )


# (code, label) choices for ``GeneralRelationType.logical_relation``
LOGICAL_TYPES = (
    ("above", _("Above")),
    ("below", _("Below")),
    ("equal", _("Equal")),
    ("include", _("Include")),
    ("included", _("Is included")),
)
class GeneralRelationType(GeneralType):
    """Abstract relation type, either symmetrical or paired with an inverse."""

    order = models.IntegerField(_("Order"), default=1)
    symmetrical = models.BooleanField(_("Symmetrical"))
    tiny_label = models.CharField(_("Tiny label"), max_length=50, blank=True, null=True)
    inverse_relation = models.ForeignKey(
        "self",
        verbose_name=_("Inverse relation"),
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
    )
    logical_relation = models.CharField(
        verbose_name=_("Logical relation"),
        max_length=10,
        choices=LOGICAL_TYPES,
        blank=True,
        null=True,
    )

    class Meta:
        abstract = True

    def clean(self):
        """Refuse a type that is symmetrical and also has an inverse."""
        # cannot have symmetrical and an inverse_relation
        if self.symmetrical and self.inverse_relation:
            raise ValidationError(_("Cannot have symmetrical and an inverse_relation"))

    def get_tiny_label(self):
        """Return the tiny label, else the label, else an empty string."""
        return self.tiny_label or self.label or ""

    def save(self, *args, **kwargs):
        """Save, then make the inverse relation point back to this object."""
        obj = super(GeneralRelationType, self).save(*args, **kwargs)
        # after saving check that the inverse_relation of the inverse_relation
        # point to the saved object
        if self.inverse_relation and (
            not self.inverse_relation.inverse_relation
            or self.inverse_relation.inverse_relation != self
        ):
            self.inverse_relation.inverse_relation = self
            self.inverse_relation.symmetrical = False
            self.inverse_relation.save()
        return obj


class GeneralRecordRelations(Imported):
    """Abstract relation between a ``left_record`` and a ``right_record``."""

    class Meta:
        abstract = True

    @classmethod
    def general_types(cls):
        """Return the names of the attributes holding a general type."""
        return ["relation_type"]

    def save(self, *args, **kwargs):
        """Save, then make sure the mirrored relation exists.

        The mirror swaps left and right records and uses the same type when
        it is symmetrical, its inverse type otherwise.

        :return: ``self`` once the mirror is ensured, ``None`` when the
            type has no usable inverse
        """
        super(GeneralRecordRelations, self).save(*args, **kwargs)

        # after saving create the symmetric or the inverse relation
        sym_rel_type = self.relation_type
        if not self.relation_type.symmetrical:
            sym_rel_type = self.relation_type.inverse_relation

        # no symmetric/inverse is defined
        if not sym_rel_type:
            return

        dct = {
            "right_record": self.left_record,
            "left_record": self.right_record,
            "relation_type": sym_rel_type,
        }
        self.__class__.objects.get_or_create(**dct)
        return self


def post_delete_record_relation(sender, instance, **kwargs):
    """Signal receiver deleting the mirror of a deleted record relation."""
    # delete symmetrical or inverse relation
    sym_rel_type = instance.relation_type
    if not instance.relation_type.symmetrical:
        sym_rel_type = instance.relation_type.inverse_relation

    # no symmetric/inverse is defined
    if not sym_rel_type:
        return

    dct = {
        "right_record_id": instance.left_record_id,
        "left_record_id": instance.right_record_id,
        "relation_type": sym_rel_type,
    }
    # deleting an empty queryset is a no-op: no need to count first
    instance.__class__.objects.filter(**dct).delete()


@task()
def relation_view_update(sender, kwargs):
    """Task body calling ``sender._update`` for ``kwargs["item_id"]``.

    ``sender`` is a model class, or a tuple/list passed to
    ``apps.get_model`` to resolve it.
    """
    if isinstance(sender, (tuple, list)):
        sender = apps.get_model(*sender)
    sender._update(kwargs["item_id"])
class RelationsViews(models.Model):
    """Abstract unmanaged model backed by a SQL view or by a cache table.

    Which one is used depends on ``parent_relations_engine`` of the current
    profile: ``"V"`` for the view, ``"T"`` for the table. Subclasses provide
    the three SQL statements and implement ``_update``,
    ``_get_base_children`` and ``create_table``.
    """

    CREATE_SQL = ""  # SQL view creation
    DELETE_SQL = ""  # SQL view deletion
    CREATE_TABLE_SQL = ""  # SQL table creation

    class Meta:
        managed = False
        abstract = True

    @classmethod
    def _update(cls, item_id):
        """Refresh the rows of ``item_id``; to be implemented by subclasses."""
        # NotImplemented is a comparison sentinel and is not callable
        raise NotImplementedError()

    @classmethod
    def update(cls, item_id):
        """Refresh the cache table for ``item_id``.

        Nothing is done with the view engine. The refresh runs inline when
        ``settings.USE_BACKGROUND_TASK`` is off; otherwise it is queued and
        ``revoke_old_task`` is called with the new task id.

        :return: ``None`` with the view engine, the value returned by
            ``relation_view_update`` when run inline, else the queued task
        """
        profile = get_current_profile()
        if profile.parent_relations_engine == "V":
            return
        if not settings.USE_BACKGROUND_TASK:
            return relation_view_update(cls, {"item_id": item_id})
        else:
            sender, kwargs = serialize_args_for_tasks(cls, None, {"item_id": item_id})
            task_item = relation_view_update.delay(sender, kwargs)
            revoke_old_task(kwargs, "relation_view_update", task_item.id, cls)
            return task_item

    @classmethod
    def _get_base_children(cls):
        """Return the ids to process in ``regenerate_all``; for subclasses."""
        raise NotImplementedError()

    @classmethod
    def regenerate_all(cls, quiet=True):
        """Empty the cache table and rebuild it for every base child.

        Does nothing more than ``check_engine`` with the view engine.

        :param quiet: when false, progress is written to standard output
        """
        cls.check_engine()
        profile = get_current_profile(force=True)
        if profile.parent_relations_engine == "V":
            return
        cls.objects.filter(pk__isnull=False).delete()
        base_children = list(cls._get_base_children())
        total = len(base_children)
        for idx, cr_id in enumerate(base_children):
            if not quiet:
                sys.stdout.write(f"Processing: {idx + 1} / {total}\t\t{cr_id}\r")
                sys.stdout.flush()
            cls.update(cr_id)
        if not quiet:
            sys.stdout.write("\n")

    @classmethod
    def create_table(cls):
        """Create the cache table; to be implemented by subclasses."""
        raise NotImplementedError()

    @classmethod
    def check_engine(cls):
        """
        Check view or table properly created with settings on the profile
        :return: True if table or view updated
        """
        if not cls.CREATE_SQL or not cls.DELETE_SQL or not cls.CREATE_TABLE_SQL:
            raise NotImplementedError(
                "CREATE_SQL or DELETE_SQL or CREATE_TABLE_SQL is missing."
            )
        profile = get_current_profile(force=True)
        table_type = ""
        with connection.cursor() as cursor:
            q = (
                "select table_type from information_schema.tables WHERE "
                "table_name=%s;"
            )
            cursor.execute(q, [cls._meta.db_table])
            q = cursor.fetchall()
            if q:
                table_type = q[0][0]
            if profile.parent_relations_engine == "V":
                if table_type == "VIEW":
                    return
                elif "TABLE" in table_type:
                    # the identifier comes from the model meta, not from
                    # user input
                    q = "DROP TABLE IF EXISTS %s" % cls._meta.db_table
                    cursor.execute(q)
                cursor.execute(cls.CREATE_SQL)
                return True
            if profile.parent_relations_engine == "T":
                if "TABLE" in table_type:
                    return
                elif table_type == "VIEW":
                    cursor.execute(cls.DELETE_SQL)
                cursor.execute(cls.CREATE_TABLE_SQL)
                return True
class SearchQuery(models.Model):
    """Search query stored for a user profile and a content type."""

    label = models.TextField(_("Label"), blank=True, default="")
    query = models.TextField(_("Query"), blank=True, default="")
    content_type = models.ForeignKey(
        ContentType, verbose_name=_("Content type"), on_delete=models.CASCADE
    )
    profile = models.ForeignKey(
        "UserProfile", verbose_name=_("Profile"), on_delete=models.CASCADE
    )
    is_alert = models.BooleanField(_("Is an alert"), default=False)

    class Meta:
        verbose_name = _("Search query")
        verbose_name_plural = _("Search queries")
        ordering = ["label"]

    def __str__(self):
        return str(self.label)


class Language(GeneralType):
    """Language type with an optional ISO code of at most two characters."""

    iso_code = models.CharField(_("ISO code"), null=True, blank=True, max_length=2)

    class Meta:
        verbose_name = _("Language type")
        verbose_name_plural = _("Language types")

    ADMIN_SECTION = _("Documents")


# (value, label) choice lists
CURRENCY = (("€", _("Euro")), ("$", _("US dollar")))
FIND_INDEX_SOURCE = (("O", _("Operations")), ("CR", _("Context records")))
SITE_LABELS = [("site", _("Site")), ("entity", _("Archaeological entity"))]

# Translated wordings keyed by the values of SITE_LABELS, then by usage.
TRANSLATED_SITE_LABELS = {
    "site": {
        "plural": _("Sites"),
        "search": _("Site search"),
        "new": _("New site"),
        "modification": _("Site modification"),
        "deletion": _("Site deletion"),
        "attached-to-operation": _("Site (attached to the operation)"),
        "name-attached-to-operation": _("Site name (attached to the operation)"),
        "attached-to-cr": _("Site (attached to the context record)"),
        "name-attached-to-cr": _("Site name (attached to the context record)"),
    },
    "entity": {
        "plural": _("Archaeological entities"),
        "search": _("Archaeological entity search"),
        "new": _("New archaeological entity"),
        "modification": _("Archaeological entity modification"),
        "deletion": _("Archaeological entity deletion"),
        "attached-to-operation": _(
            "Archaeological entity (attached to the " "operation)"
        ),
        "name-attached-to-operation": _(
            "Archaeological entity name (attached " "to the operation)"
        ),
        "attached-to-cr": _(
            "Archaeological entity (attached to the context " "record)"
        ),
        "name-attached-to-cr": _(
            "Archaeological entity name (attached to the context record)"
        ),
    },
}
"operation)" ), "name-attached-to-operation": _( "Archaeological entity name (attached " "to the operation)" ), "attached-to-cr": _( "Archaeological entity (attached to the context " "record)" ), "name-attached-to-cr": _( "Archaeological entity name (attached to the context record)" ), }, } ACCOUNT_NAMING_STYLE = ( ("NF", _("name.firstname")), ("FN", _("firstname.name")), ) class IshtarSiteProfile(models.Model, Cached): slug_field = "slug" label = models.TextField(_("Name")) slug = models.SlugField(_("Slug"), unique=True) active = models.BooleanField(_("Current active"), default=False) experimental_feature = models.BooleanField( _("Activate experimental feature"), default=False ) description = models.TextField(_("Description"), blank=True, default="") warning_name = models.TextField(_("Warning name"), blank=True, default="") warning_message = models.TextField(_("Warning message"), blank=True, default="") footer = models.TextField(_("Footer text"), default="", blank=True, help_text=_("You can use markdown syntax.")) delete_image_zip_on_archive = models.BooleanField( _("Import - Delete image/document zip on archive"), default=False ) clean_redundant_document_association = models.BooleanField( _("Document - Remove redundant association"), default=False, help_text=_( "For instance, remove operation association of a " "document also associated to a find of this operation. " "Only manage association of operations, context records " "and finds." ), ) calculate_weight_on_full = models.BooleanField( _("Container - calculate weight only when all find has a weight"), default=False, ) parent_relations_engine = models.CharField( _("Parent relations engine"), choices=( ("V", _("SQL views")), ("T", _("Cache tables")), ), default="T", max_length=1, help_text=_( "If you experience performance problems with complex relations " "(for instance: complex statigraphic relations), set it to " '"Cache tables" in order to use static cache tables. 
Do not ' "forget to update theses table with the " '"migrate_relations_cache_tables" manage.py command.' ), ) config = models.CharField( _("Alternate configuration"), max_length=200, choices=ALTERNATE_CONFIGS_CHOICES, help_text=_("Choose an alternate configuration for label, " "index management"), null=True, blank=True, ) files = models.BooleanField(_("File/administrative module"), default=False) archaeological_site = models.BooleanField( _("Archaeological site module"), default=False ) archaeological_site_label = models.CharField( _("Archaeological site type"), max_length=200, choices=SITE_LABELS, default="site", ) context_record = models.BooleanField(_("Context records module"), default=False) find = models.BooleanField( _("Finds module"), default=False, help_text=_("Need context records module"), ) find_index = models.CharField( _("Find index is based on"), default="O", max_length=2, choices=FIND_INDEX_SOURCE, help_text=_( "To prevent irrelevant indexes, change this parameter " "only if there is no find in the database" ), ) warehouse = models.BooleanField( _("Warehouses module"), default=False, help_text=_("Need finds module") ) preservation = models.BooleanField(_("Preservation module"), default=False) mapping = models.BooleanField(_("Mapping module"), default=False) museum = models.BooleanField(_("Museum module"), default=False, help_text=_("Need finds module")) point_precision = models.IntegerField( _("Point precision (search and sheets)"), null=True, blank=True, help_text=_( "Number of digit to round from the decimal point for coordinates " "in WGS84 (latitude, longitude). Empty value means no round." ), ) locate_warehouses = models.BooleanField( _("Locate warehouse and containers"), default=False, help_text=_( "Mapping module must be activated. With many containers and " "background task not activated, activating this option may " "consume many resources." 
), ) use_town_for_geo = models.BooleanField( _("Use town to locate when coordinates are missing"), default=True ) relation_graph = models.BooleanField(_("Generate relation graph"), default=False) preventive_operator = models.BooleanField( _("Preventive operator module"), default=False ) underwater = models.BooleanField(_("Underwater module"), default=False) no_context_button = models.ForeignKey( "archaeological_context_records.ContextRecord", verbose_name=_("Context record for no context button"), help_text=_('If provided a button is displayed on find add page to create a "No context" find'), on_delete=models.SET_NULL, null=True, blank=True ) parcel_mandatory = models.BooleanField( _("Parcel are mandatory for context records"), default=True ) homepage = models.TextField( _("Home page"), blank=True, default="", help_text=_( "Homepage of Ishtar - if not defined a default homepage " "will appear. Use HTML or markdown syntax." ), ) homepage_title = models.CharField(_("Homepage - Title"), max_length=100, default="", blank=True) homepage_statistics_available = models.BooleanField( _("Homepage - Statistics available"), default=False ) homepage_statistics_available_offline = models.BooleanField( _("Homepage - Statistics available off-line"), default=False ) homepage_random_image_available = models.BooleanField( _("Homepage - Random image available"), default=False ) operation_prefix = models.CharField( _("Main operation code prefix"), default="OA", null=True, blank=True, max_length=20, ) default_operation_prefix = models.CharField( _("Default operation code prefix"), default="OP", null=True, blank=True, max_length=20, ) operation_region_code = models.CharField( _("Operation region code"), null=True, blank=True, max_length=5 ) operation_complete_identifier = models.TextField( _("Operation complete identifier"), default="{% if code_patriarche %}OA{{code_patriarche}}{% else %}" "{% if operation_code and year %}{{year}}-{{operation_code}}" "{% else %}-{% endif %}{% endif %}", 
help_text=_("Formula to manage operation complete identifier."), ) operation_custom_index = models.TextField( _("Operation custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage operation custom index. " "Separate keys with a semicolon." ), ) operation_cached_label = models.TextField( _("Operation cached label"), default="{{cached_towns_label}} | {% if code_patriarche %}OA{{code_patriarche}}{% else %}" "{% if operation_code and year %}OP{{year}}-{{operation_code}}{% endif %}{% endif %}", help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) site_complete_identifier = models.TextField( _("Archaeological site complete identifier"), default="", blank=True, help_text=_("Formula to manage archaeological site complete identifier."), ) site_custom_index = models.TextField( _("Archaeological site custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage archaeological site custom " "index. Separate keys with a semicolon." ), ) site_cached_label = models.TextField( _("Site cached label"), default="", blank=True, help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) file_external_id = models.TextField( _("File external id"), default="{year}-{numeric_reference}", help_text=_( "Formula to manage file external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) file_complete_identifier = models.TextField( _("Archaeological file complete identifier"), default="", blank=True, help_text=_("Formula to manage archaeological file complete identifier."), ) file_custom_index = models.TextField( _("Archaeological file custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage archaeological file custom " "index. Separate keys with a semicolon." 
), ) file_cached_label = models.TextField( _("File cached label"), default="", blank=True, help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) parcel_external_id = models.TextField( _("Parcel external id"), default="{associated_file__external_id}{operation__code_patriarche}-" "{town__numero_insee}-{section}{parcel_number}", help_text=_( "Formula to manage parcel external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) parcel_cached_label = models.TextField( _("Parcel cached label"), default="{associated_file__external_id}{operation__complete_identifier} | " "{town__numero_insee} | {section}{parcel_number}", help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) context_record_external_id = models.TextField( _("Context record external id"), default="{parcel__external_id}-{label}", help_text=_( "Formula to manage context record external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) contextrecord_complete_identifier = models.TextField( _("Context record complete identifier"), default="", blank=True, help_text=_("Formula to manage context record complete identifier."), ) contextrecord_custom_index = models.TextField( _("Context record custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage context record custom index. " "Separate keys with a semicolon." ), ) contextrecord_cached_label = models.TextField( _("Context record cached label"), default="{parcel__cached_label} | {label}", help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) base_find_external_id = models.TextField( _("Base find external id"), default="{context_record__external_id}-{label}", help_text=_( "Formula to manage base find external ID. " "Change this with care. 
With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) basefind_complete_identifier = models.TextField( _("Base find complete identifier"), default="", blank=True, help_text=_("Formula to manage base find complete identifier."), ) basefind_custom_index = models.TextField( _("Base find custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage base find custom index. " "Separate keys with a semicolon." ), ) basefind_cached_label = models.TextField( _("Base find cached label"), default="{context_record__cached_label} | {label}", blank=True, help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) find_external_id = models.TextField( _("Find external id"), default="{get_first_base_find__context_record__external_id}-{label}", help_text=_( "Formula to manage find external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) find_complete_identifier = models.TextField( _("Find complete identifier"), default="", blank=True, help_text=_("Formula to manage find complete identifier."), ) find_custom_index = models.TextField( _("Find custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage find custom index. " "Separate keys with a semicolon." ), ) find_cached_label = models.TextField( _("Find cached label"), default="{get_first_base_find__context_record__operation__complete_identifier}-{index:0>5} | {label}", help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) museum_complete_identifier = models.TextField( _("Find - Complete museum ID"), default="{% if museum_id_prefix %}{{museum_id_prefix}}.{% endif %}{{museum_id}}{% if museum_id_suffix %}.{{museum_id_suffix}}{% endif %}", help_text=_( "Formula to manage cached label. If not set a default formula is used." 
), ) container_external_id = models.TextField( _("Container external id"), default="{parent_external_id}-{container_type__txt_idx}-" "{reference}", help_text=_( "Formula to manage container external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) container_complete_identifier = models.TextField( _("Container complete identifier"), default="", blank=True, help_text=_("Formula to manage container complete identifier."), ) container_custom_index = models.TextField( _("Container custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage container custom index. " "Separate keys with a semicolon." ), ) container_cached_label = models.TextField( _("Container cached label"), default="", blank=True, help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) warehouse_external_id = models.TextField( _("Warehouse external id"), default="{slug}", help_text=_( "Formula to manage warehouse external ID. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) warehouse_complete_identifier = models.TextField( _("Warehouse complete identifier"), default="", blank=True, help_text=_("Formula to manage warehouse complete identifier."), ) warehouse_custom_index = models.TextField( _("Warehouse custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage warehouse custom index. " "Separate keys with a semicolon." ), ) warehouse_cached_label = models.TextField( _("Warehouse cached label"), default="", blank=True, help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) document_external_id = models.TextField( _("Document external id"), default="{index}", help_text=_( "Formula to manage document external ID. " "Change this with care. 
With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) document_complete_identifier = models.TextField( _("Document complete identifier"), default="", blank=True, help_text=_("Formula to manage document complete identifier."), ) document_custom_index = models.TextField( _("Document custom index key"), default="", blank=True, help_text=_( "Keys to be used to manage document custom index. " "Separate keys with a semicolon." ), ) document_cached_label = models.TextField( _("Document cached label"), default="", blank=True, help_text=_( "Formula to manage cached label. If not set a default formula is used." ), ) person_raw_name = models.TextField( _("Raw name for person"), default="{name|upper} {surname}", help_text=_( "Formula to manage person raw_name. " "Change this with care. With incorrect formula, the " "application might be unusable and import of external " "data can be destructive." ), ) find_use_index = models.BooleanField(_("Use auto index for finds"), default=True) currency = models.CharField( _("Currency"), default="€", choices=CURRENCY, max_length=5 ) account_naming_style = models.CharField( _("Naming style for accounts"), max_length=2, default="FN", choices=ACCOUNT_NAMING_STYLE, ) default_center = models.PointField( _("Maps - default center"), default="SRID=4326;POINT(2.4397 46.5528)" ) default_zoom = models.IntegerField(_("Maps - default zoom"), default=6) display_srs = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System for display"), blank=True, null=True, help_text=_( "Spatial Reference System used for display when no SRS is defined" ), on_delete=models.SET_NULL, related_name="profile_display_srs" ) srs = models.ForeignKey( SpatialReferenceSystem, verbose_name=_("Spatial Reference System in database"), blank=True, null=True, help_text=_( "Set it to the most used spatial reference system. Warning: after change " "launch the migrate_srid script." 
@classmethod
def get_current_profile(cls, force=False):
    """
    Return the active site profile, creating a default one when none exists.

    The profile is looked up in the cache first; ``force=True`` bypasses the
    cached value and refreshes the cache from the database.
    """
    cache_key, cached_profile = get_cache(cls, ["is-current-profile"])
    if cached_profile and not force:
        return cached_profile

    active_profiles = cls.objects.filter(active=True)
    if active_profiles.count():
        profile = active_profiles.all()[0]
    else:
        # no active profile yet: bootstrap a default one
        profile = cls.objects.create(
            label="Default profile", slug="default", active=True
        )
    cache.set(cache_key, profile, settings.CACHE_TIMEOUT)
    return profile
class CustomForm(models.Model):
    """
    Customisation attached to a form identified by its slug (``form``).

    Stores whether the customisation is available/enabled, an optional header
    text and the users, user types and profile types it is attached to.
    ``(name, form)`` is unique and is used as the natural key.
    """

    name = models.CharField(_("Name"), max_length=250)
    # looked up as a key of the form registry built by register()
    form = models.CharField(_("Form"), max_length=250)
    available = models.BooleanField(_("Available"), default=True)
    enabled = models.BooleanField(
        _("Enable this form"),
        default=True,
        help_text=_(
            "Disable with caution: disabling a form with mandatory "
            "fields may lead to database errors."
        ),
    )
    header = models.TextField(_("Header text"), default="", blank=True,
                              help_text=_("You can use markdown syntax."))
    apply_to_all = models.BooleanField(
        _("Apply to all"),
        default=False,
        help_text=_(
            "Apply this form to all users. If set to True, selecting "
            "user and user type is useless."
        ),
    )
    users = models.ManyToManyField("IshtarUser", blank=True)
    user_types = models.ManyToManyField(
        "PersonType", blank=True, help_text=_("Deprecated - use profile types")
    )
    profile_types = models.ManyToManyField("ProfileType", blank=True)

    objects = CustomFormManager()
    SERIALIZATION_EXCLUDE = ("users",)

    class Meta:
        verbose_name = _("Custom form")
        verbose_name_plural = _("Custom forms")
        ordering = ["name", "form"]
        unique_together = (("name", "form"),)

    ADMIN_SECTION = _("Custom data / custom forms")

    def natural_key(self):
        """Return the ``(name, form)`` tuple identifying this custom form."""
        return (self.name, self.form)

    def __str__(self):
        return "{} - {}".format(self.name, self.form)

    def users_lbl(self):
        """Return the attached users as a single " ; "-separated string."""
        users = [str(user) for user in self.users.all()]
        return " ; ".join(users)

    users_lbl.short_description = _("Users")

    def user_types_lbl(self):
        """Return the attached user types as a " ; "-separated string."""
        user_types = [str(u) for u in self.user_types.all()]
        return " ; ".join(user_types)

    user_types_lbl.short_description = _("User types")

    @classmethod
    def register(cls):
        """
        Build (once) and return the registry of customisable forms.

        Returns a tuple ``(forms, fields)`` where ``forms`` maps a form slug
        to its form class and ``fields`` maps a package name to the list of
        model names derived from the form slugs. The result is memoised on
        the class as ``_register`` / ``_register_fields``.
        """
        if hasattr(cls, "_register") and hasattr(cls, "_register_fields"):
            return cls._register, cls._register_fields
        # NOTE(review): cls.__class__ is the metaclass of the model, not the
        # model itself - confirm this is the intended cache namespace. The
        # returned cache keys are never written to in this method.
        cache_key, value = get_cache(
            cls.__class__, ["dct-forms"], app_label="ishtar_common"
        )
        cache_key_fields, value_fields = get_cache(
            cls.__class__, ["dct-fields"], app_label="ishtar_common"
        )
        if value and value_fields:
            cls._register = value
            cls._register_fields = value_fields
            return cls._register, cls._register_fields
        cls._register, cls._register_fields = {}, {}
        # ideally should be improved but only used in admin
        from ishtar_common.admin import ISHTAR_FORMS
        from ishtar_common.forms import CustomForm as CustomFormForm

        for app_form in ISHTAR_FORMS:
            app_name = app_form.__package__
            for form in dir(app_form):
                if "Form" not in form and "Select" not in form:
                    # not very clean... but do not treat inappropriate items
                    continue
                form = getattr(app_form, form)
                # keep only custom form classes exposing a slug and not
                # explicitly opted out with NO_CUSTOM_FORM
                if (
                    not inspect.isclass(form)
                    or not issubclass(form, CustomFormForm)
                    or not getattr(form, "form_slug", None)
                    or getattr(form, "NO_CUSTOM_FORM", False)
                ):
                    continue
                # model name: first "-"-separated part of the slug, without
                # underscores
                model_name = form.form_slug.split("-")[0].replace("_", "")
                if app_name not in cls._register_fields:
                    cls._register_fields[app_name] = []
                if model_name not in cls._register_fields[app_name]:
                    cls._register_fields[app_name].append(model_name)
                cls._register[form.form_slug] = form
        return cls._register, cls._register_fields

    def get_form_class(self):
        """Return the registered form class for ``self.form`` or None."""
        register, register_fields = self.register()
        if self.form not in self._register:
            return
        return register[self.form]

    def get_available_json_fields(self):
        """
        List the JSON data fields that can be attached to this form.

        Returns a list of ``(pk, "name (value type label)")`` tuples for the
        JsonDataField objects of every model registered for the package of
        the current form; an empty list when the form is not registered.
        """
        register, register_fields = self.register()
        if self.form not in self._register:
            return []
        current_form = register[self.form]
        # first component of the dotted module path of the form class
        app_name = current_form.__module__.split(".")[0]
        if app_name not in register_fields:
            return []
        res = []
        for model_name in register_fields[app_name]:
            q = ContentType.objects.filter(app_label=app_name, model=model_name)
            if not q.count():
                continue
            ct = q.all()[0]
            for json_field in JsonDataField.objects.filter(content_type=ct).all():
                res.append(
                    (
                        json_field.pk,
                        "{} ({})".format(
                            json_field.name,
                            dict(JSON_VALUE_TYPES)[json_field.value_type],
                        ),
                    )
                )
        return res
class GlobalVar(models.Model, Cached):
    """
    Named text variable identified by a unique slug.

    The slug is the natural key and the string representation of the object.
    """

    # unique identifier of the variable
    slug = models.SlugField(_("Variable name"), unique=True)
    description = models.TextField(
        _("Description of the variable"), blank=True, default=""
    )
    # value is stored as free text
    value = models.TextField(_("Value"), blank=True, default="")
    objects = SlugModelManager()

    class Meta:
        verbose_name = _("Global variable")
        verbose_name_plural = _("Global variables")
        ordering = ["slug"]

    ADMIN_SECTION = _("General settings")

    def natural_key(self):
        """Return the one-item tuple ``(slug,)``."""
        return (self.slug,)

    def __str__(self):
        return str(self.slug)
class Dashboard(object):
    """
    Statistics about the items of a model, sliced by year or by month.

    On top of the last created / last modified items, the dashboard exposes
    the number of items by period (``values``, ``numbers``) and a few
    indicators (average, variance, standard deviation, median, mode). When
    the model provides ``get_by_operation`` the same indicators are computed
    by operation.
    """

    def __init__(
        self, model, slice="year", date_source=None, show_detail=None, fltr=None
    ):
        """
        :param model: model class providing ``get_total_number``,
            ``get_periods``, ``get_by_year`` / ``get_by_month`` and an
            ``history`` manager
        :param slice: "year" or "month"
        :param date_source: forwarded to the model helpers when set
        :param show_detail: with the "month" slice, add one serie for each
            entry of ``settings.ISHTAR_DPTS``
        :param fltr: dict of filters forwarded to the model helpers
        """
        if not fltr:
            fltr = {}
        # don't provide date_source if it is not relevant
        self.model = model
        self.total_number = model.get_total_number(fltr)
        self.show_detail = show_detail
        history_model = self.model.history.model
        # loop invariant: import once instead of on every iteration
        from archaeological_finds.models import Find

        # last edited - created
        self.recents, self.lasts = [], []
        for last_lst, modif_type in ((self.lasts, "+"), (self.recents, "~")):
            last_ids = history_model.objects.values("id").annotate(
                hd=Max("history_date")
            )
            last_ids = last_ids.filter(history_type=modif_type)
            if self.model == Find:
                last_ids = last_ids.filter(downstream_treatment_id__isnull=True)
                if modif_type == "+":
                    last_ids = last_ids.filter(upstream_treatment_id__isnull=True)
            last_ids = last_ids.order_by("-hd").distinct().all()[:5]
            for idx in last_ids:
                try:
                    obj = self.model.objects.get(pk=idx["id"])
                except self.model.DoesNotExist:
                    # deleted object are always referenced in history
                    continue
                obj.history_date = idx["hd"]
                last_lst.append(obj)
        # years
        base_kwargs = {"fltr": fltr.copy()}
        if date_source:
            base_kwargs["date_source"] = date_source
        periods_kwargs = copy.deepcopy(base_kwargs)
        periods_kwargs["slice"] = slice
        self.periods = model.get_periods(**periods_kwargs)
        self.periods = list(set(self.periods))
        self.periods.sort()
        if not self.total_number or not self.periods:
            return
        # deep copy: the "fltr" dict is modified below for each department
        kwargs_num = copy.deepcopy(base_kwargs)
        self.serie_labels = [_("Total")]
        # numbers
        if slice == "year":
            self.values = [("year", "", list(reversed(self.periods)))]
            self.numbers = [
                model.get_by_year(year, **kwargs_num).count() for year in self.periods
            ]
            self.values += [("number", _("Number"), list(reversed(self.numbers)))]
        if slice == "month":
            periods = list(reversed(self.periods))
            # (year, month) -> "YYYY-MM-01"
            self.periods = [
                "%d-%s-01" % (p[0], ("0" + str(p[1])) if len(str(p[1])) == 1 else p[1])
                for p in periods
            ]
            self.values = [("month", "", self.periods)]
            if show_detail:
                for dpt, lbl in settings.ISHTAR_DPTS:
                    self.serie_labels.append(str(dpt))
                    idx = "number_" + str(dpt)
                    kwargs_num["fltr"]["towns__numero_insee__startswith"] = str(dpt)
                    numbers = [
                        model.get_by_month(*p.split("-")[:2], **kwargs_num).count()
                        for p in self.periods
                    ]
                    self.values += [(idx, dpt, list(numbers))]
                # put "Total" at the end
                self.serie_labels.append(self.serie_labels.pop(0))
            kwargs_num = base_kwargs.copy()
            self.numbers = [
                model.get_by_month(*p.split("-")[:2], **kwargs_num).count()
                for p in self.periods
            ]
            self.values += [("number", _("Total"), list(self.numbers))]
        # calculate
        self.average = self.get_average()
        self.variance = self.get_variance()
        self.standard_deviation = self.get_standard_deviation()
        self.median = self.get_median()
        self.mode = self.get_mode()
        # by operation
        if not hasattr(model, "get_by_operation"):
            return
        operations = model.get_operations()
        operation_numbers = [model.get_by_operation(op).count() for op in operations]
        # calculate
        self.operation_average = self.get_average(operation_numbers)
        self.operation_variance = self.get_variance(operation_numbers)
        self.operation_standard_deviation = self.get_standard_deviation(
            operation_numbers
        )
        self.operation_median = self.get_median(operation_numbers)
        if not operation_numbers:
            # get_mode() falls back to the period data on empty input: a
            # period would then be used as an operation primary key
            return
        operation_mode_pk = self.get_mode(dict(zip(operations, operation_numbers)))
        if operation_mode_pk:
            from archaeological_operations.models import Operation

            self.operation_mode = str(Operation.objects.get(pk=operation_mode_pk))

    def get_average(self, vals=None):
        """Return the mean of ``vals`` (``self.numbers`` when empty/None)."""
        if not vals:
            vals = self.numbers[:]
        return sum(vals) / len(vals)

    def get_variance(self, vals=None):
        """Return the population variance of ``vals`` (default: numbers)."""
        if not vals:
            vals = self.numbers[:]
        avrg = self.get_average(vals)
        return self.get_average([(x - avrg) ** 2 for x in vals])

    def get_standard_deviation(self, vals=None):
        """Return the standard deviation of ``vals`` rounded to 3 digits."""
        if not vals:
            vals = self.numbers[:]
        return round(self.get_variance(vals) ** 0.5, 3)

    def get_median(self, vals=None):
        """
        Return the median of ``vals`` (``self.numbers`` when empty/None).

        The given sequence is left untouched: callers keep using it, in its
        original order, after this call.
        """
        if not vals:
            vals = self.numbers[:]
        ordered = sorted(vals)
        middle = len(ordered) // 2
        if len(ordered) % 2 == 1:
            return ordered[middle]
        return (ordered[middle - 1] + ordered[middle]) / 2.0

    def get_mode(self, vals=None):
        """
        Return the first key of ``vals`` associated with the highest value.

        ``vals`` is a dict; when empty/None the ``periods`` -> ``numbers``
        mapping is used.
        """
        if not vals:
            vals = dict(zip(self.periods, self.numbers[:]))
        mx = max(vals.values())
        for v in vals:
            if vals[v] == mx:
                return v
def generate_label_template(self):
    """
    Generate the label sheet template from the base label template.

    The content of the first target frame (``draw:frame`` named after the
    first item of ``label_targets``) is copied into every following target
    frame, each copy having its "items.0" references renumbered to
    "items.1", "items.2"... The resulting document is saved in the
    ``template`` field.

    Nothing is done when no base template or no target is set, when the
    base template cannot be read or when the first target is not found.
    """
    if not self.label_template.name or not self.label_targets:
        return
    targets = self.label_targets.split(";")
    base_target = targets[0]
    try:
        with zipfile.ZipFile(self.label_template.path) as source_zip:
            with source_zip.open("content.xml") as content:
                soup = BeautifulSoup(content.read(), "xml")
                base_content = soup.find(
                    "draw:frame", attrs={"draw:name": base_target}
                )
                if not base_content:
                    return
                base_content = base_content.contents
    except (FileNotFoundError, zipfile.BadZipFile, KeyError):
        base_content = None
    if not base_content:
        return

    # literal "items.0": the dot must not match any character
    first_item_re = re.compile(r"items\.0")
    for idx, target in enumerate(targets[1:]):
        replace_str = "items." + str(idx + 1)
        new_content = []
        for content in base_content:
            content = copy.copy(content)
            for text in content.find_all(text=first_item_re):
                fixed_text = text.replace("items.0", replace_str)
                text.replace_with(fixed_text)
            for image in content.find_all(attrs={"draw:name": first_item_re}):
                image["draw:name"] = image["draw:name"].replace(
                    "items.0", replace_str
                )
            new_content.append(content)
        next_target = soup.find("draw:frame", attrs={"draw:name": target})
        if next_target:
            next_target.contents = new_content

    with tempfile.TemporaryDirectory() as tmp:
        name_parts = os.path.basename(self.label_template.name).split(".")
        if len(name_parts) == 1:  # no extension?
            name_parts.append("odt")
        extension = name_parts[-1]
        stem = ".".join(name_parts[:-1]) + "-label"
        new_filename = "{}.{}".format(stem, extension)
        new_file = os.path.join(tmp, new_filename)
        # copy every member but content.xml, keeping order and compression
        with zipfile.ZipFile(new_file, "w") as zip_out:
            with zipfile.ZipFile(self.label_template.path, "r") as zip_in:
                zip_out.comment = zip_in.comment
                for item in zip_in.infolist():
                    if item.filename != "content.xml":
                        zip_out.writestr(item, zip_in.read(item.filename))
        with zipfile.ZipFile(
            new_file, mode="a", compression=zipfile.ZIP_DEFLATED
        ) as zf:
            zf.writestr("content.xml", str(soup))

        media_dir = "templates/{}/".format(datetime.date.today().year)
        full_media_dir = os.path.join(settings.MEDIA_ROOT, media_dir)
        # also creates the parent "templates" directory when it is missing
        os.makedirs(full_media_dir, exist_ok=True)
        # look for an unused name in the directory the file is stored in
        # (assumes files are stored on the local file system under
        # MEDIA_ROOT - TODO confirm)
        media_file = new_filename
        idx = 0
        while os.path.exists(os.path.join(full_media_dir, media_file)):
            idx += 1
            media_file = "{}{}.{}".format(stem, idx, extension)
        with open(new_file, "rb") as generated:
            with ContentFile(generated.read()) as file_content:
                self.template.save(media_file, file_content)
                self.save()
def get_filter(self, template, regexp_list=None):
    """
    Extract the keys used by a document template.

    :param template: path or file object of an OpenDocument file (a zip
        archive containing "content.xml")
    :param regexp_list: regular expressions whose first group captures a
        template expression
    :return: list of unique keys (unordered) or None when no regular
        expression is provided
    """
    if not regexp_list:
        return None
    # closing the archive does not close a file object given by the caller
    with zipfile.ZipFile(template) as archive:
        with archive.open("content.xml") as content:
            full_content = content.read().decode("utf-8")

    # raw expressions captured in the document, without duplicates
    expressions = []
    for regexp in regexp_list:
        for match in re.finditer(regexp, full_content):
            key = match.groups()[0]
            if key not in expressions:
                expressions.append(key)

    # content.xml is read as raw XML text: comparison operators can be
    # entity-escaped, both forms are ignored
    operators = frozenset(
        (
            "==", "!=", "not", "in", "or",
            ">", "<", ">=", "<=",
            "&gt;", "&lt;", "&gt;=", "&lt;=",
        )
    )
    tokens = []
    for expression in expressions:
        if not expression:
            continue
        tokens += [
            token
            for token in expression.split(" ")
            if token and token not in operators
        ]

    new_filter = []
    for token in tokens:
        # drop template filters ("key|filter") then split dotted paths
        keys = token.strip().split("|")[0].split(".")
        new_filter += [k for k in keys if not self._exclude_filter(k)]
        # NOTE(review): every key checked below has just been added to
        # new_filter and prefix is still empty, so this loop never appends
        # anything. Prefixed keys ("parent_child") may have been intended -
        # kept as is, to be confirmed.
        prefix = ""
        for k in keys:
            if self._exclude_filter(k):
                continue
            if prefix:
                prefix += "_"
            if prefix + k in new_filter:
                continue
            new_filter.append(prefix + k)
            prefix += k
    return list(set(new_filter))
def publish_labels(self, objects):
    """
    Render label sheets for the given objects and return the output path.

    Objects are rendered ``label_per_page`` at a time, one document by
    page; when several pages are produced they are concatenated in a single
    document. Return None when no object is provided.
    """
    if not objects:
        return
    tempdir = tempfile.mkdtemp("-ishtarlabels")
    slug = slugify(self.name.replace(" ", "_").lower())
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S")
    main_output_name = tempdir + os.path.sep + slug + "-" + timestamp
    suffix = "." + self.template.name.split(".")[-1]
    total = len(objects)
    page_files = []
    filtr = self.get_filter(self.template, self.LABEL_RE)

    for page in range(int(len(objects) / self.label_per_page) + 1):
        first = page * self.label_per_page
        if first >= total:
            break
        # last page may be incomplete
        last = min(first + self.label_per_page, total)
        items = [
            objects[position].get_values(filtr=filtr)
            for position in range(first, last)
        ]
        renderer = IshtarSecretaryRenderer()
        try:
            rendered = renderer.render(self.template, items=items)
        except TemplateSyntaxError as e:
            raise TemplateSyntaxError(str(e), e.lineno)
        page_name = main_output_name + "-" + str(page) + suffix
        page_files.append(page_name)
        with open(page_name, "wb") as page_file:
            page_file.write(rendered)

    output_name = main_output_name + suffix
    document = OOoPy(infile=page_files[0], outfile=output_name)
    if len(page_files) > 1:
        # append the following pages to the first one
        transformer = OOTransformer(
            document.mimetype,
            OOTransforms.get_meta(document.mimetype),
            OOTransforms.Concatenate(*(page_files[1:])),
            OOTransforms.renumber_all(document.mimetype),
            OOTransforms.set_meta(document.mimetype),
            OOTransforms.Fix_OOo_Tag(),
            OOTransforms.Manifest_Append(),
        )
        transformer.transform(document)
    document.close()
    return output_name
@classmethod
def get_or_create_by_towns(cls, towns, get_geo=False):
    """
    Get or create the area made of the given towns.

    The area is identified by a reference built from the sorted towns
    ("area-<insee or slug>/<insee or slug>..."). Its geographic data is
    created when missing and its multi-polygon is updated with the union of
    the multi-polygons of all the towns.

    :param towns: queryset or sequence of towns
    :param get_geo: return the geographic data instead of the area
    :return: the area (or its geographic data), None when no town is given
    """
    if hasattr(towns, "all"):  # queryset
        if not towns.count():
            return
        towns = towns.all()
    elif not len(towns):
        return

    # empty values are normalized: None cannot be compared with str
    sorted_towns = sorted(
        towns, key=lambda x: (x.numero_insee or "", x.name or "")
    )
    name = " / ".join(town._generate_cached_label() for town in sorted_towns)
    reference = "area-" + "/".join(
        town.numero_insee or slugify(town.name) for town in sorted_towns
    )
    area, created = cls.objects.get_or_create(
        reference=reference, defaults={"label": name, "available": False}
    )

    area_content_type = ContentType.objects.get(
        app_label="ishtar_common", model="area"
    )
    attrs = {
        "source_content_type": area_content_type,
        "source_id": area.pk,
    }
    q = GeoVectorData.objects.filter(**attrs)
    if created or not q.count():
        data_type, __ = GeoDataType.objects.get_or_create(
            txt_idx="area-limit",
            defaults={"label": str(_("Communal area boundaries"))}
        )
        attrs["data_type"] = data_type
        attrs["name"] = name
        geo = GeoVectorData.objects.create(**attrs)
    else:
        geo = q.all()[0]

    q_poly_towns = GeoVectorData.objects.filter(
        source_content_type__app_label="ishtar_common",
        source_content_type__model="town",
        source_id__in=[t.pk for t in sorted_towns])
    # aggregate() merges the polygons of all the towns; annotate() would
    # only provide, for each row, the polygon of this row
    poly = q_poly_towns.aggregate(poly=Union("multi_polygon"))["poly"]
    # poly is None when no town has a geometry: nothing to update
    if poly is not None and (
        not geo.multi_polygon
        or not geo.multi_polygon.equals_exact(poly, 0.001)
    ):
        origins, providers = [], []
        for g in q_poly_towns:
            origins.append(g.origin)
            providers.append(g.provider)
        if len(set(origins)) == 1:  # no ambiguous origin
            geo.origin = origins[0]
        if len(set(providers)) == 1:  # no ambiguous provider
            geo.provider = providers[0]
        if isinstance(poly, Polygon):
            poly = MultiPolygon(poly,)
        geo.multi_polygon = poly
        geo.save()
    if get_geo:
        return geo
    return area
".join(label) def _get_base_image_path(self): return self.SLUG @property def associated_filename(self): value = self.reference if self.reference else self.name return slugify(value) m2m_changed.connect(document_attached_changed, sender=Area.documents.through) GENDER = ( ("M", _("Male")), ("F", _("Female")), ("N", _("Neutral")), ) def documentation_get_gender_values(): doc = "" for idx, gender in enumerate(GENDER): key, label = gender if idx: doc += " ;" doc += ' "{}": {}'.format(key, label) return doc class BaseGenderedType(ValueGetter): def get_extra_values(self, prefix="", **kwargs): dct = {} if not hasattr(self, "grammatical_gender"): raise NotImplementedError("This model should have a grammatical_gender field") dct[prefix + "grammatical_gender"] = self.grammatical_gender return dct class GenderedType(BaseGenderedType, GeneralType): grammatical_gender = models.CharField( _("Grammatical gender"), max_length=1, choices=GENDER, blank=True, default="", help_text=documentation_get_gender_values, ) class Meta: abstract = True @classmethod def get_documentation_string(cls): """ Used for automatic documentation generation """ doc = super(GenderedType, cls).get_documentation_string() doc += ", **grammatical_gender** {} -".format(_("Grammatical gender")) doc += documentation_get_gender_values() return doc class OrganizationType(GenderedType): class Meta: verbose_name = _("Organization type") verbose_name_plural = _("Organization types") ordering = ("label",) ADMIN_SECTION = _("Directory") def get_orga_planning_service_label(): if apps.ready: lbl = get_general_type_label(OrganizationType, "planning_service") if lbl: return lbl return _("Error: planning_service type is missing") def get_orga_general_contractor_label(): if apps.ready: lbl = get_general_type_label(OrganizationType, "general_contractor") if lbl: return lbl return _("Error: general_contractor type is missing") post_save.connect(post_save_cache, sender=OrganizationType) post_delete.connect(post_save_cache, 
sender=OrganizationType) organization_type_pk_lazy = lazy(OrganizationType.get_or_create_pk, str) organization_type_pks_lazy = lazy(OrganizationType.get_or_create_pks, str) class Organization(Address, Merge, OwnPerms, BaseGenderedType, ValueGetter, MainItem): TABLE_COLS = ("name", "organization_type", "address", "town") #, "precise_town") SLUG = "organization" SHOW_URL = "show-organization" DELETE_URL = "delete-organization" # search parameters EXTRA_REQUEST_KEYS = { "precise_town": "precise_town__cached_label" } BASE_SEARCH_VECTORS = [ SearchVectorConfig("name"), SearchVectorConfig("town"), SearchVectorConfig("museum_museofile_id", "raw"), ] PROPERTY_SEARCH_VECTORS = [ SearchVectorConfig("precise_town_name"), ] # alternative names of fields for searches ALT_NAMES = { "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "organization_type": SearchAltName( pgettext_lazy("key for text search", "type"), "organization_type__label__iexact", ), "precise_town_id": SearchAltName( pgettext_lazy("key for text search", "precise-town"), "precise_town_id", ), "museum_museofile_id": SearchAltName( pgettext_lazy("key for text search", "museofile"), "museum_museofile_id__iexact", ), } QA_EDIT = QuickAction( url="organization-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_organization"], ) QUICK_ACTIONS = [QA_EDIT] objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) name = models.CharField(_("Name"), max_length=500) organization_type = models.ForeignKey( OrganizationType, verbose_name=_("Type"), on_delete=models.PROTECT ) url = models.URLField(verbose_name=_("Web address"), blank=True, null=True) grammatical_gender = models.CharField( _("Grammatical gender"), max_length=1, choices=GENDER, blank=True, default="", help_text=documentation_get_gender_values, ) museum_museofile_id = models.TextField(_("Museofile ID"), blank=True, default="") cached_label = models.TextField( 
def simple_lbl(self):
    """
    Short label of the organization: its name when set, otherwise
    "<organization type> - <town>" (the town part is empty when unset).
    """
    if not self.name:
        town = self.town if self.town else ""
        return f"{self.organization_type} - {town}"
    return self.name
def get_publisher_label():
    """
    Label of the organization types configured as document publishers.

    The labels found for the slugs listed in
    ``settings.ISHTAR_SLUGS["document-publisher"]`` are joined with " ; ".
    An error message is returned when the app registry is not ready or
    when no label is found.
    """
    if not apps.ready:
        return _("Error: publisher type is missing")
    candidates = (
        get_general_type_label(OrganizationType, slug)
        for slug in settings.ISHTAR_SLUGS["document-publisher"]
    )
    found = [label for label in candidates if label]
    if not found:
        return _("Error: publisher type is missing")
    return " ; ".join(found)
sender=TitleType) class Person(Address, Merge, OwnPerms, ValueGetter, MainItem): SLUG = "person" _prefix = "person_" TYPE = ( ("Mr", _("Mr")), ("Ms", _("Miss")), ("Mr and Miss", _("Mr and Mrs")), ("Md", _("Mrs")), ("Dr", _("Doctor")), ) TABLE_COLS = ( "name", "surname", "raw_name", "email", "person_types_list", "attached_to", "town", ) TABLE_COLS_ACCOUNT = ( "name", "surname", "raw_name", "email", "profiles_list", "attached_to", "town", ) SHOW_URL = "show-person" MODIFY_URL = "person_modify" DELETE_URL = "person_delete" BASE_SEARCH_VECTORS = [ SearchVectorConfig("name"), SearchVectorConfig("surname"), SearchVectorConfig("raw_name"), SearchVectorConfig("town"), SearchVectorConfig("attached_to__name"), SearchVectorConfig("email", "raw"), SearchVectorConfig("title", "raw"), SearchVectorConfig("salutation", "raw"), ] # search parameters REVERSED_BOOL_FIELDS = ["ishtaruser__isnull"] EXTRA_REQUEST_KEYS = { "ishtaruser__isnull": "ishtaruser__isnull", "attached_to": "attached_to", } COL_LABELS = {"attached_to": _("Organization")} # alternative names of fields for searches ALT_NAMES = { "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "surname": SearchAltName( pgettext_lazy("key for text search", "surname"), "surname__iexact" ), "email": SearchAltName( pgettext_lazy("key for text search", "email"), "email__iexact" ), "title": SearchAltName( pgettext_lazy("key for text search", "title"), "title__label__iexact", related_name="title" ), "salutation": SearchAltName( pgettext_lazy("key for text search", "salutation"), "salutation__iexact" ), "person_types": SearchAltName( pgettext_lazy("key for text search", "type"), "person_types__label__iexact", related_name="person_types" ), "attached_to": SearchAltName( pgettext_lazy("key for text search", "organization"), "attached_to__cached_label__iexact", related_name="attached_to" ), "ishtaruser__isnull": SearchAltName( pgettext_lazy("key for text search", "has-account"), "ishtaruser__isnull", ), 
"profiles__profile_type": SearchAltName( pgettext_lazy("key for text search", "profile-type"), "profiles__profile_type__label__iexact", ), } QA_EDIT = QuickAction( url="person-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_person"], ) QUICK_ACTIONS = [QA_EDIT] objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) old_title = models.CharField( _("Title"), max_length=100, choices=TYPE, blank=True, null=True ) title = models.ForeignKey( TitleType, verbose_name=_("Title"), on_delete=models.SET_NULL, blank=True, null=True, ) salutation = models.CharField( _("Salutation"), max_length=200, blank=True, null=True ) surname = models.CharField( _("Surname"), max_length=50, blank=True, null=True, help_text=_( "Attention, historical and unfortunate residue in the " "code of an initial translation error." ), ) name = models.CharField(_("Name"), max_length=200, blank=True, null=True) raw_name = models.CharField(_("Raw name"), max_length=300, blank=True, null=True) contact_type = models.CharField( _("Contact type"), max_length=300, blank=True, null=True ) comment = models.TextField(_("Comment"), blank=True, default="") person_types = models.ManyToManyField(PersonType, verbose_name=_("Types")) attached_to = models.ForeignKey( "Organization", related_name="members", on_delete=models.SET_NULL, verbose_name=_("Is attached to"), blank=True, null=True, ) cached_label = models.TextField( _("Cached name"), blank=True, default="", db_index=True ) DOWN_MODEL_UPDATE = ["author"] class Meta: verbose_name = _("Person") verbose_name_plural = _("Persons") indexes = [ GinIndex(fields=["data"]), ] permissions = ( ("view_own_person", "Can view own Person"), ("add_own_person", "Can add own Person"), ("change_own_person", "Can change own Person"), ("delete_own_person", "Can delete own Person"), ) ordering = ['name', 'surname'] ADMIN_SECTION = _("Directory") def natural_key(self): return (self.uuid,) @property def 
def simple_lbl(self):
    """
    Short label of the person: "<surname> <name>" built from the values
    that are set, falling back on the raw name when both are empty.
    """
    parts = []
    for field in ("surname", "name"):
        item = getattr(self, field)
        if item:
            parts.append(str(item))
    if not parts and self.raw_name:
        parts.append(self.raw_name)
    return " ".join(parts)
".join(set([str(p.profile_type) for p in self.profiles.all()])) def generate_merge_key(self): if self.name and self.name.strip(): self.merge_key = slugify(self.name.strip()) + ( ("-" + slugify(self.surname.strip())) if self.surname else "" ) elif self.raw_name and self.raw_name.strip(): self.merge_key = slugify(self.raw_name.strip()) elif self.attached_to: self.merge_key = self.attached_to.merge_key else: self.merge_key = self.EMPTY_MERGE_KEY if self.merge_key != self.EMPTY_MERGE_KEY and self.attached_to: self.merge_key += "-" + self.attached_to.merge_key def is_natural(self): return not self.attached_to def has_right(self, right_name, session=None, obj=None): if "." in right_name: right_name = right_name.split(".")[-1] res, cache_key = "", "" if session: cache_key = "session-{}-{}".format(session.session_key, right_name) res = cache.get(cache_key) if res in (True, False): return res # list all cache key in order to clean them on profile change cache_key_list = "sessionlist-{}".format(session.session_key) key_list = cache.get(cache_key_list, []) key_list.append(cache_key) cache.set(cache_key_list, key_list, settings.CACHE_TIMEOUT) if isinstance(right_name, (list, tuple)): res = ( bool( self.profiles.filter( current=True, profile_type__txt_idx__in=right_name ).count() ) or bool( self.profiles.filter( current=True, profile_type__groups__permissions__codename__in=right_name, ).count() ) or bool( self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=right_name ).count() ) or bool( self.ishtaruser.user_ptr.user_permissions.filter( codename__in=right_name ).count() ) ) else: res = ( bool( self.profiles.filter( current=True, profile_type__txt_idx=right_name ).count() ) or bool( self.profiles.filter( current=True, profile_type__groups__permissions__codename=right_name, ).count() ) or bool( self.ishtaruser.user_ptr.groups.filter( permissions__codename__in=[right_name] ).count() ) or bool( self.ishtaruser.user_ptr.user_permissions.filter( 
codename__in=[right_name] ).count() ) ) if session: cache.set(cache_key, res, settings.CACHE_TIMEOUT) return res def full_label(self): values = [] if self.title: values = [self.title.label] values += [ str(getattr(self, attr)) for attr in ("salutation", "surname", "name") if getattr(self, attr) ] if not values and self.raw_name: values = [self.raw_name] if self.attached_to: values.append("- " + str(self.attached_to)) return " ".join(values) @property def associated_filename(self): values = [ str(getattr(self, attr)) for attr in ("surname", "name", "attached_to") if getattr(self, attr) ] return slugify("-".join(values)) def docs_q(self): return Document.objects.filter(authors__person=self) def operation_docs_q(self): return Document.objects.filter( authors__person=self, operations__pk__isnull=False ) def contextrecord_docs_q(self): return Document.objects.filter( authors__person=self, context_records__pk__isnull=False ) def find_docs_q(self): return Document.objects.filter(authors__person=self, finds__pk__isnull=False) def save(self, *args, **kwargs): super(Person, self).save(*args, **kwargs) raw_name = get_generated_id("person_raw_name", self) if raw_name.strip() and self.raw_name != raw_name: self.raw_name = raw_name self.save() if hasattr(self, "responsible_town_planning_service"): for fle in self.responsible_town_planning_service.all(): fle.save() # force update of raw_town_planning_service if hasattr(self, "general_contractor"): for fle in self.general_contractor.all(): fle.save() # force update of raw_general_contractor def get_extra_actions(self, request): actions = super().get_extra_actions(request) # for admin only if not request.user.is_staff: return actions actions += [ ( reverse("account-manage", args=[self.pk]), _("Manage account"), "fa fa-user", "", "", False, ), ] return actions @classmethod def get_query_owns(cls, ishtaruser): return ( Q(operation_scientist_responsability__collaborators__ishtaruser=ishtaruser) | 
def text_format(text, text_format):
    """
    Render a text according to its declared format.

    :param text: raw text
    :param text_format: "MD" (Markdown), "HT" (HTML); any other value
        returns the text unchanged
    :return: safe-marked HTML for "MD" and "HT", the input text otherwise
    """
    if text_format not in ("MD", "HT"):
        return text
    # NOTE(review): only the "HT" branch goes through bleach; the Markdown
    # output is marked safe as is - confirm this is intended for user
    # provided content
    html = markdown(text) if text_format == "MD" else bleach.clean(text)
    return mark_safe(html)
@classmethod
def history_decompress(cls, full_value, create=False):
    """
    Get the items matching a list of slugs (as stored by the history).

    :param full_value: iterable of slugs, may be empty or None
    :param create: not used by this implementation
    :return: list of the items found, unknown slugs are ignored
    """
    found = []
    for slug in full_value or ():
        try:
            item = cls.objects.get(slug=slug)
        except cls.DoesNotExist:
            continue
        found.append(item)
    return found
def __str__(self):
    """
    "<username> - <date> - <activity label>".

    The user is optional (a log can be created without a user): "-" is
    used in place of the username in this case.
    """
    username = self.user.username if self.user else "-"
    return f"{username} - {self.date} - {self.activity_lbl}"
@classmethod
def _create_log(cls, user_id, ip, routable_ip, activity, person_query,
                slice_query=None):
    """
    Create a log entry and attach the GDPR persons concerned.

    :param user_id: id of the user at the origin of the activity (or None)
    :param ip: IP address of the client
    :param routable_ip: True if the IP is routable
    :param activity: key of the activity (see GDPR_ACTIVITY)
    :param person_query: queryset of the persons concerned; persons with
        no raw name are ignored
    :param slice_query: optional (start, end) tuple: only this slice of
        person_query is logged
    """
    log = cls.objects.create(
        user_id=user_id, ip=ip, routable_ip=routable_ip, activity=activity
    )
    person_query = person_query.exclude(raw_name__isnull=True).exclude(
        raw_name=""
    )
    start, end = None, None
    if slice_query:
        start, end = slice_query

    # resolve the persons concerned once: the slice has to be applied to
    # the person query itself - the same indexes applied to a differently
    # filtered query would select other persons
    person_ids = person_query.values_list("id", flat=True)
    if start is not None:
        person_ids = person_ids[start:end]
    person_ids = list(person_ids)
    if not person_ids:
        return

    # create all missing GDPRPerson
    missing = Person.objects.filter(
        pk__in=person_ids, gdpr_person=None
    ).values_list("id", "raw_name")
    GDPRPerson.objects.bulk_create(
        [
            GDPRPerson(person_id=person_id, raw_name=raw_name)
            for person_id, raw_name in missing
        ]
    )

    # attach gdpr persons - a set prevents duplicated through rows
    gdpr_person_ids = set(
        GDPRPerson.objects.filter(person_id__in=person_ids).values_list(
            "pk", flat=True
        )
    )
    if gdpr_person_ids:
        through = cls.persons.through
        through.objects.bulk_create(
            [
                through(gdprperson_id=pk, gdprlog_id=log.pk)
                for pk in sorted(gdpr_person_ids)
            ]
        )
def __str__(self):
    """
    Name of the profile (or its profile type when no name is set),
    followed by the list of associated areas in brackets if any.
    """
    lbl = self.name or str(self.profile_type)
    # fetch the areas once: no separate COUNT query before listing them
    area_names = [str(area) for area in self.areas.all()]
    if not area_names:
        return lbl
    return "{} ({})".format(lbl, ", ".join(area_names))
def post_save_userprofile(sender, **kwargs):
    """
    Signal receiver: refresh the "show field number" value of the account
    attached to the person of the saved profile.

    Nothing is done when no instance is provided or when the lookup raises
    IshtarUser.DoesNotExist.
    """
    profile = kwargs.get("instance")
    if not profile:
        return
    try:
        account = profile.person.ishtaruser
        account.show_field_number(update=True)
    except IshtarUser.DoesNotExist:
        pass
verbose_name_plural = _("Background tasks") ordering = ["creation_date"] class IshtarUser(FullSearch): SLUG = "ishtaruser" TABLE_COLS = ( "username", "person__name", "person__surname", "person__email", "person__person_types_list", "person__attached_to__name", ) BASE_SEARCH_VECTORS = [ SearchVectorConfig("user_ptr__username"), SearchVectorConfig("person__name"), SearchVectorConfig("person__surname"), SearchVectorConfig("person__email"), SearchVectorConfig("person__town"), SearchVectorConfig("person__attached_to__name"), ] CACHED_LABELS = [] # needed to force search vector update # search parameters EXTRA_REQUEST_KEYS = {"person__person_types_list": "person__person_types__label"} COL_LABELS = { "person__attached_to__name": _("Organization"), "username": _("Username"), } # alternative names of fields for searches ALT_NAMES = { "username": SearchAltName( pgettext_lazy("key for text search", "username"), "user_ptr__username__iexact", ), "name": SearchAltName( pgettext_lazy("key for text search", "name"), "person__name__iexact" ), "surname": SearchAltName( pgettext_lazy("key for text search", "surname"), "person__surname__iexact", ), "email": SearchAltName( pgettext_lazy("key for text search", "email"), "person__email__iexact", ), "person_types": SearchAltName( pgettext_lazy("key for text search", "type"), "person__person_types__label__iexact", ), "attached_to": SearchAltName( pgettext_lazy("key for text search", "organization"), "person__attached_to__cached_label__iexact", ), } # fields user_ptr = models.OneToOneField( User, primary_key=True, related_name="ishtaruser", on_delete=models.CASCADE ) person = models.OneToOneField( Person, verbose_name=_("Person"), related_name="ishtaruser", on_delete=models.CASCADE, ) password_last_update = models.DateField( _("Password last update"), default=datetime.date.today ) advanced_shortcut_menu = models.BooleanField( _("Advanced shortcut menu"), default=False ) # latest news read by the user latest_news_version = 
def show_field_number(self, update=False):
    """
    Get the "show field number" setting of the current profile of this
    user. The value is cached.

    :param update: if True the cached value is ignored and refreshed
    :return: value of the current profile, False when the user has no
        profile
    """
    # the value is specific to a user: the key has to identify this user,
    # otherwise the first cached value is served to everybody
    # (assumes get_cache derives the key from these extra arguments)
    cache_key, value = get_cache(
        self.__class__, ["show_field_number", str(self.pk)]
    )
    if not update and value is not None:
        return value
    # evaluate the property once: each access queries the database
    profile = self.current_profile
    value = profile.show_field_number if profile else False
    cache.set(cache_key, value, settings.CACHE_TIMEOUT)
    return value
self.person.has_right(right_name, session=session) def has_perm(self, perm, model=None, session=None, obj=None): return self.person.has_right(perm, session=session, obj=None) def full_label(self): return self.person.full_label() @post_importer_action def import_set_password(self, context, value): self.user_ptr.set_password(value) self.user_ptr.save() self.password_last_update = datetime.date.today() self.save() @post_importer_action def import_create_profile(self, context, value): UserProfile.objects.get_or_create(person=self.person, profile_type=value, defaults={"name": value.label}) post_save.connect(cached_label_changed, sender=IshtarUser) class Basket(FullSearch, OwnPerms, ValueGetter, TemplateItem): """ Abstract class for a basket Subclass must be defined with an "items" ManyToManyField """ IS_BASKET = True uuid = models.UUIDField(default=uuid.uuid4) label = models.CharField(_("Label"), max_length=1000) comment = models.TextField(_("Comment"), blank=True, default="") slug = models.SlugField(_("Slug"), blank=True, null=True) public = models.BooleanField(_("Public"), default=False) user = models.ForeignKey( IshtarUser, blank=True, null=True, related_name="%(class)ss", on_delete=models.SET_NULL, verbose_name=_("Owner"), ) available = models.BooleanField(_("Available"), default=True) shared_with = models.ManyToManyField( IshtarUser, verbose_name=_("Shared (read) with"), blank=True, related_name="shared_%(class)ss", ) shared_write_with = models.ManyToManyField( IshtarUser, verbose_name=_("Shared (read/edit) with"), blank=True, related_name="shared_write_%(class)ss", ) objects = UUIDModelManager() TABLE_COLS = ["label", "user"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("label"), SearchVectorConfig("comment", "local"), ] PARENT_SEARCH_VECTORS = ["user"] # M2M_SEARCH_VECTORS = [SearchVectorConfig('items')] CACHED_LABELS = [] # needed to force search vector update class Meta: abstract = True ordering = ("label",) unique_together = (("label", "user"),) def 
def duplicate(self, label=None, ishtaruser=None):
    """
    Duplicate the basket.
    Items in basket are copied but not shared users

    The current instance is re-used as the new basket: after the call
    ``self`` and the returned object are the same, freshly inserted, row.

    :param label: if provided use the name
    :param ishtaruser: if provided an alternate user is used
    :return: the new basket
    """
    through = self.items.through
    basket_pk = "{}_id".format(self.SLUG)
    item_pk = "{}_id".format(self.items.model.SLUG)
    # collect the item ids before the primary key is dropped
    q = through.objects.filter(**{basket_pk: self.pk})
    items = [r[item_pk] for r in q.values("pk", item_pk).order_by("pk").all()]
    new_item = self
    new_item.pk = None
    new_item._state.adding = True
    # the uuid is the natural key: the copy must get its own one
    new_item.uuid = uuid.uuid4()
    if ishtaruser:
        new_item.user = ishtaruser
    if not label:
        label = new_item.label
    # (label, user) is unique: suffix the label until it is available
    while self.__class__.objects.filter(label=label, user=new_item.user).exists():
        label += str(_(" - duplicate"))
    new_item.label = label
    new_item.save()
    for item in items:
        through.objects.create(**{basket_pk: new_item.pk, item_pk: item})
    return new_item
def author_post_save(sender, **kwargs):
    """
    Post save handler of Author.

    Refresh the cached label, then merge the authors sharing the same person
    and the same author type into the first one returned by the query.
    """
    instance = kwargs.get("instance")
    if not instance:
        return
    cached_label_changed(sender, **kwargs)
    same_authors = Author.objects.filter(
        person=instance.person, author_type=instance.author_type
    )
    if same_authors.count() <= 1:
        # no duplicate to merge
        return
    authors = list(same_authors.all())
    kept, duplicates = authors[:1], authors[1:]
    for duplicated in duplicates:
        kept[0].merge(duplicated)
class Format(GeneralType):
    """
    Document format type.

    A format can hold a Django template used to render an iframe for a
    document and can be associated to a set of document types.
    """

    iframe_template = models.TextField(
        _("Iframe template"),
        default="",
        blank=True,
        help_text=_(
            "Template to insert an iframe for this format. Use django "
            "template with a {{document}} variable matching the "
            "current document."
        ),
    )
    document_types = models.ManyToManyField(
        "SourceType",
        related_name="formats",
        blank=True,
        help_text=_("Only available for these document types"),
    )

    class Meta:
        ordering = ["label"]
        verbose_name = _("Format type")
        verbose_name_plural = _("Format types")

    ADMIN_SECTION = _("Documents")
important: put the image in the first match found # other will be symbolic links RELATED_MODELS = [ "treatment_files", "treatments", "finds", "context_records", "operations", "sites", "warehouses", "containers", "files", "administrativeacts", "towns", "areas", ] # same fields but in order for forms RELATED_MODELS_ALT = [ "finds", "context_records", "operations", "sites", "files", "administrativeacts", "warehouses", "containers", "treatments", "treatment_files", "towns", "areas", ] SLUG = "document" LINK_SPLIT = "<||>" GET_VALUES_EXCLUDE_FIELDS = ValueGetter.GET_VALUES_EXCLUDE_FIELDS + [ "warehouses", "operations", "treatments", "files", "treatment_files", "administrativeacts", "id", "associated_links", "source_type_id", "history_creator_id", "containers", "sites", "towns", "areas", "main_image_towns", "main_image_areas", "main_image_warehouses", "main_image_operations", "main_image_treatments", "main_image_files", "main_image_treatment_files", "main_image_id", "main_image_associated_links", "main_image_source_type_id", "main_image_history_creator_id", "main_image_containers", "main_image_sites", ] _TABLE_COLS = [ "title", "source_type__label", "cache_related_label", "cache_authors", "associated_url", ] NEW_QUERY_ENGINE = True COL_LINK = ["associated_url"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("title"), SearchVectorConfig("source_type__label"), SearchVectorConfig("external_id", "raw"), SearchVectorConfig("reference", "raw"), SearchVectorConfig("internal_reference", "raw"), SearchVectorConfig("description", "local"), SearchVectorConfig("comment", "local"), SearchVectorConfig("additional_information", "local"), ] BASE_SEARCH_VECTORS += [ SearchVectorConfig("treatment_files__name"), SearchVectorConfig("treatments__cached_label"), SearchVectorConfig("finds__cached_label"), SearchVectorConfig("context_records__cached_label"), SearchVectorConfig("operations__cached_label"), SearchVectorConfig("sites__cached_label"), SearchVectorConfig("warehouses__name"), 
SearchVectorConfig("containers__cached_label"), SearchVectorConfig("files__cached_label"), SearchVectorConfig("towns__name"), SearchVectorConfig("areas__label"), SearchVectorConfig("shooting_angle__label"), ] PARENT_SEARCH_VECTORS = [ "authors", ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("tags__label"), ] BOOL_FIELDS = BaseHistorizedItem.BOOL_FIELDS + ["duplicate"] NUMBER_FIELDS = ["operations__year"] COL_LABELS = { "authors__cached_label": _("Authors"), "complete_identifier": _("Identifier"), "source_type__label": _("Type"), } CACHED_LABELS = ["cache_related_label", "cache_authors"] CACHED_COMPLETE_ID = "" EXTRA_REQUEST_KEYS = { "operations": "operations__pk", "context_records": "context_records__pk", "context_records__operation": "context_records__operation__pk", "finds": "finds__pk", "finds__base_finds__context_record": "finds__base_finds__context_record__pk", "finds__base_finds__context_record__operation": "finds__base_finds__context_record__operation__pk", "authors__cached_label": "authors__cached_label", "complete_identifier": "complete_identifier", "authors__person__pk": "authors__person__pk", "container_id": "container_id", "publisher__pk": "publisher__pk", "source_type__label": "source_type__label", } # alternative names of fields for searches ALT_NAMES = { "authors": SearchAltName( pgettext_lazy("key for text search", "author"), "authors__cached_label__iexact", related_name="authors" ), "publisher": SearchAltName( pgettext_lazy("key for text search", "publisher"), "publisher__name__iexact", related_name="publisher" ), "publishing_year": SearchAltName( pgettext_lazy("key for text search", "publishing-year"), "publishing_year", ), "title": SearchAltName( pgettext_lazy("key for text search", "title"), "title__iexact" ), "source_type": SearchAltName( pgettext_lazy("key for text search", "type"), "source_type__label__iexact", ), "reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "reference__iexact", ), "internal_reference": 
SearchAltName( pgettext_lazy("key for text search", "internal-reference"), "internal_reference__iexact", ), "description": SearchAltName( pgettext_lazy("key for text search", "description"), "description__iexact", ), "tag": SearchAltName( pgettext_lazy("key for text search", "tag"), "tags__label__iexact", related_name="tags" ), "format": SearchAltName( pgettext_lazy("key for text search", "format"), "format_type__label__iexact", related_name="format_type" ), "support": SearchAltName( pgettext_lazy("key for text search", "medium"), "support_type__label__iexact", ), "language": SearchAltName( pgettext_lazy("key for text search", "language"), "language__label__iexact", ), "licenses": SearchAltName( pgettext_lazy("key for text search", "license"), "licenses__label__iexact", related_name="licenses" ), "rights_owner": SearchAltName( pgettext_lazy("key for text search", "rights-owner"), "rights_owner__name__iexact", ), "copyright": SearchAltName( pgettext_lazy("key for text search", "copyright"), "copyright__iexact" ), "scale": SearchAltName( pgettext_lazy("key for text search", "scale"), "scale__iexact" ), "associated_url": SearchAltName( pgettext_lazy("key for text search", "url"), "associated_url__iexact", related_name="associated_url" ), "isbn": SearchAltName( pgettext_lazy("key for text search", "isbn"), "isbn__iexact" ), "issn": SearchAltName( pgettext_lazy("key for text search", "issn"), "issn__iexact" ), "source": SearchAltName( pgettext_lazy("key for text search", "source"), "source__title__iexact", ), "source_free_input": SearchAltName( pgettext_lazy("key for text search", "source-free-input"), "source_free_input__iexact", ), "warehouse_container": SearchAltName( pgettext_lazy("key for text search", "warehouse-container"), "wcontainer_id", ), "warehouse_container_ref": SearchAltName( pgettext_lazy("key for text search", "warehouse-container-reference"), "wcontainer_ref_id", ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), 
"comment__iexact" ), "additional_information": SearchAltName( pgettext_lazy("key for text search", "additional-information"), "additional_information__iexact", ), "duplicate": SearchAltName( pgettext_lazy("key for text search", "has-duplicate"), "duplicate" ), "operation": SearchAltName( pgettext_lazy("key for text search", "operation"), "operations__cached_label__iexact", related_name="operations" ), "operations__operation_type": SearchAltName( pgettext_lazy("key for text search", "operation-type"), "operations__operation_type__label__iexact", ), "operations__year": SearchAltName( pgettext_lazy("key for text search", "operation-year"), "operations__year", ), "context_record": SearchAltName( pgettext_lazy("key for text search", "context-record"), "context_records__cached_label__iexact", related_name="context_records" ), "find_basket": SearchAltName( pgettext_lazy("key for text search", "basket-finds"), "finds__basket__label__iexact", related_name="finds__basket" ), "find": SearchAltName( pgettext_lazy("key for text search", "find"), "finds__cached_label__iexact", related_name="finds" ), "find__denomination": SearchAltName( pgettext_lazy("key for text search", "find-denomination"), "finds__denomination__iexact", ), "file": SearchAltName( pgettext_lazy("key for text search", "file"), "files__cached_label__iexact", ), "containers": SearchAltName( pgettext_lazy("key for text search", "container"), "containers__cached_label__iexact", related_name="containers" ), "site": SearchAltName( pgettext_lazy("key for text search", "site"), "sites__cached_label__iexact", ), "warehouse": SearchAltName( pgettext_lazy("key for text search", "warehouse"), "warehouses__name__iexact", ), "town": SearchAltName( pgettext_lazy("key for text search", "town"), "towns__name__iexact", related_name="towns" ), "area": SearchAltName( pgettext_lazy("key for text search", "area"), "areas__label__iexact", ), "image__isnull": SearchAltName( pgettext_lazy("key for text search", "has-image"), 
"image__isnull" ), "associated_file__isnull": SearchAltName( pgettext_lazy("key for text search", "has-file"), "associated_file__isnull", ), "receipt_date": SearchAltName( pgettext_lazy("key for text search", "receipt-date"), "receipt_date", ), "receipt_date_in_documentation": SearchAltName( pgettext_lazy( "key for text search", "receipt-in-documentation-date" ), "receipt_date_in_documentation", ), "creation_date": SearchAltName( pgettext_lazy("key for text search", "creation-date"), "creation_date", ), "shooting_angle": SearchAltName( pgettext_lazy("key for text search", "shooting-angle"), "shooting_angle__label__iexact", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) # search parameters REVERSED_BOOL_FIELDS = ["image__isnull", "associated_file__isnull"] DATED_FIELDS = BaseHistorizedItem.DATED_FIELDS + [ "receipt_date__lte", "receipt_date__gte", "receipt_date_in_documentation__lte", "receipt_date_in_documentation__gte", "creation_date__lte", "creation_date__gte", ] objects = ExternalIdManager() RELATIVE_SESSION_NAMES = [ ("find", "finds__pk"), ("contextrecord", "context_records__pk"), ("operation", "operations__pk"), ("site", "sites__pk"), ("file", "files__pk"), ("warehouse", "warehouses__pk"), ("treatment", "treatments__pk"), ("treatmentfile", "treatment_files__pk"), ("administrativeact", "administrativeacts__pk"), ] UP_MODEL_QUERY = { "operation": ( pgettext_lazy("key for text search", "operation"), "cached_label", ), "contextrecord": ( pgettext_lazy("key for text search", "context-record"), "cached_label", ), "file": (pgettext_lazy("key for text search", "file"), "cached_label"), "find": (pgettext_lazy("key for text search", "find"), "cached_label"), "site": (pgettext_lazy("key for text search", "site"), "cached_label"), "warehouse": ( pgettext_lazy("key for text search", "warehouse"), "cached_label", ), "treatment": ( pgettext_lazy("key for text search", "treatment"), "cached_label", ), "treatmentfile": ( pgettext_lazy("key for text search", 
"treatment-file"), "cached_label", ), } QA_EDIT = QuickAction( url="document-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_document", "change_own_document"], ) QUICK_ACTIONS = [ QA_EDIT, QuickAction( url="document-qa-duplicate", icon_class="fa fa-clone", text=_("Duplicate"), target="one", rights=["change_document", "change_own_document"], ), QuickAction( url="document-qa-packaging", icon_class="fa fa-gift", text=_("Packaging"), target="many", rights=["change_document", "change_own_document"], module="warehouse", ), ] SERIALIZATION_FILES = ["image", "thumbnail", "associated_file"] SERIALIZE_PROPERTIES = ["external_id", "images_number"] title = models.TextField(_("Title"), blank=True, default="") associated_file = models.FileField( verbose_name=pgettext_lazy("pdf, odt, zip, ...", "Associated file"), upload_to=get_image_path, blank=True, null=True, max_length=255, help_text=max_size_help(), ) index = models.IntegerField(verbose_name=_("Index"), blank=True, null=True) external_id = models.TextField(_("External ID"), blank=True, default="") reference = models.TextField(_("Ref."), blank=True, default="") internal_reference = models.TextField(_("Internal ref."), blank=True, default="") source_type = models.ForeignKey( SourceType, verbose_name=_("Type"), on_delete=models.SET_NULL, null=True, blank=True, ) publisher = models.ForeignKey( Organization, verbose_name=_("Publisher"), blank=True, null=True, related_name="publish", on_delete=models.SET_NULL, ) publishing_year = models.PositiveIntegerField( _("Year of publication"), blank=True, null=True ) licenses = models.ManyToManyField( LicenseType, verbose_name=_("Rights of use / license"), blank=True ) rights_owner = models.ForeignKey( Organization, verbose_name=_("Rights owner"), blank=True, null=True, on_delete=models.SET_NULL, ) copyright = models.TextField(_("Copyright"), blank=True, default="") tags = models.ManyToManyField(DocumentTag, verbose_name=_("Tags"), 
blank=True) language = models.ForeignKey( Language, verbose_name=_("Language"), blank=True, null=True, on_delete=models.SET_NULL, ) issn = models.CharField(_("ISSN"), blank=True, null=True, max_length=10) isbn = models.CharField(_("ISBN"), blank=True, null=True, max_length=17) source = models.ForeignKey( "Document", verbose_name=_("Source"), blank=True, null=True, related_name="children", on_delete=models.SET_NULL, ) source_free_input = models.CharField( verbose_name=_("Source - free input"), blank=True, null=True, max_length=500, ) source_page_range = models.CharField( verbose_name=_("Source - page range"), blank=True, null=True, max_length=500, ) support_type = models.ForeignKey( SupportType, verbose_name=_("Medium"), on_delete=models.SET_NULL, blank=True, null=True, ) format_type = models.ForeignKey( Format, verbose_name=_("Format"), on_delete=models.SET_NULL, blank=True, null=True, ) scale = models.CharField(_("Scale"), max_length=30, null=True, blank=True) shooting_angle = models.ForeignKey( ShootingAngle, verbose_name=_("Shooting angle"), on_delete=models.SET_NULL, blank=True, null=True, ) authors = models.ManyToManyField( Author, verbose_name=_("Authors"), related_name="documents", blank=True ) authors_raw = models.CharField( verbose_name=_("Authors (raw)"), blank=True, null=True, max_length=250 ) associated_url = models.URLField( blank=True, null=True, max_length=1000, verbose_name=_("Numerical ressource (web address)"), ) receipt_date = models.DateField( blank=True, null=True, verbose_name=_("Receipt date") ) creation_date = models.DateField( blank=True, null=True, verbose_name=_("Creation date") ) receipt_date_in_documentation = models.DateField( blank=True, null=True, verbose_name=_("Receipt date in documentation") ) item_number = models.IntegerField(_("Number of items"), default=1) description = models.TextField(_("Description"), blank=True, default="") container_id = models.PositiveIntegerField( verbose_name=_("Container ID"), blank=True, null=True ) # 
@property
def operation_codes(self):
    """
    Codes ("code_patriarche") of the related operations.

    :return: sorted codes joined with "|" - an empty string when no
        operation is related
    """
    Operation = apps.get_model("archaeological_operations", "Operation")
    # ids may contain None (find without base find): no operation to fetch
    operation_ids = [
        ope_id for ope_id in self.get_related_operation_ids() if ope_id is not None
    ]
    if not operation_ids:
        return ""
    # a single query instead of one "get" by operation
    codes = Operation.objects.filter(pk__in=operation_ids).values_list(
        "code_patriarche", flat=True
    )
    # an operation without code must not break sort and join
    return "|".join(sorted("" if code is None else str(code) for code in codes))
def get_related_operation_ids(self):
    """
    Get ids of operations related to this document.

    Operations are reached directly, through context records and through
    finds (by the context record of their base finds).

    :return: list of distinct operation ids (no specific order)
    """
    lookups = (
        (self.operations, "id"),
        (self.context_records, "operation_id"),
        (self.finds, "base_finds__context_record__operation_id"),
    )
    operation_ids = set()
    for related_manager, field_name in lookups:
        operation_ids.update(related_manager.values_list(field_name, flat=True))
    # a find with no base find gives a None id: this is not an operation
    operation_ids.discard(None)
    return list(operation_ids)
def public_representation(self):
    """
    Public description of the document.

    Image and thumbnail URLs not starting with "http" are made absolute
    using the domain of the current site and the scheme given by
    ``settings.ISHTAR_SECURE``.

    :return: dict with "title", "reference", "type", "authors", "image" and
        "thumbnail" keys - "image" and "thumbnail" are None when not set
    """
    site = Site.objects.get_current()
    scheme = "https" if settings.ISHTAR_SECURE else "http"
    # no trailing slash: the path part always brings its own leading slash
    base_url = scheme + "://" + site.domain

    def absolute_url(field_file):
        # absolute URL of a file field - None when no file is attached
        if not field_file:
            return None
        url = field_file.url
        if url.startswith("http"):
            return url
        if not url.startswith("/"):
            url = "/" + url
        return base_url + url

    return {
        "title": self.title,
        "reference": self.reference,
        "type": self.source_type and str(self.source_type),
        "authors": [a.public_representation() for a in self.authors.all()],
        "image": absolute_url(self.image),
        "thumbnail": absolute_url(self.thumbnail),
    }
def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs):
    """
    Extra values: paths of the image and of the thumbnail.

    :param prefix: prefix added to each key
    :param no_values: not used here
    :param filtr: if not empty, only keys listed in it are returned
    :return: dict with "<prefix>image_path" and "<prefix>thumbnail_path" keys
    """
    extra_values = {}
    for attribute in ("image_path", "thumbnail_path"):
        key = prefix + attribute
        if filtr and key not in filtr:
            # filtered out: the attribute is not even read
            continue
        extra_values[key] = getattr(self, attribute)
    return extra_values
klass._get_query_owns_dicts(ishtaruser) if q_own_dct: query_own_list.append((rel_model + "__", q_own_dct)) q = None for prefix, owns in query_own_list: subq = cls._construct_query_own(prefix, owns) if subq: if not q: q = subq else: q |= subq q |= cls._construct_query_own("", [{"history_creator": ishtaruser.user_ptr}]) return q def get_associated_operation(self): raise NotImplementedError() @property def associated_filename(self): values = [ str(getattr(self, attr)) for attr in ("source_type", "title") if getattr(self, attr) ] return slugify("-".join(values)) def _get_base_image_paths(self): if self.pk: # m2m not available if not created... for related_model in self.RELATED_MODELS: q = getattr(self, related_model).all() if q.count(): item = q.all()[0] yield item._get_base_image_path() def _get_base_image_path(self): for path in self._get_base_image_paths(): if path: return path n = datetime.datetime.now() return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) def _get_available_filename(self, path, test_link=None): """ Get a filename not used If name already used - generate a name with schema: [base]-[current_number + 1].[suffix] :param path: base path :param test_link: test if an existing path match with this link :return: if test_link is not None, (new_path, link_match) otherwise the new_path """ file_split = path.split(".") suffix, base = "", "" if len(file_split) > 1: base = ".".join(file_split[0:-1]) suffix = file_split[-1] else: base = path base_split = base.split("-") current_nb = 0 if len(base_split) > 1: try: current_nb = int(base_split[-1]) base = "-".join(base_split[0:-1]) + "-" except ValueError: pass while os.path.exists(path): if test_link and os.path.islink(path) and os.readlink(path) == test_link: return path, True current_nb += 1 path = "{}-{}.{}".format(base, current_nb, suffix) if test_link: return path, False return path def _move_image(self): """ Move to the relevant path and create appropriate symbolic links :return: list of associated 
def dublin_core_tags(self):
    """
    Render the Dublin Core description of this document as HTML tags.

    :return: an empty string when the document has no title, otherwise
        the "link" and "meta" elements serialized as HTML and concatenated
    """
    if not self.title:
        return ""

    elements = [
        ("link", {"rel": "schema.DC", "href": "http://purl.org/dc/elements/1.1/"}),
        ("link", {"rel": "schema.DCTERMS", "href": "http://purl.org/dc/terms/"}),
    ]

    def add_meta(name, content, **extra):
        # attribute order: name, optional extra attributes, content
        attributes = {"name": name}
        attributes.update(extra)
        attributes["content"] = content
        elements.append(("meta", attributes))

    add_meta("DC.title", self.title)
    if self.creation_date:
        add_meta(
            "DC.date",
            self.creation_date.strftime("%Y-%m-%d"),
            scheme="DCTERMS.W3CDTF",
        )
    if self.tags.count():
        add_meta("DC.subject", ", ".join(str(tag) for tag in self.tags.all()))
    if self.description:
        add_meta("DC.description", self.description)
    if self.publisher:
        add_meta("DC.publisher", self.publisher.name)
    if self.authors.count():
        creators = ", ".join(
            str(author.person.raw_name) for author in self.authors.all()
        )
        add_meta("DC.creator", creators)
    if self.source_type:
        add_meta("DC.type", str(self.source_type))
    if self.format_type:
        add_meta("DC.format", str(self.format_type))
    own_identifier = self.dublin_core_identifier
    if own_identifier:
        add_meta("DC.identifier", own_identifier)
    if self.language:
        add_meta("DC.language", self.language.iso_code)
    if self.licenses.count():
        rights = ", ".join(str(licence) for licence in self.licenses.all())
        add_meta("DC.rights", rights)

    # a linked source with an identifier wins over the free input
    source_identifier = None
    if self.source:
        source_identifier = self.source.dublin_core_identifier
    if source_identifier:
        add_meta("DC.relation", source_identifier)
        add_meta("DC.source", source_identifier)
    elif self.source_free_input:
        add_meta("DC.source", self.source_free_input)

    return "".join(
        ET.tostring(
            ET.Element(tag, attrib=attributes), encoding="unicode", method="html"
        )
        for tag, attributes in elements
    )
def coins_tag(self):
    """
    Build the COinS (ContextObjects in Spans) tag describing this document.

    :return: None when the document has no title, otherwise an empty HTML
        span of class "Z3988" whose title attribute contains the
        URL-encoded key/value description of the document
    """
    from html import escape

    if not self.title:
        return
    info = [
        ("url_ver", "Z39.88-2004"),
        ("ctx_ver", "Z39.88-2004"),
        ("rft_val_fmt", "info:ofi/fmt:kev:mtx:dc"),
        ("rft.title", self.title),
        ("rft.btitle", self.title),
    ]
    if self.associated_url:
        info.append(("rft.identifier", self.associated_url))
    elif Site.objects.count():
        info.append(
            (
                "rft.identifier",
                "http://{}{}".format(
                    Site.objects.all()[0].domain, self.get_absolute_url()
                ),
            )
        )
    for author in self.authors.all():
        person = author.person
        if not person.raw_name:
            continue
        if person.first_name and person.name:
            info.append(("rft.aulast", person.name))
            info.append(("rft.aufirst", person.first_name))
            # plain space: urlencode turns it into "+" whereas a literal
            # "+" would be sent as "%2B"
            info.append(("rft.au", "{} {}".format(person.first_name, person.name)))
        else:
            info.append(("rft.au", person.raw_name))
    if self.source_type:
        # an empty value would be serialized as a meaningless key
        if self.source_type.coins_type:
            info.append(("rft.type", self.source_type.coins_type))
        if self.source_type.coins_genre:
            info.append(("rft.genre", self.source_type.coins_genre))
    if self.description:
        info.append(("rft.description", self.description))
    if self.publisher:
        info.append(("rft.publisher", self.publisher.name))
    if self.creation_date:
        # a date set on the first of January is reported as a bare year
        if self.creation_date.day == 1 and self.creation_date.month == 1:
            info.append(("rft.date", self.creation_date.year))
        else:
            info.append(("rft.date", self.creation_date.strftime("%Y-%m-%d")))
    if self.source and self.source.title:
        info.append(("rft.source", self.source.title))
    elif self.source_free_input:
        info.append(("rft.source", self.source_free_input))
    if self.issn:
        info.append(("rft.issn", self.issn))
    if self.isbn:
        info.append(("rft.isbn", self.isbn))
    if self.licenses.count():
        licenses = ";".join(str(licence) for licence in self.licenses.all())
        info.append(("rft.rights", licenses))
    if self.language:
        info.append(("rft.language", self.language.iso_code))
    # the pairs are joined with "&" which has to be escaped inside an
    # HTML attribute
    return '<span class="Z3988" title="{}"></span>'.format(
        escape(urlencode(info), quote=True)
    )
class OperationType(GeneralType):
    """
    Type of operation, flagged as preventive and/or judiciary.
    """

    order = models.IntegerField(_("Order"), default=1)
    preventive = models.BooleanField(_("Is preventive"), default=True)
    judiciary = models.BooleanField(_("Is judiciary"), default=False)

    class Meta:
        verbose_name = _("Operation type")
        verbose_name_plural = _("Operation types")
        ordering = ["judiciary", "-preventive", "order", "label"]

    @classmethod
    def get_types(
        cls,
        dct=None,
        instances=False,
        exclude=None,
        empty_first=True,
        default=None,
        initial=None,
    ):
        """
        Get the available operation types, grouped by nature.

        :param dct: extra filters for the query ("available" is always
            forced to True)
        :param instances: if True return the model instances instead of
            choices
        :param exclude: "txt_idx" values to exclude
        :param empty_first: put an empty choice first (only when no
            default is given and instances is False)
        :param default: "txt_idx" of the type to put first in the choices;
            ignored when no such type exists
        :param initial: passed to _get_initial_types, whose result is
            appended to the items
        :return: if instances is True the list of instances, otherwise a
            list of choices: the optional empty and default choices
            followed by [group label, [(pk, label), ...]] groups
        """
        # work on copies: the containers given by the caller are not altered
        dct = dict(dct) if dct else {}
        exclude = list(exclude) if exclude else []
        initial = initial or []
        tuples = []
        dct["available"] = True
        if not instances and empty_first and not default:
            tuples.append(("", "--"))
        if default and not instances:
            try:
                default = cls.objects.get(txt_idx=default)
            except cls.DoesNotExist:
                # unknown default: nothing to put first and nothing to
                # exclude
                default = None
            else:
                tuples.append((default.pk, _(str(default))))
                # already listed first: do not repeat it in its group
                exclude.append(default.txt_idx)
        items = cls.objects.filter(**dct)
        if exclude:
            items = items.exclude(txt_idx__in=exclude)
        current_preventive, current_judiciary, current_lst = None, None, None
        item_list = list(items.order_by(*cls._meta.ordering).all())
        new_vals = cls._get_initial_types(
            initial, [i.pk for i in item_list], instance=True
        )
        item_list += new_vals
        for item in item_list:
            item.rank = 0
        if instances:
            return item_list
        for item in item_list:
            # open a new group each time the nature of the type changes
            if (
                not current_lst
                or item.preventive != current_preventive
                or item.judiciary != current_judiciary
            ):
                if current_lst:
                    tuples.append(current_lst)
                if item.judiciary:
                    gp_lbl = _("Judiciary")
                elif item.preventive:
                    gp_lbl = _("Preventive")
                else:
                    gp_lbl = _("Research")
                current_lst = [gp_lbl, []]
                current_preventive = item.preventive
                current_judiciary = item.judiciary
            current_lst[1].append((item.pk, _(str(item))))
        if current_lst:
            tuples.append(current_lst)
        return tuples

    @classmethod
    def is_preventive(cls, ope_type_id, key=""):
        """
        Check if an operation type is preventive.

        :param ope_type_id: primary key of the operation type
        :param key: if set, the check is instead that the "txt_idx" of
            the type is equal to this key
        :return: False when no type has this primary key, otherwise the
            result of the check
        """
        try:
            op_type = cls.objects.get(pk=ope_type_id)
        except cls.DoesNotExist:
            return False
        if not key:
            return op_type.preventive
        return key == op_type.txt_idx

    @classmethod
    def is_judiciary(cls, ope_type_id):
        """
        Check if an operation type is judiciary.

        :param ope_type_id: primary key of the operation type
        :return: False when no type has this primary key, otherwise its
            "judiciary" flag
        """
        try:
            op_type = cls.objects.get(pk=ope_type_id)
        except cls.DoesNotExist:
            return False
        return op_type.judiciary
def _finish_with_error(self, message):
    """
    Store the message as result, flag the task as "finished with errors"
    and save it.
    """
    self.result = message
    self.state = "FE"
    self.finished_date = datetime.datetime.now()
    self.save()


def execute(self):
    """
    Run the script associated with this task.

    Nothing is done unless the task is scheduled (state "S"). The script
    is searched by name inside settings.ISHTAR_SCRIPT_DIR. The task is
    saved as in progress ("P") while the script runs, then as finished
    ("F") with the standard output as result. Any configuration issue,
    launch failure, output on the standard error or non-zero exit code
    ends the task as finished with errors ("FE").
    """
    if self.state != "S":
        return
    self.launch_date = datetime.datetime.now()

    script_dir = settings.ISHTAR_SCRIPT_DIR
    if not script_dir:
        self._finish_with_error(
            str(
                _(
                    "ISHTAR_SCRIPT_DIR is not set in your "
                    "local_settings. Contact your administrator."
                )
            )
        )
        return
    if ".." in script_dir:
        self._finish_with_error(
            str(
                _(
                    "Your ISHTAR_SCRIPT_DIR is containing "
                    'dots "..". As it can refer to relative '
                    "paths, it can be a security issue and this is "
                    "not allowed. Only put a full path."
                )
            )
        )
        return
    if not os.path.isdir(script_dir):
        self._finish_with_error(
            str(
                _('Your ISHTAR_SCRIPT_DIR: "{}" is not a valid directory.')
            ).format(script_dir)
        )
        return

    script_name = None
    # only script inside the script directory can be executed
    # script directory is not web available
    # matching against the directory listing ensures that the name is a
    # direct child of the script directory
    if self.script.path in os.listdir(script_dir):
        candidate = os.path.join(script_dir, self.script.path)
        if os.path.isfile(candidate):
            script_name = candidate
    if not script_name:
        self._finish_with_error(
            str(
                _(
                    'Script "{}" is not available in your script directory. '
                    "Check your configuration."
                )
            ).format(self.script.path)
        )
        return

    self.state = "P"
    self.save()

    try:
        # nosec: only script inside the script directory can be executed
        # this script directory is not web available
        session = Popen([script_name], stdout=PIPE, stderr=PIPE)  # nosec
        stdout, stderr = session.communicate()
    except OSError as e:
        self._finish_with_error(
            'Error executing "{}" script: {}'.format(self.script.path, e)
        )
        return

    # errors="replace": an output which is not valid UTF-8 must not
    # raise and leave the task in progress forever
    if stderr or session.returncode:
        details = stderr.decode("utf-8", errors="replace")
        if not details:
            details = "exit code {}".format(session.returncode)
        self._finish_with_error("Error: {}".format(details))
        return
    self.state = "F"
    self.result = stdout.decode("utf-8", errors="replace")
    self.finished_date = datetime.datetime.now()
    self.save()
def __str__(self):
    """
    Return "Export - <creation date> - <state label>".
    """
    # the choices of the state field are EXPORT_STATE: SCRIPT_STATE_DCT
    # does not know the default "C" (Created) state
    state = str(dict(EXPORT_STATE).get(self.state, _("Unknown")))
    return "Export - {} - {}".format(self.creation_date, state)
def __str__(self):
    """
    Return "Import - <creation date> - <state label>".
    """
    # the choices of the state field are EXPORT_STATE: SCRIPT_STATE_DCT
    # does not know the default "C" (Created) state
    state = str(dict(EXPORT_STATE).get(self.state, _("Unknown")))
    return "Import - {} - {}".format(self.creation_date, state)