#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (C) 2012-2025 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .

# See the file COPYING for details.

from collections import OrderedDict
import uuid

from django.apps import apps
from django.conf import settings
from django.contrib.gis.db import models
from django.contrib.postgres.indexes import GinIndex
from django.contrib.sites.models import Site
from django.db import transaction, OperationalError, IntegrityError
from django.db.models import Q
from django.db.models.signals import post_delete, post_save, m2m_changed
from django.urls import reverse, reverse_lazy

from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, pgettext
from django.utils.text import slugify

from ishtar_common.utils import (
    cached_label_changed,
    m2m_historization_changed,
    post_save_geo,
    SearchAltName,
)
from ishtar_common.models import (
    Document,
    Person,
    GeneralType,
    BaseHistorizedItem,
    OwnPerms,
    Imported,
    GeneralRelationType,
    GeneralRecordRelations,
    OrderedHierarchicalType,
    post_delete_record_relation,
    post_save_cache,
    ValueGetter,
    RelationItem,
    Town,
    get_current_profile,
    document_attached_changed,
    HistoryModel,
    GeoItem,
    CompleteIdentifierItem,
    SearchVectorConfig,
    DocumentItem,
    MainItem,
    QuickAction,
    RelationsViews,
)
from ishtar_common.models_common import GeoVectorData, HistoricalRecords, \
    SerializeItem, geodata_attached_changed
from archaeological_operations.models import (
    add_oa_prefix,
    ArchaeologicalSite,
    CulturalAttributionType,
    Operation,
    Period,
    Parcel,
)
from ishtar_common.model_managers import UUIDModelManager


class DatingType(GeneralType):
    class Meta:
        verbose_name = _("Dating type")
        verbose_name_plural = _("Dating types")
        ordering = ("label",)


post_save.connect(post_save_cache, sender=DatingType)
post_delete.connect(post_save_cache, sender=DatingType)


class DatingQuality(GeneralType):
    class Meta:
        verbose_name = _("Dating quality type")
        verbose_name_plural = _("Dating quality types")
        ordering = ("label",)


post_save.connect(post_save_cache, sender=DatingQuality)
post_delete.connect(post_save_cache, sender=DatingQuality)
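
# Note: illustrative comment block added for clarity, not part of the upstream module.
# DatingType and DatingQuality are plain GeneralType vocabularies; connecting
# post_save_cache to both post_save and post_delete presumably refreshes the cached
# type lists whenever an entry is created, edited or removed. A hypothetical extra
# vocabulary would follow the same three-step pattern:
#
#     class DatingMethod(GeneralType):  # hypothetical example model
#         class Meta:
#             verbose_name = _("Dating method")
#             verbose_name_plural = _("Dating methods")
#             ordering = ("label",)
#
#     post_save.connect(post_save_cache, sender=DatingMethod)
#     post_delete.connect(post_save_cache, sender=DatingMethod)
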
"datings-period"), "datings__period__label__iexact", ), "datings__precise_dating": SearchAltName( pgettext_lazy("key for text search", "datings-precise"), "datings__precise_dating__iexact", ), "datings__start_date": SearchAltName( pgettext_lazy("key for text search", "datings-start"), "datings__start_date", ), "datings__end_date": SearchAltName( pgettext_lazy("key for text search", "datings-end"), "datings__end_date", ), "datings__dating_type": SearchAltName( pgettext_lazy("key for text search", "datings-type"), "datings__dating_type__label__iexact", ), "datings__quality": SearchAltName( pgettext_lazy("key for text search", "datings-quality"), "datings__quality__label__iexact", ), } class Meta: verbose_name = _("Dating") verbose_name_plural = _("Datings") def __str__(self): if self.precise_dating and self.precise_dating.strip(): return self.precise_dating.strip() start_date = self.start_date and str(self.start_date) or "" end_date = self.end_date and str(self.end_date) or "" if not start_date and not end_date: return str(self.period) return "%s (%s-%s)" % (self.period, start_date, end_date) def natural_key(self): return (self.uuid,) def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = {} if not filtr or prefix + "period" in filtr: values[prefix + "period"] = str(self.period) if not filtr or prefix + "start_date" in filtr: values[prefix + "start_date"] = self.start_date or "" if not filtr or prefix + "end_date" in filtr: values[prefix + "end_date"] = self.end_date or "" if not filtr or prefix + "dating_type" in filtr: values[prefix + "dating_type"] = ( str(self.dating_type) if self.dating_type else "" ) if not filtr or prefix + "quality" in filtr: values[prefix + "quality"] = str(self.quality) if self.quality else "" if not filtr or prefix + "precise_dating" in filtr: values[prefix + "precise_dating"] = self.precise_dating return values HISTORY_ATTR = [ "period", "start_date", "end_date", "dating_type", "quality", "precise_dating", ] def history_compress(self): values = {} for attr in self.HISTORY_ATTR: val = getattr(self, attr) if hasattr(val, "history_compress"): val = val.history_compress() elif hasattr(val, "isoformat"): val = val.isoformat() elif val is None: val = "" else: val = str(val) values[attr] = val return values @classmethod def history_decompress(cls, full_value, create=False): if not full_value: return [] full_res = [] for value in full_value: res = {} for key in value: val = value[key] if val == "" and key != "precise_dating": val = None elif key in ("period", "dating_type", "quality"): field = cls._meta.get_field(key) q = field.related_model.objects.filter(txt_idx=val) if q.count(): val = q.all()[0] else: # do not exist anymore in db val = None elif key in ("start_date", "end_date"): val = int(val) res[key] = val if create: res = cls.objects.create(**res) full_res.append(res) return full_res @classmethod def is_identical(cls, dating_1, dating_2): """ Compare two dating attribute by attribute and return True if all attribute is identical """ for attr in [ "period", "start_date", "end_date", "dating_type", "quality", "precise_dating", ]: value1 = getattr(dating_1, attr) value2 = getattr(dating_2, attr) if attr == "precise_dating": if value1: value1 = value1.strip() if value2: value2 = value2.strip() if value1 != value2: return False return True def context_records_lbl(self): return " - ".join(cr.cached_label for cr in self.context_records.all()) context_records_lbl.short_description = _("Context record") context_records_lbl.admin_order_field = 
"context_records__cached_label" def finds_lbl(self): return " - ".join(f.cached_label for f in self.find.all()) finds_lbl.short_description = _("Find") finds_lbl.admin_order_field = "find__cached_label" @classmethod def fix_dating_association(cls, obj): """ Fix redundant m2m dating association (usually after imports) """ current_datings = [] for dating in obj.datings.order_by("pk").all(): key = ( dating.period.pk, dating.start_date, dating.end_date, dating.dating_type, dating.quality, dating.precise_dating, ) if key not in current_datings: current_datings.append(key) continue dating.delete() class Unit(GeneralType): order = models.IntegerField(_("Order")) parent = models.ForeignKey( "Unit", verbose_name=_("Parent context record type"), on_delete=models.SET_NULL, blank=True, null=True, ) class Meta: verbose_name = _("Context record Type") verbose_name_plural = _("Context record Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=Unit) post_delete.connect(post_save_cache, sender=Unit) class ActivityType(GeneralType): order = models.IntegerField(_("Order")) class Meta: verbose_name = _("Activity Type") verbose_name_plural = _("Activity Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=ActivityType) post_delete.connect(post_save_cache, sender=ActivityType) class IdentificationType(OrderedHierarchicalType): class Meta: verbose_name = _("Identification Type") verbose_name_plural = _("Identification Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=IdentificationType) post_delete.connect(post_save_cache, sender=IdentificationType) class StructureType(OrderedHierarchicalType): class Meta: verbose_name = _("Structure Type") verbose_name_plural = _("Structure Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=StructureType) post_delete.connect(post_save_cache, sender=StructureType) class TextureType(OrderedHierarchicalType): class Meta: verbose_name = _("Texture Type") verbose_name_plural = _("Texture Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=TextureType) post_delete.connect(post_save_cache, sender=TextureType) class ColorType(OrderedHierarchicalType): class Meta: verbose_name = _("Color Type") verbose_name_plural = _("Color Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=ColorType) post_delete.connect(post_save_cache, sender=ColorType) class InclusionType(OrderedHierarchicalType): class Meta: verbose_name = _("Inclusion Type") verbose_name_plural = _("Inclusion Types") ordering = ("order", "label") def __str__(self): return self.label post_save.connect(post_save_cache, sender=InclusionType) post_delete.connect(post_save_cache, sender=InclusionType) class ExcavationTechnicType(OrderedHierarchicalType): class Meta: verbose_name = _("Excavation technique type") verbose_name_plural = _("Excavation technique types") ordering = ("order", "label") post_save.connect(post_save_cache, sender=ExcavationTechnicType) post_delete.connect(post_save_cache, sender=ExcavationTechnicType) class DocumentationType(OrderedHierarchicalType): class Meta: verbose_name = _("Documentation type") verbose_name_plural = _("Documentation types") ordering = ("order", "label") post_save.connect(post_save_cache, 
sender=DocumentationType) post_delete.connect(post_save_cache, sender=DocumentationType) class GeographicSubTownItem(GeoItem): UPPER_GEO = [] class Meta: abstract = True def _get_geo_town(self): raise NotImplementedError() def post_save_geo(self, save=True): # manage geodata towns if getattr(self, "_post_save_geo_ok", False) or not self.pk: # prevent infinite loop - should not happen, but... return self._post_save_geo_ok = True q_geodata_current_town = self.geodata.filter( source_content_type__model="town", source_content_type__app_label="ishtar_common", ) town = self._get_geo_town() has_geo_town = ( town and town.main_geodata and town.main_geodata.multi_polygon ) bad_towns = q_geodata_current_town if town: bad_towns = q_geodata_current_town.exclude(source_id=town.id) modified = False for bad_town in bad_towns.all(): self.geodata.remove(bad_town) if self.main_geodata == bad_town: self.main_geodata = None modified = True current_model = self.__class__ main_item_is_set = current_model.objects.filter( id=self.pk, main_geodata__source_content_type__app_label=current_model._meta.app_label, main_geodata__source_content_type__model=current_model._meta.model_name, ).count() # main geo is set for the current model for upper_attr in self.UPPER_GEO: q_dict = {"id": self.pk, f"{upper_attr}__main_geodata_id__isnull": False} q = current_model.objects.filter(**q_dict) if q.count(): upper = None main_geodata_id = q.values_list(f"{upper_attr}__main_geodata_id", flat=True)[0] if main_geodata_id not in self.geodata.values_list("id", flat=True): upper = getattr(self, upper_attr, None) modified = True self.geodata.add(upper.main_geodata) if not main_item_is_set: if self.main_geodata_id != main_geodata_id: if not upper: upper = getattr(self, upper_attr, None) modified = True self.main_geodata = upper.main_geodata main_item_is_set = True if modified and save: if settings.USE_BACKGROUND_TASK and hasattr(self, "no_post_process"): self.no_post_process() else: self.skip_history_when_saving = True self._no_move = True self._post_saved_geo = False self._no_down_model_update = False self.save() if not has_geo_town: return modified if not q_geodata_current_town.filter(source_id=town.id).count(): try: # multiple save, post treatments can cause synchronous add with transaction.atomic(): self.geodata.add(town.main_geodata) except (OperationalError, IntegrityError): pass if not modified: return False if save: post_save_geo(self.__class__, instance=self, created=False, update_fields=False, raw=False, using="default") return True class ContextRecord( DocumentItem, BaseHistorizedItem, CompleteIdentifierItem, GeographicSubTownItem, OwnPerms, ValueGetter, MainItem, RelationItem, ): SLUG = "contextrecord" APP = "archaeological-context-records" MODEL = "context-record" SHOW_URL = "show-contextrecord" DELETE_URL = "delete-contextrecord" EXTERNAL_ID_KEY = "context_record_external_id" EXTERNAL_ID_DEPENDENCIES = ["base_finds"] TABLE_COLS = [ "label", "operation__common_name", "town__name", "parcel__cached_label", "unit__label", ] if settings.COUNTRY == "fr": TABLE_COLS.insert(1, "operation__code_patriarche") TABLE_COLS_FOR_OPE = [ "label", "parcel__cached_label", "unit__label", "cached_periods", "description", ] NEW_QUERY_ENGINE = True COL_LABELS = { "cached_periods": _("Periods"), "datings__period__label": _("Periods"), "datings__period": _("Datings (period)"), "detailed_related_context_records": _("Related context records"), "cached_related_context_records": _("Related context records"), "operation__code_patriarche": _("Operation 
(Patriarche code)"), "operation__common_name": _("Operation (name)"), "parcel__external_id": _("Parcel (external ID)"), "town__name": _("Town"), "town": _("Town"), "parcel__year": _("Parcel (year)"), "section__parcel_number": _("Parcel"), "parcel__cached_label": _("Parcel"), "unit__label": _("Context record type"), } CONTEXTUAL_TABLE_COLS = { "full": {"related_context_records": "cached_related_context_records"} } # statistics STATISTIC_MODALITIES_OPTIONS = OrderedDict( [ ("unit__label", _("Context record type")), ("operation__cached_label", _("Operation")), ("datings__period__label", _("Chronological period")), ("identifications__label", _("Identification")), ("activity__label", _("Activity")), ("excavation_technics__label", _("Excavation techniques")), ("documents__source_type__label", _("Associated document type")), ("last_modified__year", _("Modification (year)")), ] ) STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()] GET_VALUES_M2M = [ "documentations", "cultural_attributions", "excavation_technics", "identifications" ] # search parameters EXTRA_REQUEST_KEYS = { "town": "town__pk", "town__name": "town__name", "parcel__cached_label": "parcel__cached_label", "operation__year": "operation__year__contains", "year": "operation__year__contains", "operation__code_patriarche": "operation__code_patriarche", "operation__operation_code": "operation__operation_code", "operation__common_name": "operation__common_name", "datings__period": "datings__period__pk", "parcel_0": "operation__parcels__section", "parcel_1": "operation__parcels__parcel_number", "parcel_2": "operation__parcels__public_domain", "label": "label__icontains", "archaeological_sites": "operation__archaeological_sites__pk", "cached_label": "cached_label__icontains", "datings__period__label": "datings__period__label", "operation_id": "operation_id", # dynamic_table_documents "excavator_id": "excavator_id", # dynamic_table_documents "unit__label": "unit__label", } MANY_COUNTED_FIELDS = ["base_finds__isnull"] REVERSED_BOOL_FIELDS = [ "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] NUMBER_FIELDS = ["operation__year", "operation__operation_code", "datings__start_date", "datings__end_date"] + GeographicSubTownItem.NUMBER_FIELDS RELATION_TYPES_PREFIX = { "ope_relation_types": "operation__", "cr_relation_types": "", } # alternative names of fields for searches ALT_NAMES = { "label": SearchAltName( pgettext_lazy("key for text search", "id"), "label__iexact" ), "town": SearchAltName( pgettext_lazy("key for text search", "town"), "town__cached_label__iexact" ), "area": SearchAltName( pgettext_lazy("key for text search", "area"), "town__areas__label__iexact" ), "operation__year": SearchAltName( pgettext_lazy("key for text search", "operation-year"), "operation__year" ), "operation__common_name": SearchAltName( pgettext_lazy("key for text search", "operation-name"), "operation__common_name__iexact", ), "operation__code_patriarche": SearchAltName( pgettext_lazy("key for text search", "patriarche"), "operation__code_patriarche__iexact", ), "operation__operation_code": SearchAltName( pgettext_lazy("key for text search", "operation-code"), "operation__operation_code", ), "operation__cached_label": SearchAltName( pgettext_lazy("key for text search", "operation"), "operation__cached_label__icontains", ), "archaeological_site": SearchAltName( pgettext_lazy("key for text search", "site"), "archaeological_site__cached_label__icontains", 
related_name="archaeological_site" ), "ope_relation_types": SearchAltName( pgettext_lazy("key for text search", "operation-relation-type"), "ope_relation_types", ), "datings__period": SearchAltName( pgettext_lazy("key for text search", "period"), "datings__period__label__iexact", ), "unit": SearchAltName( pgettext_lazy("key for text search", "unit-type"), "unit__label__iexact" ), "activity": SearchAltName( pgettext_lazy("key for text search", "activity"), "activity__label__iexact" ), "parcel": SearchAltName( pgettext_lazy("key for text search", "parcel"), "parcel__cached_label__iexact", ), "has_finds": SearchAltName( pgettext_lazy("key for text search", "has-finds"), "base_finds__isnull" ), "cr_relation_types": SearchAltName( pgettext_lazy("key for text search", "record-relation-type"), "cr_relation_types", ), "excavation_technics": SearchAltName( pgettext_lazy("key for text search", "excavation-technique"), "excavation_technics__label__iexact", ), "cultural_attributions": SearchAltName( pgettext_lazy("key for text search", "cultural-attribution"), "cultural_attributions__label__iexact", ), "identifications": SearchAltName( pgettext_lazy("key for text search", "identification"), "identifications__label__iexact" ), "documentations": SearchAltName( pgettext_lazy("key for text search", "documentation"), "documentations__label__iexact" ), "description": SearchAltName( pgettext_lazy("key for text search", "description"), "description__iexact" ), "filling": SearchAltName( pgettext_lazy("key for text search", "filling"), "filling__iexact" ), "interpretation": SearchAltName( pgettext_lazy("key for text search", "interpretation"), "interpretation__iexact" ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), "comment__iexact" ), "structures": SearchAltName( pgettext_lazy("key for text search", "structure"), "structures__label__iexact", ), "textures": SearchAltName( pgettext_lazy("key for text search", "texture"), "textures__label__iexact", ), "inclusions": SearchAltName( pgettext_lazy("key for text search", "inclusion"), "inclusions__label__iexact", ), "colors": SearchAltName( pgettext_lazy("key for text search", "color"), "colors__label__iexact", ), "details_on_color": SearchAltName( pgettext_lazy("key for text search", "color-details"), "details_on_color__iexact" ), "excavator": SearchAltName( pgettext_lazy("key for text search", "excavator"), "excavator__cached_label__iexact", ), } ALT_NAMES.update(BaseHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) ALT_NAMES.update(Dating.ASSOCIATED_ALT_NAMES) ALT_NAMES.update(GeoItem.ALT_NAMES) ALT_NAMES.update(Imported.ALT_NAMES) BASE_SEARCH_VECTORS = [ SearchVectorConfig("label", "raw"), SearchVectorConfig("location"), SearchVectorConfig("town__name"), SearchVectorConfig("town__numero_insee", "raw"), SearchVectorConfig("interpretation", "local"), SearchVectorConfig("filling", "local"), SearchVectorConfig("datings_comment", "local"), SearchVectorConfig("unit__label"), SearchVectorConfig("activity__label"), SearchVectorConfig("excavator__cached_label", "raw"), SearchVectorConfig("operation__code_patriarche", "raw"), SearchVectorConfig("operation__code_patriarche", "raw", func=add_oa_prefix), SearchVectorConfig("archaeological_site__name", "raw"), SearchVectorConfig("archaeological_site__other_reference", "raw"), SearchVectorConfig("archaeological_site__reference", "raw"), SearchVectorConfig("parcel__cached_label", "raw"), ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("datings__period__label", "local"), 
SearchVectorConfig("excavation_technics__label", "local"), SearchVectorConfig("identifications__label", "local"), ] UP_MODEL_QUERY = { "operation": ( pgettext_lazy("key for text search", "operation"), "cached_label", ), "site": (pgettext_lazy("key for text search", "site"), "cached_label"), } MAIN_UP_MODEL_QUERY = "operation" RELATIVE_SESSION_NAMES = [ ("operation", "operation__pk"), ("site", "archaeological_site__pk"), ("file", "operation__associated_file__pk"), ] HISTORICAL_M2M = ["datings", "documentations", "excavation_technics", "identifications"] CACHED_LABELS = ["cached_label", "cached_periods", "cached_related_context_records"] DOWN_MODEL_UPDATE = ["base_finds"] QA_LOCK = QuickAction( url="contextrecord-qa-lock", icon_class="fa fa-lock", text=_("Lock/Unlock"), target="many", rights=[ "archaeological_context_records.change_contextrecord", "archaeological_context_records.change_own_contextrecord" ], btn_class="btn-warning" ) QA_LINK = QuickAction( url="contextrecord-qa-link", icon_class="fa fa-link", text=_("Link to account"), target="many", rights=["ishtaradmin"], btn_class="btn-warning" ) QA_EDIT = QuickAction( url="contextrecord-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=[ "archaeological_context_records.change_contextrecord", "archaeological_context_records.change_own_contextrecord" ], ) QUICK_ACTIONS = [ QA_EDIT, QuickAction( url="contextrecord-qa-duplicate", icon_class="fa fa-clone", text=_("Duplicate"), target="one", rights=[ "archaeological_context_records.change_contextrecord", "archaeological_context_records.change_own_contextrecord" ], ), QA_LOCK, QA_LINK, ] SERIALIZE_EXCLUDE = MainItem.SERIALIZE_EXCLUDE + ["contextrecord"] SERIALIZE_PROPERTIES = MainItem.SERIALIZE_PROPERTIES + [ "short_label", "town_label_with_areas", ] UPPER_GEO = ["operation", "archaeological_site"] UPPER_PERMISSIONS = [(Operation, "operation_id")] SHEET_EMPTY_KEYS = [ "m2m_listing", "interpretation", "activity", "taq", "taq_estimated", "tpq", "tpq_estimated" ] DEFAULT_WIZARD = reverse_lazy("record_search", args=["general-record_search"]) history = HistoricalRecords(bases=[HistoryModel]) objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) external_id = models.TextField(_("External ID"), blank=True, default="") auto_external_id = models.BooleanField( _("External ID is set automatically"), default=False ) parcel = models.ForeignKey( Parcel, verbose_name=_("Parcel"), related_name="context_record", on_delete=models.SET_NULL, blank=True, null=True, ) town = models.ForeignKey( Town, verbose_name=_("Town"), related_name="context_record", on_delete=models.SET_NULL, blank=True, null=True, ) operation = models.ForeignKey( Operation, verbose_name=_("Operation"), related_name="context_record", on_delete=models.CASCADE, ) archaeological_site = models.ForeignKey( ArchaeologicalSite, verbose_name=_("Archaeological site"), on_delete=models.SET_NULL, blank=True, null=True, related_name="context_records", ) label = models.CharField(_("ID"), max_length=200) description = models.TextField(_("Description"), blank=True, default="") comment = models.TextField(_("General comment"), blank=True, default="") excavator = models.ForeignKey( Person, verbose_name=_("Excavator"), on_delete=models.SET_NULL, blank=True, null=True, related_name="context_record_excavation" ) opening_date = models.DateField(_("Opening date"), blank=True, null=True) closing_date = models.DateField(_("Closing date"), blank=True, null=True) length = models.FloatField(_("Length (m)"), 
blank=True, null=True) excavated_length = models.FloatField(_("Excavated length (m)"), blank=True, null=True) width = models.FloatField(_("Width (m)"), blank=True, null=True) excavated_width = models.FloatField(_("Excavated width (m)"), blank=True, null=True) thickness = models.FloatField(_("Thickness (m)"), blank=True, null=True) diameter = models.FloatField(_("Diameter (m)"), blank=True, null=True) depth = models.FloatField(_("Depth (m)"), blank=True, null=True) depth_of_appearance = models.FloatField( _("Depth of appearance (m)"), blank=True, null=True ) surface = models.FloatField(_("Surface (m²)"), blank=True, null=True) location = models.TextField( _("Location"), blank=True, default="", help_text=_("A short description of the location of the context record"), ) datings = models.ManyToManyField(Dating, related_name="context_records") documentations = models.ManyToManyField(DocumentationType, blank=True) structures = models.ManyToManyField(StructureType, blank=True) textures = models.ManyToManyField(TextureType, blank=True) colors = models.ManyToManyField(ColorType, blank=True) details_on_color = models.TextField(_("Details on color"), blank=True, default="") inclusions = models.ManyToManyField(InclusionType, blank=True) datings_comment = models.TextField(_("Comment on datings"), blank=True, default="") unit = models.ForeignKey( Unit, verbose_name=_("Context record type"), on_delete=models.SET_NULL, related_name="+", blank=True, null=True, ) filling = models.TextField(_("Filling"), blank=True, default="") interpretation = models.TextField(_("Interpretation"), blank=True, default="") cultural_attributions = models.ManyToManyField( CulturalAttributionType, verbose_name=_("Cultural attribution"), blank=True ) taq = models.IntegerField( _("TAQ"), blank=True, null=True, help_text=_( '"Terminus Ante Quem" the context record can\'t have ' "been created after this date" ), ) taq_estimated = models.IntegerField( _("Estimated TAQ"), blank=True, null=True, help_text=_('Estimation of a "Terminus Ante Quem"'), ) tpq = models.IntegerField( _("TPQ"), blank=True, null=True, help_text=_( '"Terminus Post Quem" the context record can\'t have ' "been created before this date" ), ) tpq_estimated = models.IntegerField( _("Estimated TPQ"), blank=True, null=True, help_text=_('Estimation of a "Terminus Post Quem"'), ) identifications = models.ManyToManyField( IdentificationType, blank=True, verbose_name=_("Identification"), related_name="context_records", ) activity = models.ForeignKey( ActivityType, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_("Activity"), ) excavation_technics = models.ManyToManyField( ExcavationTechnicType, blank=True, verbose_name=_("Excavation techniques"), related_name="context_records", ) related_context_records = models.ManyToManyField( "ContextRecord", through="RecordRelations", blank=True ) documents = models.ManyToManyField( Document, related_name="context_records", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( Document, related_name="main_image_context_records", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) cached_periods = models.TextField( _("Cached periods label"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) cached_related_context_records = models.TextField( _("Cached related context records"), blank=True, default="", help_text=_("Generated automatically - do not edit"), ) class Meta: verbose_name = _("Context Record") verbose_name_plural = _("Context Record") 
permissions = ( ("view_own_contextrecord", "Can view own Context Record"), ("change_own_contextrecord", "Can change own Context Record"), ("delete_own_contextrecord", "Can delete own Context Record"), ) ordering = ("cached_label",) indexes = [ GinIndex(fields=["data"]), ] def natural_key(self): return (self.uuid,) @property def name(self): return self.label or "" @property def short_class_name(self): return pgettext("short", "Context record") def __str__(self): return self.short_label or "" @property def surface_ha(self): if self.surface: return self.surface / 10000.0 def _get_geo_town(self): if self.parcel: return self.parcel.town return self.town def geodata_child_item_queries(self): return [self.base_finds] def public_representation(self): dct = super(ContextRecord, self).public_representation() dct.update( { "operation": self.operation.public_representation(), "site": self.archaeological_site and self.archaeological_site.public_representation(), "parcel": str(self.parcel), "town": self.town.label_with_areas if self.town else None, "label": self.label, "description": self.description, "comment": self.comment, } ) return dct DOC_VALUES = [ ("base_finds", _("List of associated base finds")), ] def get_extra_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = {} no_base_finds = True if "no_base_finds" in kwargs: no_base_finds = kwargs["no_base_finds"] if prefix and no_base_finds or kwargs.get("force_no_base_finds", True): return values if not filtr or prefix + "base_finds" in filtr: values[prefix + "base_finds"] = [ bf.get_values(prefix=prefix, no_values=True, filtr=None, **kwargs) for bf in self.base_finds.distinct().all() ] return values def get_town_centroid(self): if self.town: return self.town.center, self._meta.verbose_name if self.archaeological_site: centroid = self.archaeological_site.get_town_centroid() if centroid: return centroid return self.operation.get_town_centroid() def get_town_polygons(self): if self.town: return self.town.limit, self._meta.verbose_name if self.archaeological_site: polys = self.archaeological_site.get_town_polygons() if polys: return polys return self.operation.get_town_polygons() def get_precise_points(self): precise_points = super(ContextRecord, self).get_precise_points() if precise_points: return precise_points if self.archaeological_site: precise_points = self.archaeological_site.get_precise_points() if precise_points: return precise_points return self.operation.get_precise_points() def get_precise_polygons(self): precise_poly = super(ContextRecord, self).get_precise_polygons() if precise_poly: return precise_poly if self.archaeological_site: precise_poly = self.archaeological_site.get_precise_polygons() if precise_poly: return precise_poly return self.operation.get_precise_polygons() def get_geo_items(self, rounded=True): dct = super(ContextRecord, self).get_geo_items(rounded=rounded) site = Site.objects.get_current() scheme = "https" if settings.ISHTAR_SECURE else "http" base_url = scheme + "://" + site.domain profile = get_current_profile() precision = profile.point_precision current_geodata = list(self.geodata.values_list("id", flat=True)) q = self.base_finds.filter(main_geodata__isnull=False) url = base_url + "/show-basefind/{}/" collection_finds = GeoVectorData._get_geo_item_list( q, current_geodata, url, precision, rounded ) dct["finds"] = { "type": "FeatureCollection", "features": collection_finds, } return dct @property def short_label(self): return settings.JOINT.join( [ str(item) for item in 
[self.operation.get_reference(), self.parcel, self.label] if item ] ) @property def town_label_with_areas(self): if not self.town: return "" return self.town.label_with_areas @property def relation_label(self): return self.label def all_base_finds(self): BaseFind = apps.get_model("archaeological_finds", "BaseFind") ids = [self.id] + [ cr.cr_id for cr in ContextRecordTree.objects.filter(cr_parent_id=self.id) ] return BaseFind.objects.filter(context_record_id__in=set(ids)) def get_values_for_datings(self, prefix=""): return [dating.get_values(prefix=prefix) for dating in self.datings.all()] @property def show_url(self): return reverse("show-contextrecord", args=[self.pk, ""]) def get_extra_actions(self, request): # url, base_text, icon, extra_text, extra css class, is a quick action actions = super().get_extra_actions(request) is_locked = hasattr(self, "is_locked") and self.is_locked(request.user) can_edit_cr = self.can_change(request) profile = get_current_profile() can_add_geo = can_edit_cr and profile.mapping and \ self.can_do(request, "ishtar_common.add_geovectordata") if can_add_geo: actions.append(self.get_add_geo_action()) can_create_find = self.can_do(request, "archaeological_finds.add_find") if can_create_find: actions += [ ( reverse("find_create", args=[self.pk]), _("Add find"), "fa fa-plus", _("find"), "", False, ), ] if can_edit_cr and not is_locked: actions += [ ( reverse("context-record-relation-modify", args=[self.pk]), _("Modify relations"), "fa fa-retweet", _("relations"), "", True, ), ] if can_edit_cr: actions += [ ( reverse("contextrecord-qa-duplicate", args=[self.pk]), _("Duplicate"), "fa fa-clone", "", "", True, ), ] return actions @classmethod def get_limit_to_area_query(cls, town_ids): return Q(operation__towns__pk__in=town_ids) @classmethod def get_query_owns(cls, ishtaruser): return ( cls._construct_query_own( cls, "operation__", Operation._get_query_owns_dicts(ishtaruser) ) | cls._construct_query_own( cls, "base_finds__find__basket__", [{"shared_with": ishtaruser, "shared_write_with": ishtaruser}], ) | cls._construct_query_own( cls, "", [ {"history_creator": ishtaruser.user_ptr}, {"operation__end_date__isnull": True}, ], ) ) @classmethod def get_owns( cls, user, menu_filtr=None, limit=None, values=None, get_short_menu_class=None, no_auth_check=False, query=False ): replace_query = None if menu_filtr and "operation" in menu_filtr: replace_query = Q(operation=menu_filtr["operation"]) owns = super(ContextRecord, cls).get_owns( user, replace_query=replace_query, limit=limit, values=values, get_short_menu_class=get_short_menu_class, no_auth_check=no_auth_check, query=query ) if query: return owns return cls._return_get_owns(owns, values, get_short_menu_class) def full_label(self): return str(self) def _real_label(self): if not self.operation.code_patriarche: return return settings.JOINT.join((self.operation.code_patriarche, self.label)) def _temp_label(self): if self.operation.code_patriarche: return return settings.JOINT.join( [ str(lbl) for lbl in [ self.operation.year, self.operation.operation_code, self.label, ] if lbl ] ) def _generate_cached_label(self): label = self._profile_generate_cached_label() if label: return label return self.full_label() def _generate_cached_periods(self): return " & ".join(dating.period.label for dating in self.datings.all()) def _generate_cached_related_context_records(self): return self.detailed_related_context_records() def _get_associated_cached_labels(self): BaseFind = apps.get_model("archaeological_finds", "BaseFind") Find = 
apps.get_model("archaeological_finds", "Find") return list(Find.objects.filter(base_finds__context_record=self).all()) + list( BaseFind.objects.filter(context_record=self).all() ) def _get_base_image_path(self): return self.operation._get_base_image_path() + "/{}/{}".format( self.SLUG, slugify(self.label or "00") ) @property def archaeological_site_reference(self): if self.archaeological_site: return self.archaeological_site.reference if self.operation.archaeological_sites.count(): return "-".join( a.reference for a in self.operation.archaeological_sites.all() ) return "" @property def reference(self): if not self.operation: return "00" return self.full_label() def get_department(self): if not self.operation: return "00" return self.operation.get_department() def get_town_label(self): if not self.operation: return "00" return self.operation.get_town_label() @classmethod def get_periods(cls, slice="year", fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) if slice == "year": years = set() for res in list(q.values("operation__start_date")): if res["operation__start_date"]: yr = res["operation__start_date"].year years.add(yr) return list(years) return [] @classmethod def get_by_year(cls, year, fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) return q.filter(operation__start_date__year=year) @classmethod def get_operations(cls): return [ dct["operation__pk"] for dct in cls.objects.values("operation__pk").distinct() ] @classmethod def get_by_operation(cls, operation_id): return cls.objects.filter(operation__pk=operation_id) @classmethod def get_total_number(cls, fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) return q.count() def detailed_related_context_records(self): crs = [ "{} ({})".format(cr.right_record, cr.relation_type.get_tiny_label()) for cr in self.right_relations.all() ] return " & ".join(crs) def find_docs_q(self): return Document.objects.filter(finds__base_finds__context_record=self) def get_deleted_data(self) -> dict: """ Return sub object list that will be deleted :return: {"Sub object type": ["Sub object 1", "Sub object 2", ...]} """ if not self.base_finds.count(): return {} lbl = str(_("Base finds")) data = {lbl: []} for item in self.base_finds.all(): data[lbl].append(str(item)) for key, value in item.get_deleted_data().items(): if key not in data: data[key] = [] data[key] += value return data def fix(self): """ Fix redundant m2m dating association (usually after imports) """ Dating.fix_dating_association(self) def save(self, *args, **kwargs): super().save(*args, **kwargs) if (not self.town and self.parcel) or ( self.parcel and self.parcel.town != self.town ): self.town = self.parcel.town self.skip_history_when_saving = True self.save() def context_record_post_save(sender, **kwargs): cached_label_changed(sender=sender, **kwargs) post_save_geo(sender=sender, **kwargs) instance = kwargs.get("instance", None) if not instance or not instance.pk: return profile = get_current_profile() if profile.parent_relations_engine == "T": ContextRecordTree._update_self_relation(instance.pk) # on creation: manage self relation BaseFind = apps.get_model("archaeological_finds", "BaseFind") Find = apps.get_model("archaeological_finds", "Find") for bf in instance.base_finds.all(): cached_label_changed(BaseFind, instance=bf) for f in bf.find.all(): cached_label_changed(Find, instance=f) post_save.connect(context_record_post_save, sender=ContextRecord) m2m_changed.connect(document_attached_changed, sender=ContextRecord.documents.through) 
m2m_changed.connect(geodata_attached_changed, sender=ContextRecord.geodata.through) for attr in ContextRecord.HISTORICAL_M2M: m2m_changed.connect( m2m_historization_changed, sender=getattr(ContextRecord, attr).through ) class RelationType(GeneralRelationType): class Meta: verbose_name = _("Relation type") verbose_name_plural = _("Relation types") ordering = ("order", "label") class RecordRelationsManager(models.Manager): def get_by_natural_key(self, left_record, right_record, relation_type): return self.get( left_record__uuid=left_record, right_record__uuid=right_record, relation_type__txt_idx=relation_type, ) class RecordRelations(GeneralRecordRelations): MAIN_ATTR = "left_record" left_record = models.ForeignKey( ContextRecord, related_name="right_relations", on_delete=models.CASCADE ) right_record = models.ForeignKey( ContextRecord, related_name="left_relations", on_delete=models.CASCADE ) relation_type = models.ForeignKey(RelationType, on_delete=models.PROTECT) objects = RecordRelationsManager() TABLE_COLS = [ "left_record__label", "left_record__unit", "left_record__parcel", "relation_type", "right_record__label", "right_record__unit", "right_record__parcel", ] COL_LABELS = { "left_record__label": _("ID (left)"), "left_record__unit": _("Context record type (left)"), "left_record__parcel": _("Parcel (left)"), "left_record__description": _("Description (left)"), "left_record__datings__period": _("Periods (left)"), "relation_type": _("Relation type"), "right_record__label": _("ID (right)"), "right_record__unit": _("Context record type (right)"), "right_record__parcel": _("Parcel (right)"), "right_record__description": _("Description (right)"), "right_record__datings__period": _("Periods (right)"), } # search parameters EXTRA_REQUEST_KEYS = { "left_record__operation": "left_record__operation__pk" } class Meta: verbose_name = _("Record relation") verbose_name_plural = _("Record relations") permissions = [ ("view_recordrelation", "Can view all Context record relations"), ] def natural_key(self): return self.left_record.uuid, self.right_record.uuid, self.relation_type.txt_idx def post_delete_cr_record_relation(sender, instance, **kwargs): if getattr(sender, "_no_post_treatments", False): return post_delete_record_relation(sender, instance, **kwargs) ContextRecordTree.update(instance.left_record_id) ContextRecordTree.update(instance.right_record_id) def post_save_cr_record_relation(sender, instance, **kwargs): if getattr(sender, "_no_post_treatments", False): return ContextRecordTree.update(instance.left_record_id) ContextRecordTree.update(instance.right_record_id) post_delete.connect(post_delete_cr_record_relation, sender=RecordRelations) post_save.connect(post_save_cr_record_relation, sender=RecordRelations) class RecordRelationView(models.Model): CREATE_SQL = """ CREATE VIEW record_relations AS SELECT DISTINCT right_record_id as id, right_record_id, left_record_id, relation_type_id FROM archaeological_context_records_recordrelations; -- deactivate deletion CREATE RULE record_relations_del AS ON DELETE TO record_relations DO INSTEAD DELETE FROM record_relations where id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS record_relations; """ TABLE_COLS = [ "relation_type", "right_record__label", "right_record__unit", "right_record__parcel", "right_record__datings__period", "right_record__description", ] COL_LABELS = { "relation_type": _("Relation type"), "right_record__label": _("ID"), "right_record__unit": _("Context record type"), "right_record__parcel": _("Parcel"), 
"right_record__description": _("Description"), "right_record__datings__period": _("Periods"), } # search parameters EXTRA_REQUEST_KEYS = { "left_record_id": "left_record_id", "right_record__unit": "right_record__unit__label", } left_record = models.ForeignKey( ContextRecord, related_name="+", on_delete=models.DO_NOTHING ) right_record = models.ForeignKey( ContextRecord, related_name="+", on_delete=models.DO_NOTHING ) relation_type = models.ForeignKey( RelationType, related_name="+", on_delete=models.DO_NOTHING ) class Meta: managed = False db_table = "record_relations" unique_together = ("id", "right_record") permissions = [ ("view_recordrelation", "Can view all record relations - view"), ] @classmethod def general_types(cls): return [] def __str__(self): return '{} "{}"'.format(self.relation_type, self.right_record) class ContextRecordTree(RelationsViews): CREATE_SQL = """ CREATE VIEW cr_parent_relation_id AS SELECT id FROM archaeological_context_records_relationtype WHERE logical_relation in ('included', 'equal'); CREATE VIEW context_records_tree AS WITH RECURSIVE rel_tree AS ( SELECT cr.id AS cr_id, cr.id AS cr_parent_id, 1 AS level, cr.id || '_' || cr.id || '_1' AS key FROM archaeological_context_records_contextrecord cr UNION ALL SELECT rel.left_record_id AS cr_id, rel.right_record_id AS cr_parent_id, 1 AS level, rel.left_record_id || '_' || rel.right_record_id || '_1' AS key FROM archaeological_context_records_recordrelations rel WHERE rel.relation_type_id in ( SELECT id FROM cr_parent_relation_id ) UNION ALL SELECT p.cr_id AS cr_id, rel.right_record_id AS cr_parent_id, p.level + 1, p.cr_id || '_' || rel.right_record_id || '_' || p.level + 1 AS key FROM archaeological_context_records_recordrelations rel, rel_tree p WHERE rel.left_record_id = p.cr_parent_id AND rel.relation_type_id in ( SELECT id FROM cr_parent_relation_id ) AND p.level < 10 -- prevent recursive... 
) SELECT DISTINCT key, cr_id, cr_parent_id, level FROM rel_tree; CREATE VIEW context_record_tree AS SELECT DISTINCT y.key, y.cr_id, y.cr_parent_id FROM (SELECT * FROM context_records_tree) y ORDER BY y.cr_id, y.cr_parent_id; -- deactivate deletion, update CREATE RULE context_records_tree_del AS ON DELETE TO context_records_tree DO INSTEAD DELETE FROM archaeological_context_records_contextrecord WHERE id=NULL; CREATE RULE context_record_tree_del AS ON DELETE TO context_record_tree DO INSTEAD DELETE FROM archaeological_context_records_contextrecord WHERE id=NULL; CREATE RULE context_records_tree_update AS ON UPDATE TO context_records_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; CREATE RULE context_record_tree_update AS ON UPDATE TO context_record_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; CREATE RULE context_records_tree_insert AS ON INSERT TO context_records_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; CREATE RULE context_record_tree_insert AS ON INSERT TO context_record_tree DO INSTEAD UPDATE archaeological_context_records_contextrecord SET id=id WHERE id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS context_record_tree; DROP VIEW IF EXISTS context_records_tree; DROP VIEW IF EXISTS cr_parent_relation_id; """ CREATE_TABLE_SQL = """ CREATE TABLE {table} ( key varchar(100) PRIMARY KEY, cr_id integer NOT NULL, cr_parent_id integer NOT NULL, CONSTRAINT fk1_{table} FOREIGN KEY(cr_id) REFERENCES {fk_table}(id) ON DELETE CASCADE, CONSTRAINT fk2_{table} FOREIGN KEY(cr_parent_id) REFERENCES {fk_table}(id) ON DELETE CASCADE ); CREATE INDEX {table}_id ON {table} (cr_id); CREATE INDEX {table}_parent_id ON {table} (cr_parent_id); """.format( table="context_records_tree", fk_table="archaeological_context_records_contextrecord", ) key = models.TextField(primary_key=True) cr = models.ForeignKey( "archaeological_context_records.ContextRecord", verbose_name=_("Context record"), related_name="context_record_tree_parent", on_delete=models.CASCADE, ) cr_parent = models.ForeignKey( "archaeological_context_records.ContextRecord", verbose_name=_("Context record parent"), related_name="context_record_tree_child", on_delete=models.CASCADE, ) class Meta: managed = False db_table = "context_records_tree" @classmethod def _save_tree(cls, tree): keys = [] for idx, parent_id in enumerate(tree[:-1]): for child_id in tree[idx:]: if child_id != parent_id: cls.objects.get_or_create( key=f"{child_id}_{parent_id}", cr_id=child_id, cr_parent_id=parent_id, ) keys.append((child_id, parent_id)) return keys @classmethod def _get_base_relations(cls): return RelationType.objects.filter( logical_relation__in=("included", "equal") ).values_list("id", flat=True) @classmethod def _get_base_equal_relations(cls): return RelationType.objects.filter(logical_relation="equal").values_list( "id", flat=True ) @classmethod def _get_base_included_relations(cls): return RelationType.objects.filter(logical_relation="included").values_list( "id", flat=True ) @classmethod def _get_base_children(cls): return ContextRecord.objects.values_list("id", flat=True) @classmethod def _update_child(cls, parent_id, tree, rel_types): whole_tree = set() children = list( RecordRelations.objects.values_list("left_record_id", flat=True).filter( right_record_id=parent_id, relation_type_id__in=rel_types ) ) to_be_pop = [] for idx, c in enumerate(children[:]): if c in tree: # cyclic to_be_pop.append(idx) for idx in 
reversed(to_be_pop): children.pop(idx) if not children: # last leaf in the tree return cls._save_tree(tree) for c in children: whole_tree.update(cls._update_child(c, tree[:] + [c], rel_types)) return whole_tree @classmethod def _get_parent_trees(cls, child_id, trees, rel_types, deep=0): parents = RecordRelations.objects.values_list( "right_record_id", flat=True ).filter(left_record_id=child_id, relation_type_id__in=rel_types) if not parents: return trees new_trees = [] for p in set(parents): if p == child_id or any(1 for tree in trees if p in tree): # cyclic continue c_trees = list(map(lambda x: x + [p], trees)) new_trees += cls._get_parent_trees(p, c_trees, rel_types, deep + 1) return new_trees @classmethod def _get_equals(cls, item_id, equal_rel_types, exclude=None): if not exclude: exclude = [item_id] q = RecordRelations.objects.values_list("right_record_id", flat=True).filter( left_record_id=item_id, relation_type_id__in=equal_rel_types ) q = q.exclude(right_record_id__in=exclude) equals = list(q) q = RecordRelations.objects.values_list("left_record_id", flat=True).filter( right_record_id=item_id, relation_type_id__in=equal_rel_types ) q = q.exclude(left_record_id__in=exclude) equals += list(q) exclude += equals for eq_id in equals: equals += cls._get_equals(eq_id, equal_rel_types, exclude=exclude) return equals @classmethod def _update_equals(cls, item_id, equals): keys = [] for equal_id in equals: if item_id != equal_id: cls.objects.get_or_create( key=f"{item_id}_{equal_id}", cr_id=item_id, cr_parent_id=equal_id ) keys.append((item_id, equal_id)) cls.objects.get_or_create( key=f"{equal_id}_{item_id}", cr_id=equal_id, cr_parent_id=item_id ) keys.append((equal_id, item_id)) return keys @classmethod def _update_relations_equals(cls, relations): equal_rel_types = cls._get_base_equal_relations() keys = [] for child_id, parent_id in relations: equals_child = set(cls._get_equals(child_id, equal_rel_types)) keys += cls._update_equals(child_id, equals_child) for alt_child in equals_child: if alt_child != child_id: cls.objects.get_or_create( key=f"{alt_child}_{parent_id}", cr_id=alt_child, cr_parent_id=parent_id, ) keys.append((alt_child, parent_id)) equals_parent = set(cls._get_equals(parent_id, equal_rel_types)) keys += cls._update_equals(parent_id, equals_parent) for alt_parent in equals_parent: if alt_parent != parent_id: cls.objects.get_or_create( key=f"{child_id}_{alt_parent}", cr_id=child_id, cr_parent_id=alt_parent, ) keys.append((child_id, alt_parent)) for alt_child in equals_child: if alt_child != child_id: cls.objects.get_or_create( key=f"{alt_child}_{alt_parent}", cr_id=alt_child, cr_parent_id=alt_parent, ) keys.append((alt_child, alt_parent)) return set(keys) @classmethod def _update_self_relation(cls, item_id): # add self relation cls.objects.get_or_create( key=f"{item_id}_{item_id}", cr_id=item_id, cr_parent_id=item_id ) @classmethod def _update(cls, item_id, already_updated=None): all_relations = set() if not ContextRecord.objects.filter(pk=item_id).count(): return if not already_updated: cls._update_self_relation(item_id) all_relations.add((item_id, item_id)) current_relations_as_child = list( cls.objects.filter(cr_id=item_id).values_list("cr_parent_id", flat=True) ) current_relations_as_parent = list( cls.objects.filter(cr_parent_id=item_id).values_list("cr_id", flat=True) ) ## update the whole tree inc_rel_types = cls._get_base_included_relations() # get first parents parent_ids = [ tree[-1] for tree in cls._get_parent_trees(item_id, [[item_id]], inc_rel_types) ] if not 
parent_ids: parent_ids = [item_id] # get all child for parents and save trees for parent_id in parent_ids: tree = [parent_id] all_relations.update(cls._update_child(parent_id, tree, inc_rel_types)) all_relations.update(cls._update_relations_equals(all_relations)) if not all_relations: equal_rel_types = cls._get_base_equal_relations() equals = set(cls._get_equals(item_id, equal_rel_types)) all_relations.update(cls._update_equals(item_id, equals)) ## delete old relations if not already_updated: already_updated = [item_id] for parent_id in current_relations_as_child: if ( item_id, parent_id, ) not in all_relations and parent_id not in already_updated: # disappeared - must regenerate already_updated.append(parent_id) cls.objects.filter(key=f"{item_id}_{parent_id}").delete() cls._update(parent_id, already_updated) for child_id in current_relations_as_parent: if ( child_id, item_id, ) not in all_relations and child_id not in already_updated: # disappeared - must regenerate already_updated.append(child_id) cls.objects.filter(key=f"{child_id}_{item_id}").delete() cls._update(child_id, already_updated)
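
# Note: illustrative usage sketch, not part of the upstream module. ContextRecordTree
# materializes the transitive closure of the "included"/"equal" logical relations as
# (cr, cr_parent) pairs, including a self relation for every record; the RecordRelations
# post_save/post_delete handlers above keep it up to date. Assuming a ContextRecord
# instance `record`, the hierarchy can then be read with plain queries:
#
#     ancestor_ids = ContextRecordTree.objects.filter(cr=record).values_list("cr_parent_id", flat=True)
#     child_ids = ContextRecordTree.objects.filter(cr_parent=record).values_list("cr_id", flat=True)
#
# ContextRecord.all_base_finds() relies on the same table to gather the finds of a
# record together with those of every record included in it.
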