#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2012-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. from collections import OrderedDict import uuid from django.apps import apps from django.conf import settings from django.contrib.gis.db import models from django.contrib.gis.geos import Point from django.contrib.postgres.indexes import GinIndex from django.core.urlresolvers import reverse from django.db import connection from django.db.models import Q from django.db.models.signals import post_delete, post_save, m2m_changed from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, pgettext from django.utils.text import slugify from ishtar_common.utils import ( cached_label_changed, m2m_historization_changed, post_save_geo, task, ) from ishtar_common.models import ( Document, GeneralType, BaseHistorizedItem, OwnPerms, ShortMenuItem, GeneralRelationType, GeneralRecordRelations, post_delete_record_relation, post_save_cache, ValueGetter, BulkUpdatedItem, RelationItem, Town, get_current_profile, document_attached_changed, HistoryModel, SearchAltName, GeoItem, CompleteIdentifierItem, SearchVectorConfig, DocumentItem, MainItem, QuickAction, RelationsViews, ) from ishtar_common.models_common import HistoricalRecords from archaeological_operations.models import ( Operation, Period, Parcel, ArchaeologicalSite, ) from ishtar_common.model_managers import UUIDModelManager class DatingType(GeneralType): class Meta: verbose_name = _("Dating type") verbose_name_plural = _("Dating types") ordering = ("label",) post_save.connect(post_save_cache, sender=DatingType) post_delete.connect(post_save_cache, sender=DatingType) class DatingQuality(GeneralType): class Meta: verbose_name = _("Dating quality type") verbose_name_plural = _("Dating quality types") ordering = ("label",) post_save.connect(post_save_cache, sender=DatingQuality) post_delete.connect(post_save_cache, sender=DatingQuality) class Dating(models.Model): uuid = models.UUIDField(default=uuid.uuid4) period = models.ForeignKey(Period, verbose_name=_("Period")) start_date = models.IntegerField(_("Start date"), blank=True, null=True) end_date = models.IntegerField(_("End date"), blank=True, null=True) dating_type = models.ForeignKey( DatingType, verbose_name=_("Dating type"), on_delete=models.SET_NULL, blank=True, null=True, ) quality = models.ForeignKey( DatingQuality, verbose_name=_("Quality"), on_delete=models.SET_NULL, blank=True, null=True, ) precise_dating = models.TextField(_("Precise dating"), blank=True, default="") objects = UUIDModelManager() ASSOCIATED_ALT_NAMES = { "datings__period": SearchAltName( pgettext_lazy("key for text search", "datings-period"), "datings__period__label__iexact", ), "datings__precise_dating": SearchAltName( pgettext_lazy("key for text search", "datings-precise"), "datings__precise_dating__iexact", ), "datings__start_date__before": SearchAltName( pgettext_lazy("key for text search", "datings-start-before"), "datings__start_date__lte", ), "datings__start_date__after": SearchAltName( pgettext_lazy("key for text search", "datings-start-after"), "datings__start_date__gte", ), "datings__end_date__before": SearchAltName( pgettext_lazy("key for text search", "datings-end-before"), "datings__end_date__lte", ), "datings__end_date__after": SearchAltName( pgettext_lazy("key for text search", "datings-end-after"), "datings__end_date__gte", ), "datings__dating_type": SearchAltName( pgettext_lazy("key for text search", "datings-type"), "datings__dating_type__label__iexact", ), "datings__quality": SearchAltName( pgettext_lazy("key for text search", "datings-quality"), "datings__quality__label__iexact", ), } class Meta: verbose_name = _("Dating") verbose_name_plural = _("Datings") def __str__(self): if self.precise_dating and self.precise_dating.strip(): return self.precise_dating.strip() start_date = self.start_date and str(self.start_date) or "" end_date = self.end_date and str(self.end_date) or "" if not start_date and not end_date: return str(self.period) return "%s (%s-%s)" % (self.period, start_date, end_date) def natural_key(self): return (self.uuid,) def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = {} if not filtr or prefix + "period" in filtr: values[prefix + "period"] = str(self.period) if not filtr or prefix + "start_date" in filtr: values[prefix + "start_date"] = self.start_date or "" if not filtr or prefix + "end_date" in filtr: values[prefix + "end_date"] = self.end_date or "" if not filtr or prefix + "dating_type" in filtr: values[prefix + "dating_type"] = ( str(self.dating_type) if self.dating_type else "" ) if not filtr or prefix + "quality" in filtr: values[prefix + "quality"] = str(self.quality) if self.quality else "" if not filtr or prefix + "precise_dating" in filtr: values[prefix + "precise_dating"] = self.precise_dating return values HISTORY_ATTR = [ "period", "start_date", "end_date", "dating_type", "quality", "precise_dating", ] def history_compress(self): values = {} for attr in self.HISTORY_ATTR: val = getattr(self, attr) if hasattr(val, "history_compress"): val = val.history_compress() elif hasattr(val, "isoformat"): val = val.isoformat() elif val is None: val = "" else: val = str(val) values[attr] = val return values @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] full_res = [] for value in full_value: res = {} for key in value: val = value[key] if val == "" and key != "precise_dating": val = None elif key in ("period", "dating_type", "quality"): field = cls._meta.get_field(key) q = field.related_model.objects.filter(txt_idx=val) if q.count(): val = q.all()[0] else: # do not exist anymore in db val = None elif key in ("start_date", "end_date"): val = int(val) res[key] = val if create: res = cls.objects.create(**res) full_res.append(res) return full_res @classmethod def is_identical(cls, dating_1, dating_2): """ Compare two dating attribute by attribute and return True if all attribute is identical """ for attr in [ "period", "start_date", "end_date", "dating_type", "quality", "precise_dating", ]: value1 = getattr(dating_1, attr) value2 = getattr(dating_2, attr) if attr == "precise_dating": if value1: value1 = value1.strip() if value2: value2 = value2.strip() if value1 != value2: return False return True def context_records_lbl(self): return " - ".join(cr.cached_label for cr in self.context_records.all()) context_records_lbl.short_description = _("Context record") context_records_lbl.admin_order_field = "context_records__cached_label" def finds_lbl(self): return " - ".join(f.cached_label for f in self.find.all()) finds_lbl.short_description = _("Find") finds_lbl.admin_order_field = "find__cached_label" @classmethod def fix_dating_association(cls, obj): """ Fix redundant m2m dating association (usually after imports) """ current_datings = [] for dating in obj.datings.order_by("pk").all(): key = ( dating.period.pk, dating.start_date, dating.end_date, dating.dating_type, dating.quality, dating.precise_dating, ) if key not in current_datings: current_datings.append(key) continue dating.delete() class Unit(GeneralType): order = models.IntegerField(_("Order")) parent = models.ForeignKey( "Unit", verbose_name=_("Parent context record type"), on_delete=models.SET_NULL, blank=True, null=True, ) class Meta: verbose_name = _("Context record Type") verbose_name_plural = _("Context record Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=Unit) post_delete.connect(post_save_cache, sender=Unit) class ActivityType(GeneralType): order = models.IntegerField(_("Order")) class Meta: verbose_name = _("Activity Type") verbose_name_plural = _("Activity Types") ordering = ("order",) def __str__(self): return self.label post_save.connect(post_save_cache, sender=ActivityType) post_delete.connect(post_save_cache, sender=ActivityType) class IdentificationType(GeneralType): order = models.IntegerField(_("Order")) class Meta: verbose_name = _("Identification Type") verbose_name_plural = _("Identification Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=IdentificationType) post_delete.connect(post_save_cache, sender=IdentificationType) class ExcavationTechnicType(GeneralType): class Meta: verbose_name = _("Excavation technique type") verbose_name_plural = _("Excavation technique types") ordering = ("label",) post_save.connect(post_save_cache, sender=ExcavationTechnicType) post_delete.connect(post_save_cache, sender=ExcavationTechnicType) class DocumentationType(GeneralType): class Meta: verbose_name = _("Documentation type") verbose_name_plural = _("Documentation types") ordering = ("label",) post_save.connect(post_save_cache, sender=DocumentationType) post_delete.connect(post_save_cache, sender=DocumentationType) class CRBulkView(object): CREATE_SQL = """ CREATE VIEW context_records_cached_label_bulk_update AS ( SELECT cr.id AS id, ope.code_patriarche AS main_code, ope.year AS year, ope.operation_code AS ope_code, parcel.section AS section, parcel.parcel_number AS number, cr.label AS label FROM archaeological_context_records_contextrecord AS cr INNER JOIN archaeological_operations_operation ope ON ope.id = cr.operation_id INNER JOIN archaeological_operations_parcel parcel ON cr.parcel_id = parcel.id );""" DELETE_SQL = """ DROP VIEW IF EXISTS context_records_cached_label_bulk_update; """ class ContextRecord( BulkUpdatedItem, DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, GeoItem, OwnPerms, ValueGetter, MainItem, RelationItem, ): SLUG = "contextrecord" APP = "archaeological-context-records" MODEL = "context-record" SHOW_URL = "show-contextrecord" DELETE_URL = "delete-contextrecord" EXTERNAL_ID_KEY = "context_record_external_id" EXTERNAL_ID_DEPENDENCIES = ["base_finds"] TABLE_COLS = [ "label", "operation__common_name", "town__name", "parcel__cached_label", "unit__label", ] if settings.COUNTRY == "fr": TABLE_COLS.insert(1, "operation__code_patriarche") TABLE_COLS_FOR_OPE = [ "label", "parcel__cached_label", "unit__label", "cached_periods", "description", ] NEW_QUERY_ENGINE = True COL_LABELS = { "cached_periods": _("Periods"), "datings__period__label": _("Periods"), "datings__period": _("Datings (period)"), "detailed_related_context_records": _("Related context records"), "cached_related_context_records": _("Related context records"), "operation__code_patriarche": _("Operation (Patriarche code)"), "operation__common_name": _("Operation (name)"), "parcel__external_id": _("Parcel (external ID)"), "town__name": _("Town"), "town": _("Town"), "parcel__year": _("Parcel (year)"), "section__parcel_number": _("Parcel"), "parcel__cached_label": _("Parcel"), "unit__label": _("Context record type"), } CONTEXTUAL_TABLE_COLS = { "full": {"related_context_records": "cached_related_context_records"} } # statistics STATISTIC_MODALITIES_OPTIONS = OrderedDict( [ ("unit__label", _("Context record type")), ("operation__cached_label", _("Operation")), ("datings__period__label", _("Period")), ("identification__label", _("Identification")), ("activity__label", _("Activity")), ("excavation_technic__label", _("Excavation technique")), ("documents__source_type__label", _("Associated document type")), ("last_modified__year", _("Modification (year)")), ] ) STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()] # search parameters EXTRA_REQUEST_KEYS = { "town": "town__pk", "town__name": "town__name", "parcel__cached_label": "parcel__cached_label", "operation__year": "operation__year__contains", "year": "operation__year__contains", "operation__code_patriarche": "operation__code_patriarche", "operation__operation_code": "operation__operation_code", "operation__common_name": "operation__common_name", "datings__period": "datings__period__pk", "parcel_0": "operation__parcels__section", "parcel_1": "operation__parcels__parcel_number", "parcel_2": "operation__parcels__public_domain", "label": "label__icontains", "archaeological_sites": "operation__archaeological_sites__pk", "cached_label": "cached_label__icontains", "datings__period__label": "datings__period__label", "operation_id": "operation_id", "unit__label": "unit__label", } MANY_COUNTED_FIELDS = ["base_finds"] REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] RELATION_TYPES_PREFIX = { "ope_relation_types": "operation__", "cr_relation_types": "", } # alternative names of fields for searches ALT_NAMES = { "label": SearchAltName( pgettext_lazy("key for text search", "id"), "label__iexact" ), "town": SearchAltName( pgettext_lazy("key for text search", "town"), "town__cached_label__iexact" ), "operation__year": SearchAltName( pgettext_lazy("key for text search", "operation-year"), "operation__year" ), "operation__code_patriarche": SearchAltName( pgettext_lazy("key for text search", "patriarche"), "operation__code_patriarche__iexact", ), "operation__operation_code": SearchAltName( pgettext_lazy("key for text search", "operation-code"), "operation__operation_code", ), "operation__cached_label": SearchAltName( pgettext_lazy("key for text search", "operation"), "operation__cached_label__icontains", ), "archaeological_site": SearchAltName( pgettext_lazy("key for text search", "site"), "archaeological_site__cached_label__icontains", ), "ope_relation_types": SearchAltName( pgettext_lazy("key for text search", "operation-relation-type"), "ope_relation_types", ), "datings__period": SearchAltName( pgettext_lazy("key for text search", "period"), "datings__period__label__iexact", ), "unit": SearchAltName( pgettext_lazy("key for text search", "unit-type"), "unit__label__iexact" ), "parcel": SearchAltName( pgettext_lazy("key for text search", "parcel"), "parcel__cached_label__iexact", ), "has_finds": SearchAltName( pgettext_lazy("key for text search", "has-finds"), "base_finds" ), "cr_relation_types": SearchAltName( pgettext_lazy("key for text search", "record-relation-type"), "cr_relation_types", ), "excavation_technic": SearchAltName( pgettext_lazy("key for text search", "excavation-technique"), "excavation_technic__label__iexact", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) ALT_NAMES.update(Dating.ASSOCIATED_ALT_NAMES) PARENT_ONLY_SEARCH_VECTORS = ["operation", "archaeological_site", "parcel"] BASE_SEARCH_VECTORS = [ SearchVectorConfig("cached_label"), SearchVectorConfig("label"), SearchVectorConfig("location"), SearchVectorConfig("town__name"), SearchVectorConfig("interpretation", "local"), SearchVectorConfig("filling", "local"), SearchVectorConfig("datings_comment", "local"), SearchVectorConfig("identification__label"), SearchVectorConfig("activity__label"), SearchVectorConfig("excavation_technic__label"), ] M2M_SEARCH_VECTORS = [SearchVectorConfig("datings__period__label", "local")] UP_MODEL_QUERY = { "operation": ( pgettext_lazy("key for text search", "operation"), "cached_label", ), "site": (pgettext_lazy("key for text search", "site"), "cached_label"), } MAIN_UP_MODEL_QUERY = "operation" RELATIVE_SESSION_NAMES = [ ("operation", "operation__pk"), ("site", "archaeological_site__pk"), ("file", "operation__associated_file__pk"), ] HISTORICAL_M2M = ["datings", "documentations"] CACHED_LABELS = ["cached_label", "cached_periods", "cached_related_context_records"] DOWN_MODEL_UPDATE = ["base_finds"] QA_LOCK = QuickAction( url="contextrecord-qa-lock", icon_class="fa fa-lock", text=_("Lock/Unlock"), target="many", rights=["change_contextrecord", "change_own_contextrecord"], ) QA_EDIT = QuickAction( url="contextrecord-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_contextrecord", "change_own_contextrecord"], ) QUICK_ACTIONS = [ QA_EDIT, QA_LOCK, QuickAction( url="contextrecord-qa-duplicate", icon_class="fa fa-clone", text=_("Duplicate"), target="one", rights=["change_contextrecord", "change_own_contextrecord"], ), ] history = HistoricalRecords(bases=[HistoryModel]) objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) external_id = models.TextField(_("External ID"), blank=True, default="") auto_external_id = models.BooleanField( _("External ID is set automatically"), default=False ) parcel = models.ForeignKey( Parcel, verbose_name=_("Parcel"), related_name="context_record", on_delete=models.SET_NULL, blank=True, null=True, ) town = models.ForeignKey( Town, verbose_name=_("Town"), related_name="context_record", on_delete=models.SET_NULL, blank=True, null=True, ) operation = models.ForeignKey( Operation, verbose_name=_("Operation"), related_name="context_record" ) archaeological_site = models.ForeignKey( ArchaeologicalSite, verbose_name=_("Archaeological site"), on_delete=models.SET_NULL, blank=True, null=True, related_name="context_records", ) label = models.CharField(_("ID"), max_length=200) description = models.TextField(_("Description"), blank=True, default="") comment = models.TextField(_("General comment"), blank=True, default="") opening_date = models.DateField(_("Opening date"), blank=True, null=True) closing_date = models.DateField(_("Closing date"), blank=True, null=True) length = models.FloatField(_("Length (m)"), blank=True, null=True) width = models.FloatField(_("Width (m)"), blank=True, null=True) thickness = models.FloatField(_("Thickness (m)"), blank=True, null=True) diameter = models.FloatField(_("Diameter (m)"), blank=True, null=True) depth = models.FloatField(_("Depth (m)"), blank=True, null=True) depth_of_appearance = models.FloatField( _("Depth of appearance (m)"), blank=True, null=True ) surface = models.FloatField(_("Surface (m2)"), blank=True, null=True) location = models.TextField( _("Location"), blank=True, default="", help_text=_("A short description of the location of the context " "record"), ) datings = models.ManyToManyField(Dating, related_name="context_records") documentations = models.ManyToManyField(DocumentationType, blank=True) datings_comment = models.TextField(_("Comment on datings"), blank=True, default="") unit = models.ForeignKey( Unit, verbose_name=_("Context record type"), on_delete=models.SET_NULL, related_name="+", blank=True, null=True, ) filling = models.TextField(_("Filling"), blank=True, default="") interpretation = models.TextField(_("Interpretation"), blank=True, default="") taq = models.IntegerField( _("TAQ"), blank=True, null=True, help_text=_( '"Terminus Ante Quem" the context record can\'t have ' "been created after this date" ), ) taq_estimated = models.IntegerField( _("Estimated TAQ"), blank=True, null=True, help_text=_('Estimation of a "Terminus Ante Quem"'), ) tpq = models.IntegerField( _("TPQ"), blank=True, null=True, help_text=_( '"Terminus Post Quem" the context record can\'t have ' "been created before this date" ), ) tpq_estimated = models.IntegerField( _("Estimated TPQ"), blank=True, null=True, help_text=_('Estimation of a "Terminus Post Quem"'), ) identification = models.ForeignKey( IdentificationType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Identification"), ) activity = models.ForeignKey( ActivityType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Activity"), ) excavation_technic = models.ForeignKey( ExcavationTechnicType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Excavation technique"), ) related_context_records = models.ManyToManyField( "ContextRecord", through="RecordRelations", blank=True ) documents = models.ManyToManyField( Document, related_name="context_records", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( Document, related_name="main_image_context_records", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) cached_label = models.TextField( _("Cached name"), blank=True, default="", db_index=True ) cached_periods = models.TextField( _("Cached periods label"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) cached_related_context_records = models.TextField( _("Cached related context records"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) class Meta: verbose_name = _("Context Record") verbose_name_plural = _("Context Record") permissions = ( ("view_contextrecord", "Can view all Context Records"), ("view_own_contextrecord", "Can view own Context Record"), ("add_own_contextrecord", "Can add own Context Record"), ("change_own_contextrecord", "Can change own Context Record"), ("delete_own_contextrecord", "Can delete own Context Record"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] def natural_key(self): return (self.uuid,) @property def name(self): return self.label or "" @property def short_class_name(self): return pgettext("short", "Context record") def __str__(self): return self.short_label or "" @property def surface_ha(self): if self.surface: return self.surface / 10000.0 def public_representation(self): dct = super(ContextRecord, self).public_representation() dct.update( { "operation": self.operation.public_representation(), "site": self.archaeological_site and self.archaeological_site.public_representation(), "parcel": str(self.parcel), "town": self.town.label_with_areas if self.town else None, "label": self.label, "description": self.description, "comment": self.comment, } ) return dct DOC_VALUES = [ ("base_finds", _("List of associated base finds")), ] def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): no_base_finds = True if "no_base_finds" in kwargs: no_base_finds = kwargs["no_base_finds"] values = super(ContextRecord, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs ) if prefix and no_base_finds: return values if not filtr or prefix + "base_finds" in filtr: values[prefix + "base_finds"] = [ bf.get_values(prefix=prefix, no_values=True, filtr=None, **kwargs) for bf in self.base_finds.distinct().all() ] return values def get_town_centroid(self): if self.town: return self.town.center, self._meta.verbose_name if self.archaeological_site: centroid = self.archaeological_site.get_town_centroid() if centroid: return centroid return self.operation.get_town_centroid() def get_town_polygons(self): if self.town: return self.town.limit, self._meta.verbose_name if self.archaeological_site: polys = self.archaeological_site.get_town_polygons() if polys: return polys return self.operation.get_town_polygons() def get_precise_points(self): precise_points = super(ContextRecord, self).get_precise_points() if precise_points: return precise_points if self.archaeological_site: precise_points = self.archaeological_site.get_precise_points() if precise_points: return precise_points return self.operation.get_precise_points() def get_precise_polygons(self): precise_poly = super(ContextRecord, self).get_precise_polygons() if precise_poly: return precise_poly if self.archaeological_site: precise_poly = self.archaeological_site.get_precise_polygons() if precise_poly: return precise_poly return self.operation.get_precise_polygons() def get_geo_items(self, get_polygons, rounded=True): dict = super(ContextRecord, self).get_geo_items(get_polygons, rounded) BaseFind = apps.get_model("archaeological_finds", "BaseFind") collection_base_finds = [] for bf in self.base_finds.distinct().all(): try: geo_item = bf.get_geo_items(get_polygons, rounded) collection_base_finds.append(geo_item) except BaseFind.DoesNotExist: pass dict["properties"]["base-finds"] = { "type": "FeatureCollection", "features": collection_base_finds, } return dict @classmethod def cached_label_bulk_update( cls, operation_id=None, parcel_id=None, transaction_id=None ): transaction_id, is_recursion = cls.bulk_recursion( transaction_id, [operation_id, parcel_id] ) if is_recursion: return if operation_id: where = "operation_id = %s" args = [int(operation_id)] kwargs = {"operation_id": operation_id} elif parcel_id: where = "parcel_id = %s" args = [int(parcel_id)] kwargs = {"parcel_id": parcel_id} else: return kwargs["transaction_id"] = transaction_id profile = get_current_profile() sql = """ UPDATE "archaeological_context_records_contextrecord" AS cr SET cached_label = CASE WHEN context_records_cached_label_bulk_update.main_code IS NULL THEN CASE WHEN context_records_cached_label_bulk_update.year IS NOT NULL AND context_records_cached_label_bulk_update.ope_code IS NOT NULL THEN '{ope_prefix}' || context_records_cached_label_bulk_update.year || '-' || context_records_cached_label_bulk_update.ope_code ELSE '' END ELSE '{main_ope_prefix}' || context_records_cached_label_bulk_update.main_code END || '{join}' || context_records_cached_label_bulk_update.section || '{join}' || context_records_cached_label_bulk_update.number || '{join}' || context_records_cached_label_bulk_update.label FROM context_records_cached_label_bulk_update WHERE cr.id = context_records_cached_label_bulk_update.id AND cr.id IN ( SELECT id FROM archaeological_context_records_contextrecord WHERE {where} ); """.format( main_ope_prefix=profile.operation_prefix, ope_prefix=profile.default_operation_prefix, join=settings.JOINT, where=where, ) with connection.cursor() as c: c.execute(sql, args) cls._meta.get_field("base_finds").related_model.cached_label_bulk_update( **kwargs ) @property def short_label(self): return settings.JOINT.join( [ str(item) for item in [self.operation.get_reference(), self.parcel, self.label] if item ] ) @property def relation_label(self): return self.label def all_base_finds(self): BaseFind = apps.get_model("archaeological_finds", "BaseFind") ids = [self.id] + [ cr.cr_id for cr in ContextRecordTree.objects.filter(cr_parent_id=self.id) ] return BaseFind.objects.filter(context_record_id__in=set(ids)) @property def show_url(self): return reverse("show-contextrecord", args=[self.pk, ""]) def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(ContextRecord, self).get_extra_actions(request) # is_locked = hasattr(self, "is_locked") and self.is_locked(request.user) can_create_find = self.can_do(request, "add_find") if can_create_find: actions += [ ( reverse("find_create", args=[self.pk]), _("Add find"), "fa fa-plus", _("find"), "", False, ), ] can_edit_cr = self.can_do(request, "change_contextrecord") if can_edit_cr: actions += [ ( reverse("contextrecord-qa-duplicate", args=[self.pk]), _("Duplicate"), "fa fa-clone", "", "", True, ), ] return actions @classmethod def get_query_owns(cls, ishtaruser): return ( cls._construct_query_own( "operation__", Operation._get_query_owns_dicts(ishtaruser) ) | cls._construct_query_own( "base_finds__find__basket__", [{"shared_with": ishtaruser, "shared_write_with": ishtaruser}], ) | cls._construct_query_own( "", [ {"history_creator": ishtaruser.user_ptr}, {"operation__end_date__isnull": True}, ], ) ) @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None ): replace_query = None if menu_filtr and "operation" in menu_filtr: replace_query = Q(operation=menu_filtr["operation"]) owns = super(ContextRecord, cls).get_owns( user, replace_query=replace_query, limit=limit, values=values, get_short_menu_class=get_short_menu_class, ) return cls._return_get_owns(owns, values, get_short_menu_class) def full_label(self): return str(self) def _real_label(self): if not self.operation.code_patriarche: return return settings.JOINT.join((self.operation.code_patriarche, self.label)) def _temp_label(self): if self.operation.code_patriarche: return return settings.JOINT.join( [ str(lbl) for lbl in [ self.operation.year, self.operation.operation_code, self.label, ] if lbl ] ) def _generate_cached_label(self): return self.full_label() def _generate_cached_periods(self): return " & ".join(dating.period.label for dating in self.datings.all()) def _generate_cached_related_context_records(self): return self.detailed_related_context_records() def _get_associated_cached_labels(self): BaseFind = apps.get_model("archaeological_finds", "BaseFind") Find = apps.get_model("archaeological_finds", "Find") return list(Find.objects.filter(base_finds__context_record=self).all()) + list( BaseFind.objects.filter(context_record=self).all() ) def _cached_labels_bulk_update(self): self.base_finds.model.cached_label_bulk_update(context_record_id=self.pk) return True def _get_base_image_path(self): return self.operation._get_base_image_path() + "/{}/{}".format( self.SLUG, slugify(self.label or "00") ) @property def archaeological_site_reference(self): if self.archaeological_site: return self.archaeological_site.reference if self.operation.archaeological_sites.count(): return "-".join( a.reference for a in self.operation.archaeological_sites.all() ) return "" @property def reference(self): if not self.operation: return "00" return self.full_label() def get_department(self): if not self.operation: return "00" return self.operation.get_department() def get_town_label(self): if not self.operation: return "00" return self.operation.get_town_label() @classmethod def get_periods(cls, slice="year", fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) if slice == "year": years = set() for res in list(q.values("operation__start_date")): if res["operation__start_date"]: yr = res["operation__start_date"].year years.add(yr) return list(years) return [] @classmethod def get_by_year(cls, year, fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) return q.filter(operation__start_date__year=year) @classmethod def get_operations(cls): return [ dct["operation__pk"] for dct in cls.objects.values("operation__pk").distinct() ] @classmethod def get_by_operation(cls, operation_id): return cls.objects.filter(operation__pk=operation_id) @classmethod def get_total_number(cls, fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) return q.count() def detailed_related_context_records(self): crs = [ "{} ({})".format(cr.right_record, cr.relation_type.get_tiny_label()) for cr in self.right_relations.all() ] return " & ".join(crs) def find_docs_q(self): return Document.objects.filter(finds__base_finds__context_record=self) def fix(self): """ Fix redundant m2m dating association (usually after imports) """ Dating.fix_dating_association(self) def save(self, *args, **kwargs): super(ContextRecord, self).save(*args, **kwargs) if (not self.town and self.parcel) or ( self.parcel and self.parcel.town != self.town ): self.town = self.parcel.town self.skip_history_when_saving = True self.save() def context_record_post_save(sender, **kwargs): cached_label_changed(sender=sender, **kwargs) post_save_geo(sender=sender, **kwargs) post_save.connect(context_record_post_save, sender=ContextRecord) m2m_changed.connect(document_attached_changed, sender=ContextRecord.documents.through) for attr in ContextRecord.HISTORICAL_M2M: m2m_changed.connect( m2m_historization_changed, sender=getattr(ContextRecord, attr).through ) class RelationType(GeneralRelationType): class Meta: verbose_name = _("Relation type") verbose_name_plural = _("Relation types") ordering = ("order", "label") class RecordRelationsManager(models.Manager): def get_by_natural_key(self, left_record, right_record, relation_type): return self.get( left_record__uuid=left_record, right_record__uuid=right_record, relation_type__txt_idx=relation_type, ) class RecordRelations(GeneralRecordRelations, models.Model): MAIN_ATTR = "left_record" left_record = models.ForeignKey(ContextRecord, related_name="right_relations") right_record = models.ForeignKey(ContextRecord, related_name="left_relations") relation_type = models.ForeignKey(RelationType) objects = RecordRelationsManager() TABLE_COLS = [ "left_record__label", "left_record__unit", "left_record__parcel", "relation_type", "right_record__label", "right_record__unit", "right_record__parcel", ] COL_LABELS = { "left_record__label": _("ID (left)"), "left_record__unit": _("Context record type (left)"), "left_record__parcel": _("Parcel (left)"), "left_record__description": _("Description (left)"), "left_record__datings__period": _("Periods (left)"), "relation_type": _("Relation type"), "right_record__label": _("ID (right)"), "right_record__unit": _("Context record type (right)"), "right_record__parcel": _("Parcel (right)"), "right_record__description": _("Description (right)"), "right_record__datings__period": _("Periods (right)"), } # search parameters EXTRA_REQUEST_KEYS = {"left_record__operation": "left_record__operation__pk"} class Meta: verbose_name = _("Record relation") verbose_name_plural = _("Record relations") permissions = [ ("view_recordrelation", "Can view all Context record relations"), ] def natural_key(self): return self.left_record.uuid, self.right_record.uuid, self.relation_type.txt_idx def post_delete_cr_record_relation(sender, instance, **kwargs): if getattr(sender, "_no_post_treatments", False): return post_delete_record_relation(sender, instance, **kwargs) ContextRecordTree.update(instance.left_record_id) ContextRecordTree.update(instance.right_record_id) def post_save_cr_record_relation(sender, instance, **kwargs): if getattr(sender, "_no_post_treatments", False): return ContextRecordTree.update(instance.left_record_id) ContextRecordTree.update(instance.right_record_id) post_delete.connect(post_delete_cr_record_relation, sender=RecordRelations) post_save.connect(post_save_cr_record_relation, sender=RecordRelations) class RecordRelationView(models.Model): CREATE_SQL = """ CREATE VIEW record_relations AS SELECT DISTINCT right_record_id as id, right_record_id, left_record_id, relation_type_id FROM archaeological_context_records_recordrelations; -- deactivate deletion CREATE RULE record_relations_del AS ON DELETE TO record_relations DO INSTEAD DELETE FROM record_relations where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS record_relations; """ TABLE_COLS = [ "relation_type", "right_record__label", "right_record__unit", "right_record__parcel", "right_record__datings__period", "right_record__description", ] COL_LABELS = { "relation_type": _("Relation type"), "right_record__label": _("ID"), "right_record__unit": _("Context record type"), "right_record__parcel": _("Parcel"), "right_record__description": _("Description"), "right_record__datings__period": _("Periods"), } # search parameters EXTRA_REQUEST_KEYS = { "left_record_id": "left_record_id", "right_record__unit": "right_record__unit__label", } left_record = models.ForeignKey( ContextRecord, related_name="+", on_delete=models.DO_NOTHING ) right_record = models.ForeignKey( ContextRecord, related_name="+", on_delete=models.DO_NOTHING ) relation_type = models.ForeignKey( RelationType, related_name="+", on_delete=models.DO_NOTHING ) class Meta: managed = False db_table = "record_relations" unique_together = ("id", "right_record") permissions = [ ("view_recordrelation", "Can view all record relations - view"), ] @classmethod def general_types(cls): return [] def __str__(self): return '{} "{}"'.format(self.relation_type, self.right_record) class ContextRecordTree(RelationsViews): CREATE_SQL = """ CREATE VIEW cr_parent_relation_id AS SELECT id FROM archaeological_context_records_relationtype WHERE logical_relation in ('included', 'equal'); CREATE VIEW context_records_tree AS WITH RECURSIVE rel_tree AS ( SELECT cr.id AS cr_id, cr.id AS cr_parent_id, 1 AS level, cr.id || '_' || cr.id || '_1' AS key FROM archaeological_context_records_contextrecord cr UNION ALL SELECT rel.left_record_id AS cr_id, rel.right_record_id AS cr_parent_id, 1 AS level, rel.left_record_id || '_' || rel.right_record_id || '_1' AS key FROM archaeological_context_records_recordrelations rel WHERE rel.relation_type_id in ( SELECT id FROM cr_parent_relation_id ) UNION ALL SELECT p.cr_id AS cr_id, rel.right_record_id AS cr_parent_id, p.level + 1, p.cr_id || '_' || rel.right_record_id || '_' || p.level + 1 AS key FROM archaeological_context_records_recordrelations rel, rel_tree p WHERE rel.left_record_id = p.cr_parent_id AND rel.relation_type_id in ( SELECT id FROM cr_parent_relation_id ) AND p.level < 10 -- prevent recursive... ) SELECT DISTINCT key, cr_id, cr_parent_id, level FROM rel_tree; CREATE VIEW context_record_tree AS SELECT DISTINCT y.key, y.cr_id, y.cr_parent_id FROM (SELECT * FROM context_records_tree) y ORDER BY y.cr_id, y.cr_parent_id; -- deactivate deletion, update CREATE RULE context_records_tree_del AS ON DELETE TO context_records_tree DO INSTEAD DELETE FROM archaeological_context_records_contextrecord WHERE id=NULL; CREATE RULE context_record_tree_del AS ON DELETE TO context_record_tree DO INSTEAD DELETE FROM archaeological_context_records_contextrecord WHERE id=NULL; CREATE RULE context_records_tree_update AS ON UPDATE TO context_records_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; CREATE RULE context_record_tree_update AS ON UPDATE TO context_record_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; CREATE RULE context_records_tree_insert AS ON INSERT TO context_records_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; CREATE RULE context_record_tree_insert AS ON INSERT TO context_record_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS context_record_tree; DROP VIEW IF EXISTS context_records_tree; DROP VIEW IF EXISTS cr_parent_relation_id; """ CREATE_TABLE_SQL = """ CREATE TABLE {table} ( key varchar(100) PRIMARY KEY, cr_id integer NOT NULL, cr_parent_id integer NOT NULL, CONSTRAINT fk1_{table} FOREIGN KEY(cr_id) REFERENCES {fk_table}(id) ON DELETE CASCADE, CONSTRAINT fk2_{table} FOREIGN KEY(cr_parent_id) REFERENCES {fk_table}(id) ON DELETE CASCADE ); CREATE INDEX {table}_id ON {table} (cr_id); CREATE INDEX {table}_parent_id ON {table} (cr_parent_id); """.format( table="context_records_tree", fk_table="archaeological_context_records_contextrecord", ) key = models.TextField(primary_key=True) cr = models.ForeignKey( "archaeological_context_records.ContextRecord", verbose_name=_("Context record"), related_name="context_record_tree_parent", on_delete=models.CASCADE, ) cr_parent = models.ForeignKey( "archaeological_context_records.ContextRecord", verbose_name=_("Context record parent"), related_name="context_record_tree_child", on_delete=models.CASCADE, ) class Meta: managed = False db_table = "context_records_tree" @classmethod def _save_tree(cls, tree): keys = [] for idx, parent_id in enumerate(tree[:-1]): for child_id in tree[idx:]: if child_id != parent_id: cls.objects.get_or_create( key=f"{child_id}_{parent_id}", cr_id=child_id, cr_parent_id=parent_id, ) keys.append((child_id, parent_id)) return keys @classmethod def _get_base_relations(cls): return RelationType.objects.filter( logical_relation__in=("included", "equal") ).values_list("id", flat=True) @classmethod def _get_base_equal_relations(cls): return RelationType.objects.filter(logical_relation="equal").values_list( "id", flat=True ) @classmethod def _get_base_included_relations(cls): return RelationType.objects.filter(logical_relation="included").values_list( "id", flat=True ) @classmethod def _get_base_children(cls): return ContextRecord.objects.values_list("id", flat=True) @classmethod def _update_child(cls, parent_id, tree, rel_types): whole_tree = set() children = list( RecordRelations.objects.values_list("left_record_id", flat=True).filter( right_record_id=parent_id, relation_type_id__in=rel_types ) ) to_be_pop = [] for idx, c in enumerate(children[:]): if c in tree: # cyclic to_be_pop.append(idx) for idx in reversed(to_be_pop): children.pop(idx) if not children: # last leaf in the tree return cls._save_tree(tree) for c in children: whole_tree.update(cls._update_child(c, tree[:] + [c], rel_types)) return whole_tree @classmethod def _get_parent_trees(cls, child_id, trees, rel_types, deep=0): parents = RecordRelations.objects.values_list( "right_record_id", flat=True ).filter(left_record_id=child_id, relation_type_id__in=rel_types) if not parents: return trees new_trees = [] for p in set(parents): if p == child_id or any(1 for tree in trees if p in tree): # cyclic continue c_trees = list(map(lambda x: x + [p], trees)) new_trees += cls._get_parent_trees(p, c_trees, rel_types, deep + 1) return new_trees @classmethod def _get_equals(cls, item_id, equal_rel_types, exclude=None): if not exclude: exclude = [item_id] q = RecordRelations.objects.values_list("right_record_id", flat=True).filter( left_record_id=item_id, relation_type_id__in=equal_rel_types ) q = q.exclude(right_record_id__in=exclude) equals = list(q) q = RecordRelations.objects.values_list("left_record_id", flat=True).filter( right_record_id=item_id, relation_type_id__in=equal_rel_types ) q = q.exclude(left_record_id__in=exclude) equals += list(q) exclude += equals for eq_id in equals: equals += cls._get_equals(eq_id, equal_rel_types, exclude=exclude) return equals @classmethod def _update_equals(cls, item_id, equals): keys = [] for equal_id in equals: if item_id != equal_id: cls.objects.get_or_create( key=f"{item_id}_{equal_id}", cr_id=item_id, cr_parent_id=equal_id ) keys.append((item_id, equal_id)) cls.objects.get_or_create( key=f"{equal_id}_{item_id}", cr_id=equal_id, cr_parent_id=item_id ) keys.append((equal_id, item_id)) return keys @classmethod def _update_relations_equals(cls, relations): equal_rel_types = cls._get_base_equal_relations() keys = [] for child_id, parent_id in relations: equals_child = set(cls._get_equals(child_id, equal_rel_types)) keys += cls._update_equals(child_id, equals_child) for alt_child in equals_child: if alt_child != child_id: cls.objects.get_or_create( key=f"{alt_child}_{parent_id}", cr_id=alt_child, cr_parent_id=parent_id, ) keys.append((alt_child, parent_id)) equals_parent = set(cls._get_equals(parent_id, equal_rel_types)) keys += cls._update_equals(parent_id, equals_parent) for alt_parent in equals_parent: if alt_parent != parent_id: cls.objects.get_or_create( key=f"{child_id}_{alt_parent}", cr_id=child_id, cr_parent_id=alt_parent, ) keys.append((child_id, alt_parent)) for alt_child in equals_child: if alt_child != child_id: cls.objects.get_or_create( key=f"{alt_child}_{alt_parent}", cr_id=alt_child, cr_parent_id=alt_parent, ) keys.append((alt_child, alt_parent)) return set(keys) @classmethod def _update(cls, item_id, already_updated=None): all_relations = set() # add self relation cls.objects.get_or_create( key=f"{item_id}_{item_id}", cr_id=item_id, cr_parent_id=item_id ) all_relations.add((item_id, item_id)) current_relations_as_child = list( cls.objects.filter(cr_id=item_id).values_list("cr_parent_id", flat=True) ) current_relations_as_parent = list( cls.objects.filter(cr_parent_id=item_id).values_list("cr_id", flat=True) ) ## update the whole tree inc_rel_types = cls._get_base_included_relations() # get first parents parent_ids = [ tree[-1] for tree in cls._get_parent_trees(item_id, [[item_id]], inc_rel_types) ] if not parent_ids: parent_ids = [item_id] # get all child for parents and save trees for parent_id in parent_ids: tree = [parent_id] all_relations.update(cls._update_child(parent_id, tree, inc_rel_types)) all_relations.update(cls._update_relations_equals(all_relations)) if not all_relations: equal_rel_types = cls._get_base_equal_relations() equals = set(cls._get_equals(item_id, equal_rel_types)) all_relations.update(cls._update_equals(item_id, equals)) ## delete old relations if not already_updated: already_updated = [item_id] for parent_id in current_relations_as_child: if ( item_id, parent_id, ) not in all_relations and parent_id not in already_updated: # disappeared - must regenerate already_updated.append(parent_id) cls.objects.filter(key=f"{item_id}_{parent_id}").delete() cls._update(parent_id, already_updated) for child_id in current_relations_as_parent: if ( child_id, item_id, ) not in all_relations and child_id not in already_updated: # disappeared - must regenerate already_updated.append(child_id) cls.objects.filter(key=f"{child_id}_{item_id}").delete() cls._update(child_id, already_updated)