diff options
Diffstat (limited to 'ishtar_common')
-rw-r--r-- | ishtar_common/models_common.py | 61 | ||||
-rw-r--r-- | ishtar_common/tests.py | 102 | ||||
-rw-r--r-- | ishtar_common/utils.py | 192 |
3 files changed, 162 insertions, 193 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 19ee2c613..d32f835bd 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -2581,6 +2581,47 @@ class GeoVectorData(Imported, OwnPerms): post_save.connect(post_save_geodata, sender=GeoVectorData) +def geodata_attached_changed(sender, **kwargs): + # manage main geoitem and cascade association + instance = kwargs.get("instance", None) + model = kwargs.get("model", None) + pk_set = kwargs.get("pk_set", None) + if not instance or not model or not pk_set: + return + item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True)) + if not item_pks: + return + + if not hasattr(instance, "_geodata"): # use a cache to manage during geodata attach + instance._geodata = [] + if not instance.main_geodata_id: + instance.main_geodata_id = item_pks[0] + instance.skip_history_when_saving = True + instance._no_move = True + if not hasattr(instance, "_geodata"): + instance._geodata = [] + instance._geodata += [pk for pk in item_pks if pk not in instance._geodata] + instance.save() + + # for all sub item verify that the geo items are present + for query in instance.geodata_child_item_queries(): + child_model = query.model + m2m_model = child_model.geodata.through + m2m_key = f"{child_model._meta.model_name}_id" + geoitems = {} + for child_id in query.values_list("id", flat=True): + child = None + for pk in item_pks: + q = m2m_model.objects.filter(**{m2m_key: child_id, + "geovectordata_id": pk}) + if not q.count(): + if not child: + child = model.objects.get(pk=pk) + if not pk in geoitems: + geoitems[pk] = GeoVectorData.objects.get(pk=pk) + child_model.objects.get(pk=child_id).geodata.add(geoitems[pk]) + + class GeographicItem(models.Model): main_geodata = models.ForeignKey( GeoVectorData, @@ -2596,6 +2637,16 @@ class GeographicItem(models.Model): class Meta: abstract = True + def geodata_child_item_queries(self): + """ + :return: list of queries associated geographically with this item. When + geographic data is add to this item all sub items get the geographic data. + For instance an operation return the list of context records associated, so + when you add the syrvey limit, it is associated to all context records of + the operation. + """ + return [] + def save( self, force_insert=False, force_update=False, using=None, update_fields=None ): @@ -2605,8 +2656,12 @@ class GeographicItem(models.Model): using=using, update_fields=update_fields, ) - if self.main_geodata and not self.geodata.filter(pk=self.main_geodata.pk): + if not hasattr(self, "_geodata"): # use a cache to manage during geodata attach + self._geodata = [] + if self.main_geodata and not self.geodata.filter(pk=self.main_geodata.pk) and\ + self.main_geodata.pk not in self._geodata: self.geodata.add(self.main_geodata) + self._geodata.append(self.main_geodata.pk) elif not self.main_geodata and self.geodata.count(): # arbitrary associate the first to geodata self.main_geodata = self.geodata.order_by("pk").all()[0] @@ -2707,6 +2762,9 @@ class Town(GeographicItem, Imported, models.Model): def __str__(self): return self.cached_label or "" + def geodata_child_item_queries(self): + return [self.sites, self.operations] + @property def label_with_areas(self): label = [self.name] @@ -2808,6 +2866,7 @@ def post_save_town(sender, **kwargs): post_save.connect(post_save_town, sender=Town) +m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through) def town_child_changed(sender, **kwargs): diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index ae115f654..74e746c61 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -3316,6 +3316,108 @@ class GeoVectorFormTest(TestCase): ) +class GeoVectorTest(TestCase): + fixtures = FIND_FIXTURES + + def setUp(self): + # accounts + self.username, self.password, self.user = create_superuser() + + # operation, context record, find + operation_type = models.OperationType.objects.get(txt_idx="arch_diagnostic") + data = { + "year": 2010, + "operation_type_id": operation_type.pk, + "history_modifier": self.user, + "code_patriarche": 99999 + } + Operation = apps.get_model("archaeological_operations", "Operation") + self.operation = Operation.objects.create(**data) + data = { + "operation_id": self.operation.pk, + "label": "Context record", + "history_modifier": self.user, + } + ContextRecord = apps.get_model("archaeological_context_records", + "ContextRecord") + self.context_record = ContextRecord.objects.create(**data) + data = { + "context_record_id": self.context_record.pk, + "label": "Find", + "history_modifier": self.user, + } + BaseFind = apps.get_model("archaeological_finds", "BaseFind") + self.base_find = BaseFind.objects.create(**data) + + # for geodata + self.ct = ContentType.objects.get_for_model(models.GeoVectorData) + self.origin = models.GeoOriginType.objects.get( + txt_idx="georeferencement", + ) + self.data_type = models.GeoDataType.objects.get( + txt_idx="operation-center", + ) + self.provider = models.GeoProviderType.objects.get( + txt_idx="france-ign", + ) + self.srs, _ = models.SpatialReferenceSystem.objects.get_or_create( + label="EPSG-27572", + txt_idx="epsg-27572", + srid=2154 + ) + self.app_source = "archaeological_operations" + self.model_source = "operation" + self.source_pk = self.operation.pk + self.source_content_type_pk = ContentType.objects.get( + app_label=self.app_source, + model=self.model_source + ).pk + + def _reinit_objects(self): + # get object from db + Operation = apps.get_model("archaeological_operations", "Operation") + self.operation = Operation.objects.get(pk=self.operation.pk) + ContextRecord = apps.get_model("archaeological_context_records", + "ContextRecord") + self.context_record = ContextRecord.objects.get(pk=self.context_record.pk) + BaseFind = apps.get_model("archaeological_finds", "BaseFind") + self.base_find = BaseFind.objects.get(pk=self.base_find.pk) + + def _create_geodata(self): + return models.GeoVectorData.objects.create( + source_content_type_id=self.source_content_type_pk, + source_id=self.source_pk, + name="Test geo", + origin=self.origin, + data_type=self.data_type, + provider=self.provider, + comment="This is a comment." + ) + + def test_cascade_add(self): + self.assertIsNone(self.operation.main_geodata) + self.assertEqual(self.operation.geodata.count(), 0) + self.assertIsNone(self.context_record.main_geodata) + self.assertEqual(self.context_record.geodata.count(), 0) + self.assertIsNone(self.base_find.main_geodata) + self.assertEqual(self.base_find.geodata.count(), 0) + + geo_vector = self._create_geodata() + self.operation.geodata.add(geo_vector) + + self._reinit_objects() + self.assertEqual(self.operation.geodata.count(), 1) + self.assertEqual(self.operation.main_geodata, geo_vector) + self.assertEqual(self.context_record.geodata.count(), 1) + self.assertEqual(self.context_record.main_geodata, geo_vector) + self.assertEqual(self.base_find.geodata.count(), 1) + self.assertEqual(self.base_find.main_geodata, geo_vector) + + # test geo item remove + # test town add + # test town remove + + class NewItems(TestCase): fixtures = COMMON_FIXTURES diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index a99a40d02..6ffda0eb6 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -871,198 +871,6 @@ def _post_save_geo(sender, **kwargs): instance.cascade_update() cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) - - return - # TODO to delete - - kls_name = instance.__class__.__name__ - - if not profile.locate_warehouses and ( - "Container" in kls_name or "Warehouse" in kls_name - ): - return - - if getattr(instance, "_post_saved_geo", False): - return - - # print(sender, "post_save_geo") - - current_source = "default" - if hasattr(instance.__class__, "_meta"): - current_source = str(instance.__class__._meta.verbose_name) - - modified = False - if hasattr(instance, "multi_polygon") and not getattr( - instance, "DISABLE_POLYGONS", False - ): - if ( - instance.multi_polygon_source_item - and instance.multi_polygon_source_item != current_source - ): # refetch - instance.multi_polygon = None - instance.multi_polygon_source = None - modified = True - - if instance.multi_polygon and not instance.multi_polygon_source: - # should be a db source - instance.multi_polygon_source = "P" - instance.multi_polygon_source_item = current_source - elif instance.multi_polygon_source != "P": - precise_poly = instance.get_precise_polygons() - if precise_poly: - poly, source_item = precise_poly - instance.multi_polygon = poly - instance.multi_polygon_source = "P" - instance.multi_polygon_source_item = source_item - modified = True - elif profile.use_town_for_geo: - poly = instance.get_town_polygons() - if poly: - poly, poly_source = poly - if poly != instance.multi_polygon: - instance.multi_polygon_source_item = poly_source - instance.multi_polygon_source = "T" # town - try: - instance.multi_polygon = poly - modified = True - except TypeError: - print(instance, instance.pk) - - if ( - instance.point_source_item and instance.point_source_item != current_source - ) or ( - instance.point_source == "M" - ): # refetch - csrs = instance.spatial_reference_system - - if instance.x and instance.y: - new_point = GEOSGeometry( - "POINT({} {})".format(instance.x, instance.y), srid=csrs.srid - ) - if instance.point_2d: - proj_point = instance.point_2d.transform(csrs.srid, clone=True) - if new_point.distance(proj_point) < 0.01: - instance.x, instance.y = None, None - instance.point, instance.point_2d = None, None - instance.point_source = None - - point = instance.point - point_2d = instance.point_2d - - if ( - (point or point_2d) and instance.x is None and not instance.point_source - ): # db source - if point: - current_point = point - instance.z = point.z - else: - current_point = point_2d - instance.x = current_point.x - instance.y = current_point.y - srs = get_srid_obj_from_point(current_point) - instance.spatial_reference_system = srs - instance.point_source = "P" - instance.point_source_item = current_source - if not point_2d: - instance.point_2d = convert_coordinates_to_point( - instance.point.x, instance.point.y, srid=current_point.srid - ) - modified = True - elif ( - instance.x - and instance.y - and instance.spatial_reference_system - and instance.spatial_reference_system.auth_name == "EPSG" - and instance.spatial_reference_system.srid != 0 - ): - # form input or already precise - try: - point_2d = convert_coordinates_to_point( - instance.x, instance.y, srid=instance.spatial_reference_system.srid - ) - except forms.ValidationError: - return # irrelevant data in DB - distance = 1 # arbitrary - if point_2d and instance.point_2d: - distance = point_2d.transform(4326, clone=True).distance( - instance.point_2d.transform(4326, clone=True) - ) - - if instance.z: - point = convert_coordinates_to_point( - instance.x, - instance.y, - instance.z, - srid=instance.spatial_reference_system.srid, - ) - - # no change if distance inf to 1 mm - if distance >= 0.0001 and ( - point_2d != instance.point_2d or point != instance.point - ): - instance.point = point - instance.point_2d = point_2d - instance.point_source = "P" - instance.point_source_item = current_source - modified = True - else: - instance.point_source = None - # get coordinates from parents - precise_points = instance.get_precise_points() - if not (instance.multi_polygon and instance.multi_polygon_source == "P") and \ - precise_points: - point_2d, point, source_item = precise_points - instance.point_2d = point_2d - instance.point = point - instance.point_source = "P" - instance.point_source_item = source_item - instance.x = point_2d.x - instance.y = point_2d.y - if point: - instance.z = point.z - srs = get_srid_obj_from_point(point_2d) - instance.spatial_reference_system = srs - modified = True - else: - centroid, source, point_source = None, None, None - if instance.multi_polygon and instance.multi_polygon_source == "P": - source = current_source - centroid = instance.multi_polygon.centroid - point_source = "M" - if not centroid and profile.use_town_for_geo: # try to get from - # parent - town_centroid = instance.get_town_centroid() - if town_centroid: - centroid, source = town_centroid - point_source = "T" - if centroid: - instance.point_2d, instance.point_source_item = centroid, source - instance.point = None - instance.point_source = point_source - instance.x = instance.point_2d.x - instance.y = instance.point_2d.y - srs = get_srid_obj_from_point(instance.point_2d) - instance.spatial_reference_system = srs - modified = True - else: - instance.point_2d, instance.point_source_item = None, None - instance.point = None - instance.point_source = None - modified = True - - if hasattr(instance, "need_update") and instance.need_update: - instance.need_update = False - modified = True - - if modified: - instance.skip_history_when_saving = True - instance._post_saved_geo = True - instance._cached_label_checked = False - instance.save() - if hasattr(instance, "cascade_update"): - instance.cascade_update() - cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk]) - cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) return |