#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2012-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. from collections import OrderedDict import datetime import uuid from django.conf import settings from django.contrib.gis.db import models from django.contrib.postgres.indexes import GinIndex from django.core.exceptions import ObjectDoesNotExist from django.core.urlresolvers import reverse from django.db.models import Q, Max, Count from django.db.models.signals import post_save, post_delete, m2m_changed, pre_delete from django.template.defaultfilters import slugify from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy from django.apps import apps from ishtar_common.data_importer import post_importer_action, pre_importer_action from ishtar_common.model_managers import ExternalIdManager, UUIDModelManager from ishtar_common.models import ValueGetter, get_current_profile from ishtar_common.models_common import ( GeneralType, LightHistorizedItem, OwnPerms, Address, post_save_cache, DashboardFormItem, document_attached_changed, SearchAltName, DynamicRequest, GeoItem, CompleteIdentifierItem, SearchVectorConfig, DocumentItem, QuickAction, MainItem, Merge, ) from ishtar_common.model_merging import merge_model_objects from ishtar_common.utils import ( cached_label_changed, cached_label_and_geo_changed, get_generated_id, ) from ishtar_common.data_importer import ImporterError class DivisionContainer(DashboardFormItem): DIVISION_TEMPLATE = """ {container} {ref}""" BASE_QUERY_LOCATION = "location" @property def pk(self): # of course implemented by models.Model raise NotImplemented def get_max_division_number(self): raise NotImplemented @property def start_division_number(self): raise NotImplemented @property def division_labels(self): if not self.get_max_division_number() + 1: return [] start = self.start_division_number return [ "{} {}".format(_("Level"), idx + start + 1) for idx in range(self.get_max_division_number() + 1) ] @property def number_divisions(self): q = { self.BASE_QUERY_LOCATION + "__id": self.pk, "container_type__stationary": True, } return Container.objects.filter(**q).count() @property def number_containers(self): q = { self.BASE_QUERY_LOCATION + "__id": self.pk, "container_type__stationary": False, } return Container.objects.filter(**q).count() @property def number_of_finds_hosted(self): Find = apps.get_model("archaeological_finds", "Find") q = { "container__{}__id".format(self.BASE_QUERY_LOCATION): self.pk, } return Find.objects.filter(**q).count() @property def number_of_finds(self): Find = apps.get_model("archaeological_finds", "Find") q = { "container_ref__{}__id".format(self.BASE_QUERY_LOCATION): self.pk, } return Find.objects.filter(**q).count() @property def number_of_containers(self): return Container.objects.filter(**{self.BASE_QUERY_LOCATION: self}).count() def _number_of_finds_by_place(self): Find = apps.get_model("archaeological_finds", "Find") return self._number_of_items_by_place( Find, division_key="inside_container__container__" ) @property def number_of_finds_by_place(self, update=False): return self._get_or_set_stats( "_number_of_finds_by_place", update, expected_type=list ) def _number_of_containers_by_place(self): return self._number_of_items_by_place( ContainerTree, "container_parent__", "container__children" ) @property def number_of_containers_by_place(self, update=False): return self._get_or_set_stats( "_number_of_containers_by_place", update, expected_type=list ) def _get_divisions(self, current_path, remaining_division, depth=0): if not remaining_division: return [current_path] remaining_division.pop(0) query_location = self.BASE_QUERY_LOCATION for __ in range(depth): query_location = "parent__" + query_location base_q = Container.objects.filter(**{query_location: self}) q = base_q if self.BASE_QUERY_LOCATION == "location": exclude = "parent_" for idx in range(depth): exclude += "_parent_" q = base_q.filter(**{exclude + "id": None}) elif not depth and not current_path: q = base_q.filter(parent_id=self.pk) for idx, p in enumerate(reversed(current_path)): parent_id, __ = p key = "parent__" * (idx + 1) + "id" q = q.filter(**{key: parent_id}) res = [] old_ref, ct = None, None if not q.count(): return [current_path] q = q.values( "id", "reference", "container_type__label", "container_type_id" ).order_by("container_type__label", "reference") for ref in q.all(): if ref["reference"] == old_ref and ref["container_type__label"] == ct: continue old_ref = ref["reference"] ct = ref["container_type__label"] cpath = current_path[:] lbl = self.DIVISION_TEMPLATE.format( id=ref["id"], container=ref["container_type__label"], ref=ref["reference"], ) cpath.append((ref["id"], lbl)) query = { "containers__parent__reference": ref["reference"], "containers__parent__container_type_id": ref["container_type_id"], "containers__" + self.BASE_QUERY_LOCATION: self, } remaining_division = list(ContainerType.objects.filter(**query).distinct()) for r in self._get_divisions(cpath, remaining_division[:], depth + 1): res.append(r) return res @property def available_division_tuples(self): """ :return: ordered list of available paths. Each path is a list of tuple with the container type and the full reference. """ q = {"containers__" + self.BASE_QUERY_LOCATION: self} if self.BASE_QUERY_LOCATION == "location": q["containers__parent"] = None top_divisions = list(ContainerType.objects.filter(**q).distinct()) divisions = self._get_divisions([], top_divisions) return divisions def _number_of_items_by_place(self, model, division_key, count_filter=None): res = {} paths = self.available_division_tuples[:] for path in paths: cpath = [] for container_id, lbl in path: cpath.append((container_id, lbl)) if tuple(cpath) in res: continue q = model.objects for idx, p in enumerate(reversed(cpath)): container_id, __ = p div_key = division_key + "parent__" * idx attrs = {div_key + "id": container_id} q = q.filter(**attrs) if count_filter: q = q.filter(**{count_filter: None}) res[tuple(cpath)] = q.distinct().count() res = [(k, res[k]) for k in res] final_res, current_res, depth = [], [], 1 len_divisions = self.get_max_division_number() + 1 for path, nb in sorted(res, key=lambda x: (len(x[0]), x[0])): if len(path) > len_divisions: continue if depth != len(path): final_res.append(current_res[:]) current_res = [] depth = len(path) if path[-1] == "-": continue path = [k[1] for k in path] path = path + ["" for __ in range(len_divisions - len(path))] current_res.append((path, nb)) final_res.append(current_res[:]) return final_res class WarehouseType(GeneralType): class Meta: verbose_name = _("Warehouse type") verbose_name_plural = _("Warehouse types") ordering = ("label",) post_save.connect(post_save_cache, sender=WarehouseType) post_delete.connect(post_save_cache, sender=WarehouseType) NO_DIVISION_ERROR = _("The division number {} has not been set for the warehouse {}.") class Warehouse( Address, DocumentItem, GeoItem, CompleteIdentifierItem, OwnPerms, MainItem, DivisionContainer, ValueGetter, ): SLUG = "warehouse" APP = "archaeological-warehouse" MODEL = "warehouse" SHOW_URL = "show-warehouse" DELETE_URL = "delete-warehouse" TABLE_COLS = ["name", "warehouse_type__label"] NEW_QUERY_ENGINE = True BASE_SEARCH_VECTORS = [ SearchVectorConfig("name"), SearchVectorConfig("warehouse_type__label"), SearchVectorConfig("external_id"), SearchVectorConfig("town"), SearchVectorConfig("precise_town__name"), SearchVectorConfig("comment", "local"), ] COL_LABELS = { "warehouse_type__label": _("Type"), } EXTRA_REQUEST_KEYS = { "warehouse_type__label": "warehouse_type__label", # used by dynamic_table_documents "person_in_charge__pk": "person_in_charge__pk", } # alternative names of fields for searches ALT_NAMES = { "name": SearchAltName( pgettext_lazy("key for text search", "name"), "name__iexact" ), "warehouse_type": SearchAltName( pgettext_lazy("key for text search", "type"), "warehouse_type__label__iexact", ), "town": SearchAltName( pgettext_lazy("key for text search", "town"), "precise_town__cached_label__iexact", ), } GEO_LABEL = "name" DOWN_MODEL_UPDATE = ["containers"] CACHED_LABELS = [] QA_LOCK = QuickAction( url="warehouse-qa-lock", icon_class="fa fa-lock", text=_("Lock/Unlock"), target="many", rights=["change_warehouse", "change_own_warehouse"], ) QUICK_ACTIONS = [QA_LOCK] objects = UUIDModelManager() uuid = models.UUIDField(default=uuid.uuid4) name = models.CharField(_("Name"), max_length=200) warehouse_type = models.ForeignKey(WarehouseType, verbose_name=_("Warehouse type")) person_in_charge = models.ForeignKey( "ishtar_common.Person", on_delete=models.SET_NULL, related_name="warehouse_in_charge", verbose_name=_("Person in charge"), null=True, blank=True, ) organization = models.ForeignKey( "ishtar_common.Organization", blank=True, null=True, related_name="warehouses", verbose_name=_("Organization"), on_delete=models.SET_NULL, ) comment = models.TextField(_("Comment"), blank=True, default="") associated_divisions = models.ManyToManyField( "WarehouseDivision", verbose_name=_("Divisions"), blank=True, through="WarehouseDivisionLink", ) documents = models.ManyToManyField( "ishtar_common.Document", related_name="warehouses", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( "ishtar_common.Document", related_name="main_image_warehouses", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) external_id = models.TextField(_("External ID"), blank=True, default="") auto_external_id = models.BooleanField( _("External ID is set automatically"), default=False ) max_division_number = models.IntegerField( _("Maximum number of divisions"), default=0, help_text=_("Automatically generated"), ) SUB_ADDRESSES = ["organization", "person_in_charge"] class Meta: verbose_name = _("Warehouse") verbose_name_plural = _("Warehouses") permissions = ( ("view_warehouse", "Can view all Warehouses"), ("view_own_warehouse", "Can view own Warehouse"), ("add_own_warehouse", "Can add own Warehouse"), ("change_own_warehouse", "Can change own Warehouse"), ("delete_own_warehouse", "Can delete own Warehouse"), ) indexes = [ GinIndex(fields=["data"]), ] def __str__(self): return self.name def get_container_type_by_place(self, place: int): """ Container type by place based on the default organisation of the warehouse :param place: place number :return: container type, other location or None, None """ q = WarehouseDivisionLink.objects.filter(warehouse=self).order_by("order") previous_container_types = [] for idx, division_link in enumerate(q.all()): if idx == place: current_container_type = division_link.container_type break previous_container_types.append(division_link.container_type_id) else: return None, None return current_container_type, previous_container_types @post_importer_action def add_localisations(self, context, value): self._add_localisations(context, value) add_localisations.post_save = True def _add_localisations(self, context, value, return_errors=False): """ Add localisations for this warehouse. Get the default localisation types and set each reference from the value separated by ";" :param value: references of localisations separated by ; :param return_errors: return error message default is False :return: return an error message if return_errors set to True """ value = value.strip() if not value: if return_errors: return None, _("No value") return import_object = None if context and "import_object" in context: import_object = context["import_object"] TMP_SEMI_COLON = "|#|#|" value = value.replace("\\;", TMP_SEMI_COLON) # manage ";" used by a ref values = value.split(";") divisions = list( WarehouseDivisionLink.objects.filter(warehouse=self).order_by("order") ) parent = None for idx, value in enumerate(values): if idx >= len(divisions): if return_errors: return str( _( "{} values for only {} default divisions set for " "warehouse {}" ) ).format(len(values), len(divisions), self.name) return value = value.replace(TMP_SEMI_COLON, ";").strip() if not value or value == "-": continue parent, created = Container.objects.get_or_create( location=self, reference=value, container_type_id=divisions[idx].container_type_id, parent=parent, ) if created and import_object: parent.imports.add(import_object) @property def short_label(self): return self.name def natural_key(self): return (self.uuid,) def _get_base_image_path(self): return "{}/{}".format(self.SLUG, self.external_id) def create_attached_organization(self): """ Create an attached organization from warehouse fields """ dct_orga = {} for k in Address.FIELDS: dct_orga[k] = getattr(self, k) OrganizationType = apps.get_model("ishtar_common", "OrganizationType") q = OrganizationType.objects.filter(txt_idx="warehouse") if q.count(): orga_type = q.all()[0] else: orga_type, __ = OrganizationType.objects.get_or_create( txt_idx="undefined", defaults={"label": _("Undefined")} ) dct_orga["organization_type"] = orga_type dct_orga["name"] = self.name Organization = apps.get_model("ishtar_common", "Organization") orga = Organization.objects.create(**dct_orga) orga.save() # force duplicates self.organization = orga for k in Address.FIELDS: if k == "alt_address_is_prefered": setattr(self, k, False) elif k in ("precise_town", "email"): setattr(self, k, None) else: setattr(self, k, "") self.save() def get_max_division_number(self): return self.max_division_number @property def start_division_number(self): return 0 @property def default_location_types(self): return [ wd.container_type.label for wd in WarehouseDivisionLink.objects.filter(warehouse=self) .order_by("order") .all() if wd.container_type ] @property def associated_filename(self): return datetime.date.today().strftime("%Y-%m-%d") + "-" + slugify(str(self)) @classmethod def get_query_owns(cls, ishtaruser): return cls._construct_query_own("", cls._get_query_owns_dicts(ishtaruser)) @classmethod def _get_query_owns_dicts(cls, ishtaruser): return [{"person_in_charge__ishtaruser": ishtaruser}] def merge(self, item, keep_old=False): # do not recreate missing divisions available_divisions = [ wd.division for wd in WarehouseDivisionLink.objects.filter(warehouse=self) ] for container in list(item.containers.all()): container.location = self container.save() for loca in ContainerLocalisation.objects.filter(container=container).all(): if loca.division.division in available_divisions: div = WarehouseDivisionLink.objects.get( warehouse=self, division=loca.division.division ) ContainerLocalisation.objects.create( container=container, division=div, reference=loca.reference ) loca.delete() container.save() # force label regeneration for container in list(item.containers.all()): if Container.objects.filter(index=container.index, location=self).count(): container.index = ( Container.objects.filter(location=self) .exclude(id=container.id) .all() .aggregate(Max("index"))["index__max"] + 1 ) container.location = self container.save() for wdiv in WarehouseDivisionLink.objects.filter(warehouse=item).all(): wdiv.delete() merge_model_objects(self, item, keep_old=keep_old) def save(self, *args, **kwargs): self.update_search_vector() super(Warehouse, self).save(*args, **kwargs) self.skip_history_when_saving = True if not self.external_id or self.auto_external_id: external_id = get_generated_id("warehouse_external_id", self) if external_id != self.external_id: self.auto_external_id = True self.external_id = external_id self._cached_label_checked = False self.save() return m2m_changed.connect(document_attached_changed, sender=Warehouse.documents.through) post_save.connect(cached_label_and_geo_changed, sender=Warehouse) class WarehouseDivision(GeneralType): class Meta: verbose_name = _("Warehouse division type") verbose_name_plural = _("Warehouse division types") post_save.connect(post_save_cache, sender=WarehouseDivision) post_delete.connect(post_save_cache, sender=WarehouseDivision) class WarehouseDivisionLinkManager(models.Manager): def get_by_natural_key(self, warehouse, container_type): return self.get( warehouse__uuid=warehouse, container_type__txt_idx=container_type ) class ContainerType(GeneralType): stationary = models.BooleanField( _("Stationary"), default=False, help_text=_( "Container that will not usually be moved. Ex: building, " "room, span, shelf. Stationary containers are not automatically numbered." ), ) length = models.IntegerField(_("Length (mm)"), blank=True, null=True) width = models.IntegerField(_("Width (mm)"), blank=True, null=True) height = models.IntegerField(_("Height (mm)"), blank=True, null=True) volume = models.FloatField(_("Volume (l)"), blank=True, null=True) tare_weight = models.FloatField(_("Tare weight (g)"), blank=True, null=True) reference = models.CharField(_("Ref."), max_length=300, blank=True, null=True) order = models.IntegerField(_("Order"), default=10) class Meta: verbose_name = _("Container type") verbose_name_plural = _("Container types") ordering = ( "order", "label", ) post_save.connect(post_save_cache, sender=ContainerType) post_delete.connect(post_save_cache, sender=ContainerType) class WarehouseDivisionLink(models.Model): RELATED_SET_NAME = "divisions" RELATED_ATTRS = ["order", "container_type"] RELATIVE_MODELS = {Warehouse: "warehouse"} warehouse = models.ForeignKey(Warehouse, related_name="divisions") container_type = models.ForeignKey(ContainerType, blank=True, null=True) division = models.ForeignKey( WarehouseDivision, help_text=_("Deprecated - do not use"), blank=True, null=True ) order = models.IntegerField(_("Order"), default=10) objects = WarehouseDivisionLinkManager() class Meta: ordering = ("warehouse", "order") unique_together = ("warehouse", "division") def __str__(self): return "{} - {}".format(self.warehouse, self.container_type) def natural_key(self): return self.warehouse.uuid, self.container_type.txt_idx class ContainerTree(models.Model): CREATE_SQL = """ CREATE VIEW containers_tree AS WITH RECURSIVE rel_tree AS ( SELECT c.id AS container_id, c.parent_id AS container_parent_id, 1 AS level FROM archaeological_warehouse_container c WHERE c.parent_id is NOT NULL UNION ALL SELECT p.container_id AS container_id, c.parent_id as container_parent_id, p.level + 1 FROM archaeological_warehouse_container c, rel_tree p WHERE c.id = p.container_parent_id AND c.parent_id is NOT NULL AND p.level < 10 -- prevent recursive... ) SELECT DISTINCT container_id, container_parent_id, level FROM rel_tree; CREATE VIEW container_tree AS SELECT DISTINCT y.container_id, y.container_parent_id FROM (SELECT * FROM containers_tree) y ORDER BY y.container_id, y.container_parent_id; -- deactivate deletion, update CREATE RULE containers_tree_del AS ON DELETE TO containers_tree DO INSTEAD DELETE FROM archaeological_warehouse_container where id=NULL; CREATE RULE container_tree_del AS ON DELETE TO container_tree DO INSTEAD DELETE FROM archaeological_warehouse_container where id=NULL; CREATE RULE containers_tree_update AS ON UPDATE TO containers_tree DO INSTEAD UPDATE archaeological_warehouse_container set id=id WHERE id=NULL; CREATE RULE container_tree_update AS ON UPDATE TO container_tree DO INSTEAD UPDATE archaeological_warehouse_container set id=id WHERE id=NULL; CREATE RULE containers_tree_insert AS ON INSERT TO containers_tree DO INSTEAD UPDATE archaeological_warehouse_container set id=id WHERE id=NULL; CREATE RULE container_tree_insert AS ON INSERT TO container_tree DO INSTEAD UPDATE archaeological_warehouse_container set id=id WHERE id=NULL; """ DELETE_SQL = """ DROP VIEW IF EXISTS container_tree; DROP VIEW IF EXISTS containers_tree; """ container = models.OneToOneField( "archaeological_warehouse.Container", verbose_name=_("Container"), related_name="container_tree_child", primary_key=True, ) container_parent = models.ForeignKey( "archaeological_warehouse.Container", verbose_name=_("Container parent"), related_name="container_tree_parent", ) class Meta: managed = False db_table = "containers_tree" class Container( DocumentItem, Merge, LightHistorizedItem, CompleteIdentifierItem, GeoItem, OwnPerms, MainItem, DivisionContainer, ValueGetter, ): SLUG = "container" APP = "archaeological-warehouse" MODEL = "container" SHOW_URL = "show-container" DELETE_URL = "delete-container" NEW_QUERY_ENGINE = True TABLE_COLS = [ "container_type__label", "reference", "location__name", "cached_division", "old_reference", ] IMAGE_PREFIX = "containers/" BASE_SEARCH_VECTORS = [ SearchVectorConfig("reference"), SearchVectorConfig("container_type__label"), SearchVectorConfig("cached_location"), SearchVectorConfig("old_reference"), SearchVectorConfig("comment", "local"), ] M2M_SEARCH_VECTORS = [ SearchVectorConfig("division__reference"), SearchVectorConfig("division__division__division__label"), ] PARENT_SEARCH_VECTORS = ["parent"] STATISTIC_MODALITIES_OPTIONS = OrderedDict( [ ("location__name", _("Location (warehouse)")), ("responsibility__name", _("Responsibility (warehouse)")), ( "finds__base_finds__context_record__operation__cached_label", _("Operation"), ), ("container_type__label", _("Container type")), ] ) STATISTIC_MODALITIES = [key for key, lbl in STATISTIC_MODALITIES_OPTIONS.items()] GET_VALUES_EXCLUDE_FIELDS = ValueGetter.GET_VALUES_EXCLUDE_FIELDS + [ "inside_container", "parent", ] # search parameters EXTRA_REQUEST_KEYS = { "location": "location__pk", "location__name": "location__name", "location_id": "location__pk", "responsibility_id": "responsibility__pk", "container_type": "container_type__pk", "reference": "reference__icontains", "old_reference": "old_reference__icontains", "finds__base_finds__context_record__operation": "finds__base_finds__context_record__operation", "finds__base_finds__context_record": "finds__base_finds__context_record", "finds": "finds", "container_type__label": "container_type__label", # dynamic tables "container_tree_child__container_parent__id": "container_tree_child__container_parent__id", } COL_LABELS = { "cached_location": _("Location - index"), "cached_division": _("Precise localisation"), "container_type__label": _("Type"), "location__name": _("Warehouse"), } GEO_LABEL = "cached_label" CACHED_LABELS = [ "cached_division", "cached_label", "cached_location", "cached_weight", ] # alternative names of fields for searches ALT_NAMES = { "location_name": SearchAltName( pgettext_lazy("key for text search", "location"), "location__name__iexact" ), "responsibility_name": SearchAltName( pgettext_lazy("key for text search", "responsibility"), "responsibility__name__iexact", ), "container_type": SearchAltName( pgettext_lazy("key for text search", "type"), "container_type__label__iexact", ), "reference": SearchAltName( pgettext_lazy("key for text search", "reference"), "reference__iexact" ), "old_reference": SearchAltName( pgettext_lazy("key for text search", "old-reference"), "old_reference__iexact", ), "comment": SearchAltName( pgettext_lazy("key for text search", "comment"), "comment__iexact" ), "operation_town": SearchAltName( pgettext_lazy("key for text search", "operation-town"), "finds__base_finds__context_record__operation__" "towns__cached_label__iexact", ), "operation_scientist": SearchAltName( pgettext_lazy("key for text search", "operation-scientist"), "finds__base_finds__context_record__operation__" "scientist__cached_label__iexact", ), "code_patriarche": SearchAltName( pgettext_lazy("key for text search", "code-patriarche"), "finds__base_finds__context_record__operation__" "code_patriarche__iexact", ), "archaeological_sites": SearchAltName( pgettext_lazy("key for text search", "site"), "finds__base_finds__context_record__operation__" "archaeological_sites__cached_label__icontains", ), "archaeological_sites_name": SearchAltName( pgettext_lazy("key for text search", "site-name"), "finds__base_finds__context_record__operation__" "archaeological_sites__name__iexact", ), "archaeological_sites_context_record": SearchAltName( pgettext_lazy("key for text search", "context-record-site"), "finds__base_finds__context_record__archaeological_site__" "cached_label__icontains", ), "archaeological_sites_context_record_name": SearchAltName( pgettext_lazy("key for text search", "context-record-site-name"), "finds__base_finds__context_record__archaeological_site__" "name__iexact", ), "context_record": SearchAltName( pgettext_lazy("key for text search", "context-record"), "finds__base_finds__context_record__cached_label__icontains", ), "find_label": SearchAltName( pgettext_lazy("key for text search", "find-label"), "finds__label__icontains", ), "find_denomination": SearchAltName( pgettext_lazy("key for text search", "find-denomination"), "finds__denomination__icontains", ), "material_types": SearchAltName( pgettext_lazy("key for text search", "material"), "finds__material_types__label__iexact", ), "object_types": SearchAltName( pgettext_lazy("key for text search", "object-type"), "finds__object_types__label__iexact", ), "preservation_to_considers": SearchAltName( pgettext_lazy("key for text search", "preservation"), "finds__preservation_to_considers__label__iexact", ), "conservatory_state": SearchAltName( pgettext_lazy("key for text search", "conservatory"), "finds__conservatory_state__label__iexact", ), "integrities": SearchAltName( pgettext_lazy("key for text search", "integrity"), "finds__integrities__label__iexact", ), "remarkabilities": SearchAltName( pgettext_lazy("key for text search", "remarkability"), "finds__remarkabilities__label__iexact", ), "alterations": SearchAltName( pgettext_lazy("key for text search", "alterations"), "finds__alterations__label__iexact", ), "alteration_causes": SearchAltName( pgettext_lazy("key for text search", "alteration-causes"), "finds__alteration_causes__label__iexact", ), "treatment_emergency": SearchAltName( pgettext_lazy("key for text search", "treatment-emergency"), "finds__treatment_emergency__label__iexact", ), "description": SearchAltName( pgettext_lazy("key for text search", "find-description"), "finds__description__iexact", ), "empty": SearchAltName(pgettext_lazy("key for text search", "empty"), "finds"), "parent": SearchAltName( pgettext_lazy("key for text search", "parent-container"), "parent__cached_label__iexact" ), "contain_containers": SearchAltName( pgettext_lazy("key for text search", "contain-containers"), "children__isnull", ), "is_stationary": SearchAltName( pgettext_lazy("key for text search", "is-stationary"), "container_type__stationary", ), } REVERSED_BOOL_FIELDS = [ "children__isnull", "documents__image__isnull", "documents__associated_file__isnull", "documents__associated_url__isnull", ] BOOL_FIELDS = ["container_type__stationary"] REVERSED_MANY_COUNTED_FIELDS = ["finds", "finds_ref"] ALT_NAMES.update(LightHistorizedItem.ALT_NAMES) ALT_NAMES.update(DocumentItem.ALT_NAMES) DYNAMIC_REQUESTS = { "division": DynamicRequest( label=_("Division -"), app_name="archaeological_warehouse", model_name="WarehouseDivision", form_key="division", search_key=pgettext_lazy("key for text search", "division"), type_query="division__division__division__txt_idx", search_query="division__reference__iexact", ), } QA_EDIT = QuickAction( url="container-qa-bulk-update", icon_class="fa fa-pencil", text=_("Bulk update"), target="many", rights=["change_container", "change_own_container"], ) QA_LOCK = QuickAction( url="container-qa-lock", icon_class="fa fa-lock", text=_("Lock/Unlock"), target="many", rights=["change_container", "change_own_container"], ) QUICK_ACTIONS = [QA_EDIT, QA_LOCK] BASE_QUERY_LOCATION = "container_tree_child__container_parent" objects = UUIDModelManager() # fields uuid = models.UUIDField(default=uuid.uuid4) location = models.ForeignKey( Warehouse, verbose_name=_("Location (warehouse)"), related_name="containers" ) responsible = models.ForeignKey( Warehouse, verbose_name=_("Responsible warehouse"), related_name="owned_containers", blank=True, null=True, help_text=_("Deprecated - do not use"), ) responsibility = models.ForeignKey( Warehouse, verbose_name=_("Responsibility"), related_name="responsibilities", blank=True, null=True, help_text=_("Warehouse that owns the container"), ) container_type = models.ForeignKey( ContainerType, verbose_name=_("Container type"), related_name="containers" ) reference = models.TextField(_("Container ref.")) comment = models.TextField(_("Comment"), blank=True, default="") cached_label = models.TextField( _("Localisation"), blank=True, default="", db_index=True ) cached_location = models.TextField( _("Cached location"), blank=True, default="", db_index=True ) cached_division = models.TextField( _("Cached division"), blank=True, default="", db_index=True ) parent = models.ForeignKey( "Container", verbose_name=_("Parent container"), on_delete=models.SET_NULL, related_name="children", blank=True, null=True, ) index = models.IntegerField(_("Container ID"), blank=True, null=True, db_index=True) weight = models.FloatField(_("Measured weight (g)"), blank=True, null=True) calculated_weight = models.FloatField( _("Calculated weight (g)"), blank=True, null=True ) cached_weight = models.FloatField( _("Cached weight (g)"), blank=True, null=True, help_text=_("Entered weight if available otherwise calculated weight."), ) old_reference = models.TextField(_("Old reference"), blank=True, default="") external_id = models.TextField(_("External ID"), blank=True, default="") auto_external_id = models.BooleanField( _("External ID is set automatically"), default=False ) documents = models.ManyToManyField( "ishtar_common.Document", related_name="containers", verbose_name=_("Documents"), blank=True, ) main_image = models.ForeignKey( "ishtar_common.Document", related_name="main_image_containers", on_delete=models.SET_NULL, verbose_name=_("Main image"), blank=True, null=True, ) DISABLE_POLYGONS = False MERGE_ATTRIBUTE = "get_merge_key" MERGE_STRING_FIELDS = ["old_reference"] class Meta: verbose_name = _("Container") verbose_name_plural = _("Containers") ordering = ( "location", "index", "cached_label", ) unique_together = [("location", "container_type", "parent", "reference")] permissions = ( ("view_container", "Can view all Containers"), ("view_own_container", "Can view own Container"), ("add_own_container", "Can add own Container"), ("change_own_container", "Can change own Container"), ("delete_own_container", "Can delete own Container"), ) indexes = [ GinIndex(fields=["data"]), ] def __str__(self): return self.cached_label or "" @property def start_division_number(self): depth = 1 parent = self.parent parents = [] while parent and parent.pk not in parents: parents.append(parent.pk) depth += 1 parent = parent.parent return depth def get_max_division_number(self): return self.location.get_max_division_number() - self.start_division_number @property def number_of_finds_hosted(self): count = super(Container, self).number_of_finds_hosted Find = apps.get_model("archaeological_finds", "Find") count += Find.objects.filter(container_id=self.pk).count() return count @property def number_of_finds(self): count = super(Container, self).number_of_finds Find = apps.get_model("archaeological_finds", "Find") count += Find.objects.filter(container_ref_id=self.pk).count() return count @property def name(self): return "{} - {}".format(self.container_type.name, self.reference) @property def short_label(self): return "{} {}".format(self.container_type.label, self.reference) @property def parent_external_id(self): if not self.parent: return self.location.external_id return self.parent.external_id @property def calculated_weight_kg(self): if self.calculated_weight: return self.calculated_weight / 1000 @property def weight_kg(self): if self.weight: return self.weight / 1000 def natural_key(self): return (self.uuid,) @classmethod @pre_importer_action def import_get_location(cls, context, value): if context.get("container_type", None) and context.get("reference", None): try: context["location"] = Warehouse.objects.get(external_id=value) return except Warehouse.DoesNotExist: pass for k in list(context.keys()): if k != "import_get_location": context.pop(k) def _generate_cached_label(self): return self.precise_location def _generate_cached_location(self): if not self.index: return self.location.name items = [self.location.name, str(self.index)] cached_label = " - ".join(items) return cached_label def _generate_cached_weight(self): return self.weight if self.weight else self.calculated_weight def _calculate_weight(self) -> bool: """ Calculate the weight of the contained finds + tare weight of the container :return: True if calculated weight is changed """ profile = get_current_profile() if ( profile.calculate_weight_on_full and self.finds.filter(weight__isnull=True).count() ): weight = None else: weight = sum( w for w in self.finds.filter(weight__isnull=False) .values_list("weight", flat=True) .all() ) weight += ( self.container_type.tare_weight if self.container_type.tare_weight else 0 ) if weight != self.calculated_weight: self.calculated_weight = weight return True return False @property def get_calculated_weight_percent(self): if not self.calculated_weight or not self.weight: return 0 return (self.calculated_weight - self.weight) / self.calculated_weight * 100 @property def get_cached_division(self): return self._generate_cached_division() @property def get_merge_key(self): try: return str(self.location.uuid) + "|" + self._generate_cached_division() except Warehouse.DoesNotExist: return def put_document_by_key(self, value, key): Document = apps.get_model("ishtar_common", "Document") try: doc = Document.objects.get(**{key: value}) except Document.DoesNotExist: raise ImporterError( str(_("Document with {}: {} does not " "exists")).format(key, value) ) except Document.MultipleObjectsReturned: raise ImporterError( str(_("Multiple document with {}: {}")).format(key, value) ) doc.container_id = self.pk doc.container_ref_id = self.pk doc.skip_history_when_saving = True doc.save() @post_importer_action def put_document_by_external_id(self, context, value): self.put_document_by_key(value, "external_id") put_document_by_external_id.post_save = True @post_importer_action def put_document_by_reference(self, context, value): self.put_document_by_key(value, "reference") put_document_by_reference.post_save = True @post_importer_action def put_document_by_internal_reference(self, context, value): self.put_document_by_key(value, "internal_reference") put_document_by_internal_reference.post_save = True @post_importer_action def put_document_by_complete_identifier(self, context, value): self.put_document_by_key(value, "complete_identifier") put_document_by_complete_identifier.post_save = True def _generate_cached_division(self): parents = [] parent = self.parent c_ids = [] while parent: if parent.id in c_ids: # prevent cyclic break c_ids.append(parent.id) parents.append(parent) parent = parent.parent locas = [ "{} {}".format(loca.container_type.name, loca.reference) for loca in reversed(parents) ] try: locas.append("{} {}".format(self.container_type.name, self.reference)) except ObjectDoesNotExist: # generate too early on item creation pass return " | ".join(locas) def _get_base_image_path(self): return self.location._get_base_image_path() + "/" + self.external_id def _prevent_parent_infinite_loop(self) -> bool: """ Check there is no infinite loop in parents. If a loop is detected, set the parent to null. :return: True if parent has been changed """ parent = self.parent parents = [] while parent: if parent.id in parents: self.parent = None # break the loop arbitrary return True parents.append(parent.id) parent = parent.parent return False @classmethod def _change_child_location(cls, parent): for child in ( cls.objects.filter(parent=parent).exclude(location=parent.location).all() ): if child.location != parent.location: child.location = parent.location child.save() cls._change_child_location(child) def merge(self, item, keep_old=False, exclude_fields=None): # merge child containers child_references = {} for child in Container.objects.filter(parent=self).all(): key = (child.reference, child.container_type.pk) child_references[key] = child for child in Container.objects.filter(parent=item).all(): key = (child.reference, child.container_type.pk) if key in child_references.keys(): # parent field can cause integrity error before the end of # the merge child_references[key].index = None child_references[key].merge(child, exclude_fields=["parent"]) else: child.parent = self child.save() self._change_child_location(self) super(Container, self).merge( item, keep_old=keep_old, exclude_fields=exclude_fields ) @classmethod def get_query_owns(cls, ishtaruser): return Q(history_creator=ishtaruser.user_ptr) | Q( location__person_in_charge__ishtaruser=ishtaruser ) def get_precise_points(self): precise_points = super(Container, self).get_precise_points() if precise_points: return precise_points return self.location.get_precise_points() def get_town_centroid(self): return self.location.get_town_centroid() def get_town_polygons(self): return self.location.get_town_polygons() def contained_documents(self): if not self.pk: return Document = apps.get_model("ishtar_common", "Document") return Document.objects.filter(container_id=self.pk) def contained_documents_ref(self): if not self.pk: return Document = apps.get_model("ishtar_common", "Document") return Document.objects.filter(container_ref_id=self.pk) @property def associated_filename(self): filename = datetime.date.today().strftime("%Y-%m-%d") filename += "-" + self.reference filename += "-" + self.location.name filename += "-" + str(self.index) if self.cached_division is None: self.skip_history_when_saving = True self.save() if self.cached_division: filename += "-" + self.cached_division return slugify(filename) @property def precise_location(self): location = self.location.name if self.cached_division: location += " " + self.cached_division return location def get_localisations(self): """ Get precise localisation of the container in the warehouse. :return: tuple of strings with localisations """ localisations = [] parent = self.parent while parent: localisations.append(parent) parent = parent.parent localisations.append(self.location) return reversed(localisations) def get_localisation(self, place): locas = list(self.get_localisations()) if len(locas) < (place + 1): return "" return locas[place] @property def localisation_1(self): return self.get_localisation(1) @property def localisation_2(self): return self.get_localisation(2) @property def localisation_3(self): return self.get_localisation(3) @property def localisation_4(self): return self.get_localisation(4) @property def localisation_5(self): return self.get_localisation(5) @property def localisation_6(self): return self.get_localisation(6) @property def localisation_7(self): return self.get_localisation(7) @property def localisation_8(self): return self.get_localisation(8) @property def localisation_9(self): return self.get_localisation(9) def set_localisation(self, place, value, static=False, return_errors=False): """ Set the reference for the localisation number "place" (starting from 0) :param place: the number of the localisation :param value: the reference to be set :param static: do not create new containers :param return_errors: return error message :return: the container location object or None if the place does not exist - return also error message if return_errors set to True """ value = value.strip() if not value or value == "-": if return_errors: return None, _("No value") return error_msg = str(NO_DIVISION_ERROR).format(place + 1, self.location) ( current_container_type, previous_container_types, ) = self.location.get_container_type_by_place(place) if not current_container_type: # no division link set at this place if return_errors: return None, error_msg return # modify existing ## first localisation is the warehouse current_localisations = list(self.get_localisations())[1:] current_localisation, current_parent = None, None for loca in current_localisations: if loca.container_type == current_container_type: if loca.reference == value: current_localisation = loca break elif loca.container_type_id in previous_container_types: current_parent = loca if not current_localisation: dct = { "reference": value, "container_type": current_container_type, "parent": current_parent, "location": self.location, } q = Container.objects.filter(**dct) if q.count(): current_localisation = q.all()[0] else: if static: if return_errors: error_msg = str( _("The division {} {} do not exist for the " "location {}.") ).format(current_container_type, value, self.location) return None, error_msg return current_localisation = Container.objects.create(**dct) self.parent = current_localisation self.save() if return_errors: return current_localisation, None return current_localisation def set_static_localisation(self, place, value): return self.set_localisation(place, value, static=True) @post_importer_action def set_localisation_1(self, context, value): return self.set_localisation(0, value) set_localisation_1.post_save = True @post_importer_action def set_localisation_2(self, context, value): return self.set_localisation(1, value) set_localisation_2.post_save = True @post_importer_action def set_localisation_3(self, context, value): return self.set_localisation(2, value) set_localisation_3.post_save = True @post_importer_action def set_localisation_4(self, context, value): return self.set_localisation(3, value) set_localisation_4.post_save = True @post_importer_action def set_localisation_5(self, context, value): return self.set_localisation(4, value) set_localisation_5.post_save = True @post_importer_action def set_localisation_6(self, context, value): return self.set_localisation(5, value) set_localisation_6.post_save = True @post_importer_action def set_localisation_7(self, context, value): return self.set_localisation(6, value) set_localisation_7.post_save = True @post_importer_action def set_localisation_8(self, context, value): return self.set_localisation(7, value) set_localisation_8.post_save = True @post_importer_action def set_localisation_9(self, context, value): return self.set_localisation(8, value) set_localisation_9.post_save = True @post_importer_action def set_static_localisation_1(self, context, value): return self.set_static_localisation(0, value) set_static_localisation_1.post_save = True @post_importer_action def set_static_localisation_2(self, context, value): return self.set_static_localisation(1, value) set_static_localisation_2.post_save = True @post_importer_action def set_static_localisation_3(self, context, value): return self.set_static_localisation(2, value) set_static_localisation_3.post_save = True @post_importer_action def set_static_localisation_4(self, context, value): return self.set_static_localisation(3, value) set_static_localisation_4.post_save = True @post_importer_action def set_static_localisation_5(self, context, value): return self.set_static_localisation(4, value) set_static_localisation_5.post_save = True @post_importer_action def set_static_localisation_6(self, context, value): return self.set_static_localisation(5, value) set_static_localisation_6.post_save = True @post_importer_action def set_static_localisation_7(self, context, value): return self.set_static_localisation(6, value) set_static_localisation_7.post_save = True @post_importer_action def set_static_localisation_8(self, context, value): return self.set_static_localisation(7, value) set_static_localisation_8.post_save = True @post_importer_action def set_static_localisation_9(self, context, value): return self.set_static_localisation(8, value) set_static_localisation_9.post_save = True DOC_VALUES = [ ( "operation_", _( "Associated operation - use it with caution, " "only return the first found operation" ), ), ( "context_record_", _( "Associated context record - use it with caution, " "only return the first found context_record" ), ), ("material_types", _("Material types inside the container - string")), ("material_types_code", _("Material types code - string")), ("finds", _("List of associated finds")), ] def get_material_types_code(self) -> str: """ Return pipe separated material type code inside a container """ materials = set() for material in self.finds.exclude( material_types__code__isnull=True ).values_list("material_types__code", flat=True): materials.add(material) return "|".join(sorted(materials)) def get_material_types(self) -> str: """ Return comma separated string of material types inside a container """ materials = set() for material in self.finds.exclude( material_types__label__isnull=True ).values_list("material_types__label", flat=True): materials.add(material) return ", ".join(sorted(materials)) def get_values(self, prefix="", no_values=False, filtr=None, **kwargs): values = super(Container, self).get_values( prefix=prefix, no_values=no_values, filtr=filtr, **kwargs ) from_find = prefix.startswith("container_") or prefix.startswith( "container_ref_" ) base_exclude = [prefix + "container", prefix + "container_ref"] if ( (not filtr or prefix + "finds" in filtr) and not from_find and "finds" not in kwargs.get("exclude", []) ): kwargs["exclude"] = base_exclude[:] kwargs["no_base_finds"] = True # prevent recursive call values[prefix + "finds"] = [ f.get_values(prefix=prefix, no_values=True, filtr=filtr, **kwargs) for f in self.finds.distinct().all() ] if not self.finds.count(): return values operation_in_filter = filtr and any( k for k in filtr if k.startswith(prefix + "operation") ) cr_in_filter = filtr and any( k for k in filtr if k.startswith(prefix + "context_record") ) if not filtr or prefix + "material_types" in filtr: values[prefix + "material_types"] = self.get_material_types() if not filtr or prefix + "material_types_code" in filtr: values[prefix + "material_types_code"] = self.get_material_types_code() if not from_find and (not filtr or operation_in_filter or cr_in_filter): # assume that only one operation is in this container... # you should know what you are doing when using theses variables f = self.finds.all()[0] bf = f.get_first_base_find() if bf: cr = bf.context_record if not filtr or cr_in_filter: kwargs["exclude"] = base_exclude[:] + [prefix + "operation"] kwargs["force_no_base_finds"] = True for k, v in cr.get_values( prefix=prefix, no_values=True, filtr=None, **kwargs ).items(): values[prefix + "context_record_" + k] = v if not filtr or operation_in_filter: kwargs["exclude"] = base_exclude[:] + [prefix + "context_records"] for k, v in cr.operation.get_values( prefix=prefix, no_values=True, filtr=None, **kwargs ).items(): values[prefix + "operation_" + k] = v return values def get_extra_actions(self, request): """ extra actions for the sheet template """ # url, base_text, icon, extra_text, extra css class, is a quick action actions = super(Container, self).get_extra_actions(request) can_edit_find = self.can_do(request, "change_find") if can_edit_find: actions += [ ( reverse("container-add-treatment", args=[self.pk]), _("Add treatment"), "fa fa-flask", "", "", False, ), ] return actions def pre_save(self): if self.parent == self: self.parent = None if not self.responsibility_id and not self.responsibility: if self.location_id: self.responsibility_id = self.location_id else: try: self.responsibility = self.location except Warehouse.DoesNotExist: return if self.container_type.stationary: return q = Container.objects.filter(index=self.index, location=self.location) if self.id: q = q.exclude(id=self.id) if self.index and not q.count(): return q = Container.objects.filter(location=self.location, index__isnull=False) self.index = ( int(q.all().aggregate(Max("index"))["index__max"]) + 1 if q.count() else 1 ) if not self.cached_division: self.cached_division = self._generate_cached_division() def _update_warehouse_max_division(self): number = 0 parent = self.parent_id while parent: number += 1 parent = Container.objects.filter(pk=parent).values_list("parent_id")[0][0] if number > self.location.max_division_number: self.location.max_division_number = number self.location.save() def save(self, *args, **kwargs): self.pre_save() super(Container, self).save(*args, **kwargs) self._change_child_location(self) updated = False updated += self._prevent_parent_infinite_loop() self._update_warehouse_max_division() updated += self._calculate_weight() if not self.index and not self.container_type.stationary: self.skip_history_when_saving = True q = ( Container.objects.filter(location=self.location, index__isnull=False) .exclude(pk=self.pk) .order_by("-index") ) self.index = q.all()[0].index + 1 if q.count() else 1 updated = True self.skip_history_when_saving = True if not self.external_id or self.auto_external_id: external_id = get_generated_id("container_external_id", self) if external_id != self.external_id: updated = True self.auto_external_id = True self.external_id = external_id if updated: self._cached_label_checked = False self.save() return # remove old location in warehouse q = ContainerLocalisation.objects.filter(container=self).exclude( division__warehouse=self.location ) for loca in q.all(): loca.delete() def container_post_save(sender, **kwargs): cached_label_and_geo_changed(sender=sender, **kwargs) # TODO: to be deleted??? """ if not kwargs.get('instance'): return instance = kwargs.get('instance') for loca in ContainerLocalisation.objects.filter( container=instance).exclude( division__warehouse=instance.location).all(): q = WarehouseDivisionLink.objects.filter( warehouse=instance.location, division=loca.division.division ) if not q.count(): continue loca.division = q.all()[0] loca.save() """ def container_pre_delete(sender, **kwargs): instance = kwargs["instance"] q = Container.objects.filter(container_tree_child__container_parent=instance) q.update(cached_division="") def container_post_delete(sender, **kwargs): instance = kwargs["instance"] q = Container.objects.filter(cached_division="", location=instance.location) for c in q.all(): c.save() post_save.connect(container_post_save, sender=Container) pre_delete.connect(container_pre_delete, sender=Container) post_delete.connect(container_post_delete, sender=Container) m2m_changed.connect(document_attached_changed, sender=Container.documents.through) class ContainerLocalisationManager(models.Manager): # TODO: to be deleted.... def get_by_natural_key(self, container, warehouse, container_type): return self.get( container__uuid=container, division__warehouse__uuid=warehouse, division__container_type__txt_idx=container_type, ) class ContainerLocalisation(models.Model): # TODO: to be deleted.... container = models.ForeignKey( Container, verbose_name=_("Container"), related_name="division" ) division = models.ForeignKey(WarehouseDivisionLink, verbose_name=_("Division")) reference = models.CharField(_("Reference"), max_length=200, default="") objects = ContainerLocalisationManager() TO_BE_DELETED = True class Meta: verbose_name = _("Container localisation") verbose_name_plural = _("Container localisations") ordering = ("container", "division__order") def __str__(self): return " - ".join((str(self.container), str(self.division), self.reference)) def natural_key(self): return ( self.container.uuid, self.division.warehouse.uuid, self.division.container_type.txt_idx, ) def save(self, *args, **kwargs): super(ContainerLocalisation, self).save(*args, **kwargs) self.container.skip_history_when_saving = True cached_label_changed(Container, instance=self.container, force_update=True)