diff options
Diffstat (limited to 'ishtar_common')
| -rw-r--r-- | ishtar_common/models_common.py | 61 | ||||
| -rw-r--r-- | ishtar_common/tests.py | 102 | ||||
| -rw-r--r-- | ishtar_common/utils.py | 192 | 
3 files changed, 162 insertions, 193 deletions
| diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 19ee2c613..d32f835bd 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -2581,6 +2581,47 @@ class GeoVectorData(Imported, OwnPerms):  post_save.connect(post_save_geodata, sender=GeoVectorData) +def geodata_attached_changed(sender, **kwargs): +    # manage main geoitem and cascade association +    instance = kwargs.get("instance", None) +    model = kwargs.get("model", None) +    pk_set = kwargs.get("pk_set", None) +    if not instance or not model or not pk_set: +        return +    item_pks = list(model.objects.filter(pk__in=pk_set).values_list("pk", flat=True)) +    if not item_pks: +        return + +    if not hasattr(instance, "_geodata"):  # use a cache to manage during geodata attach +        instance._geodata = [] +    if not instance.main_geodata_id: +        instance.main_geodata_id = item_pks[0] +        instance.skip_history_when_saving = True +        instance._no_move = True +        if not hasattr(instance, "_geodata"): +            instance._geodata = [] +        instance._geodata += [pk for pk in item_pks if pk not in instance._geodata] +        instance.save() + +    # for all sub item verify that the geo items are present +    for query in instance.geodata_child_item_queries(): +        child_model = query.model +        m2m_model = child_model.geodata.through +        m2m_key = f"{child_model._meta.model_name}_id" +        geoitems = {} +        for child_id in query.values_list("id", flat=True): +            child = None +            for pk in item_pks: +                q = m2m_model.objects.filter(**{m2m_key: child_id, +                                                "geovectordata_id": pk}) +                if not q.count(): +                    if not child: +                        child = model.objects.get(pk=pk) +                    if not pk in geoitems: +                        geoitems[pk] = GeoVectorData.objects.get(pk=pk) +                    child_model.objects.get(pk=child_id).geodata.add(geoitems[pk]) + +  class GeographicItem(models.Model):      main_geodata = models.ForeignKey(          GeoVectorData, @@ -2596,6 +2637,16 @@ class GeographicItem(models.Model):      class Meta:          abstract = True +    def geodata_child_item_queries(self): +        """ +        :return: list of queries associated geographically with this item. When +        geographic data is add to this item all sub items get the geographic data. +        For instance an operation return the list of context records associated, so +        when you add the syrvey limit, it is associated to all context records of +        the operation. +        """ +        return [] +      def save(          self, force_insert=False, force_update=False, using=None, update_fields=None      ): @@ -2605,8 +2656,12 @@ class GeographicItem(models.Model):              using=using,              update_fields=update_fields,          ) -        if self.main_geodata and not self.geodata.filter(pk=self.main_geodata.pk): +        if not hasattr(self, "_geodata"):  # use a cache to manage during geodata attach +            self._geodata = [] +        if self.main_geodata and not self.geodata.filter(pk=self.main_geodata.pk) and\ +                self.main_geodata.pk not in self._geodata:              self.geodata.add(self.main_geodata) +            self._geodata.append(self.main_geodata.pk)          elif not self.main_geodata and self.geodata.count():              # arbitrary associate the first to geodata              self.main_geodata = self.geodata.order_by("pk").all()[0] @@ -2707,6 +2762,9 @@ class Town(GeographicItem, Imported, models.Model):      def __str__(self):          return self.cached_label or "" +    def geodata_child_item_queries(self): +        return [self.sites, self.operations] +      @property      def label_with_areas(self):          label = [self.name] @@ -2808,6 +2866,7 @@ def post_save_town(sender, **kwargs):  post_save.connect(post_save_town, sender=Town) +m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through)  def town_child_changed(sender, **kwargs): diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index ae115f654..74e746c61 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -3316,6 +3316,108 @@ class GeoVectorFormTest(TestCase):          ) +class GeoVectorTest(TestCase): +    fixtures = FIND_FIXTURES + +    def setUp(self): +        # accounts +        self.username, self.password, self.user = create_superuser() + +        # operation, context record, find +        operation_type = models.OperationType.objects.get(txt_idx="arch_diagnostic") +        data = { +            "year": 2010, +            "operation_type_id": operation_type.pk, +            "history_modifier": self.user, +            "code_patriarche": 99999 +        } +        Operation = apps.get_model("archaeological_operations", "Operation") +        self.operation = Operation.objects.create(**data) +        data = { +            "operation_id": self.operation.pk, +            "label": "Context record", +            "history_modifier": self.user, +        } +        ContextRecord = apps.get_model("archaeological_context_records", +                                       "ContextRecord") +        self.context_record = ContextRecord.objects.create(**data) +        data = { +            "context_record_id": self.context_record.pk, +            "label": "Find", +            "history_modifier": self.user, +        } +        BaseFind = apps.get_model("archaeological_finds", "BaseFind") +        self.base_find = BaseFind.objects.create(**data) + +        # for geodata +        self.ct = ContentType.objects.get_for_model(models.GeoVectorData) +        self.origin = models.GeoOriginType.objects.get( +            txt_idx="georeferencement", +        ) +        self.data_type = models.GeoDataType.objects.get( +            txt_idx="operation-center", +        ) +        self.provider = models.GeoProviderType.objects.get( +            txt_idx="france-ign", +        ) +        self.srs, _ = models.SpatialReferenceSystem.objects.get_or_create( +            label="EPSG-27572", +            txt_idx="epsg-27572", +            srid=2154 +        ) +        self.app_source = "archaeological_operations" +        self.model_source = "operation" +        self.source_pk = self.operation.pk +        self.source_content_type_pk = ContentType.objects.get( +            app_label=self.app_source, +            model=self.model_source +        ).pk + +    def _reinit_objects(self): +        # get object from db +        Operation = apps.get_model("archaeological_operations", "Operation") +        self.operation = Operation.objects.get(pk=self.operation.pk) +        ContextRecord = apps.get_model("archaeological_context_records", +                                       "ContextRecord") +        self.context_record = ContextRecord.objects.get(pk=self.context_record.pk) +        BaseFind = apps.get_model("archaeological_finds", "BaseFind") +        self.base_find = BaseFind.objects.get(pk=self.base_find.pk) + +    def _create_geodata(self): +        return models.GeoVectorData.objects.create( +            source_content_type_id=self.source_content_type_pk, +            source_id=self.source_pk, +            name="Test geo", +            origin=self.origin, +            data_type=self.data_type, +            provider=self.provider, +            comment="This is a comment." +        ) + +    def test_cascade_add(self): +        self.assertIsNone(self.operation.main_geodata) +        self.assertEqual(self.operation.geodata.count(), 0) +        self.assertIsNone(self.context_record.main_geodata) +        self.assertEqual(self.context_record.geodata.count(), 0) +        self.assertIsNone(self.base_find.main_geodata) +        self.assertEqual(self.base_find.geodata.count(), 0) + +        geo_vector = self._create_geodata() +        self.operation.geodata.add(geo_vector) + +        self._reinit_objects() +        self.assertEqual(self.operation.geodata.count(), 1) +        self.assertEqual(self.operation.main_geodata, geo_vector) +        self.assertEqual(self.context_record.geodata.count(), 1) +        self.assertEqual(self.context_record.main_geodata, geo_vector) +        self.assertEqual(self.base_find.geodata.count(), 1) +        self.assertEqual(self.base_find.main_geodata, geo_vector) + +        # test geo item remove +        # test town add +        # test town remove + +  class NewItems(TestCase):      fixtures = COMMON_FIXTURES diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index a99a40d02..6ffda0eb6 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -871,198 +871,6 @@ def _post_save_geo(sender, **kwargs):              instance.cascade_update()      cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk])      cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) - -    return -    # TODO to delete - -    kls_name = instance.__class__.__name__ - -    if not profile.locate_warehouses and ( -        "Container" in kls_name or "Warehouse" in kls_name -    ): -        return - -    if getattr(instance, "_post_saved_geo", False): -        return - -    # print(sender, "post_save_geo") - -    current_source = "default" -    if hasattr(instance.__class__, "_meta"): -        current_source = str(instance.__class__._meta.verbose_name) - -    modified = False -    if hasattr(instance, "multi_polygon") and not getattr( -        instance, "DISABLE_POLYGONS", False -    ): -        if ( -            instance.multi_polygon_source_item -            and instance.multi_polygon_source_item != current_source -        ):  # refetch -            instance.multi_polygon = None -            instance.multi_polygon_source = None -            modified = True - -        if instance.multi_polygon and not instance.multi_polygon_source: -            # should be a db source -            instance.multi_polygon_source = "P" -            instance.multi_polygon_source_item = current_source -        elif instance.multi_polygon_source != "P": -            precise_poly = instance.get_precise_polygons() -            if precise_poly: -                poly, source_item = precise_poly -                instance.multi_polygon = poly -                instance.multi_polygon_source = "P" -                instance.multi_polygon_source_item = source_item -                modified = True -            elif profile.use_town_for_geo: -                poly = instance.get_town_polygons() -                if poly: -                    poly, poly_source = poly -                    if poly != instance.multi_polygon: -                        instance.multi_polygon_source_item = poly_source -                        instance.multi_polygon_source = "T"  # town -                        try: -                            instance.multi_polygon = poly -                            modified = True -                        except TypeError: -                            print(instance, instance.pk) - -    if ( -        instance.point_source_item and instance.point_source_item != current_source -    ) or ( -        instance.point_source == "M" -    ):  # refetch -        csrs = instance.spatial_reference_system - -        if instance.x and instance.y: -            new_point = GEOSGeometry( -                "POINT({} {})".format(instance.x, instance.y), srid=csrs.srid -            ) -            if instance.point_2d: -                proj_point = instance.point_2d.transform(csrs.srid, clone=True) -                if new_point.distance(proj_point) < 0.01: -                    instance.x, instance.y = None, None -        instance.point, instance.point_2d = None, None -        instance.point_source = None - -    point = instance.point -    point_2d = instance.point_2d - -    if ( -        (point or point_2d) and instance.x is None and not instance.point_source -    ):  # db source -        if point: -            current_point = point -            instance.z = point.z -        else: -            current_point = point_2d -        instance.x = current_point.x -        instance.y = current_point.y -        srs = get_srid_obj_from_point(current_point) -        instance.spatial_reference_system = srs -        instance.point_source = "P" -        instance.point_source_item = current_source -        if not point_2d: -            instance.point_2d = convert_coordinates_to_point( -                instance.point.x, instance.point.y, srid=current_point.srid -            ) -        modified = True -    elif ( -        instance.x -        and instance.y -        and instance.spatial_reference_system -        and instance.spatial_reference_system.auth_name == "EPSG" -        and instance.spatial_reference_system.srid != 0 -    ): -        # form input or already precise -        try: -            point_2d = convert_coordinates_to_point( -                instance.x, instance.y, srid=instance.spatial_reference_system.srid -            ) -        except forms.ValidationError: -            return  # irrelevant data in DB -        distance = 1  # arbitrary -        if point_2d and instance.point_2d: -            distance = point_2d.transform(4326, clone=True).distance( -                instance.point_2d.transform(4326, clone=True) -            ) - -        if instance.z: -            point = convert_coordinates_to_point( -                instance.x, -                instance.y, -                instance.z, -                srid=instance.spatial_reference_system.srid, -            ) - -        # no change if distance inf to 1 mm -        if distance >= 0.0001 and ( -            point_2d != instance.point_2d or point != instance.point -        ): -            instance.point = point -            instance.point_2d = point_2d -            instance.point_source = "P" -            instance.point_source_item = current_source -            modified = True -    else: -        instance.point_source = None -        # get coordinates from parents -        precise_points = instance.get_precise_points() -        if not (instance.multi_polygon and instance.multi_polygon_source == "P") and \ -                precise_points: -            point_2d, point, source_item = precise_points -            instance.point_2d = point_2d -            instance.point = point -            instance.point_source = "P" -            instance.point_source_item = source_item -            instance.x = point_2d.x -            instance.y = point_2d.y -            if point: -                instance.z = point.z -            srs = get_srid_obj_from_point(point_2d) -            instance.spatial_reference_system = srs -            modified = True -        else: -            centroid, source, point_source = None, None, None -            if instance.multi_polygon and instance.multi_polygon_source == "P": -                source = current_source -                centroid = instance.multi_polygon.centroid -                point_source = "M" -            if not centroid and profile.use_town_for_geo:  # try to get from -                # parent -                town_centroid = instance.get_town_centroid() -                if town_centroid: -                    centroid, source = town_centroid -                    point_source = "T" -            if centroid: -                instance.point_2d, instance.point_source_item = centroid, source -                instance.point = None -                instance.point_source = point_source -                instance.x = instance.point_2d.x -                instance.y = instance.point_2d.y -                srs = get_srid_obj_from_point(instance.point_2d) -                instance.spatial_reference_system = srs -                modified = True -            else: -                instance.point_2d, instance.point_source_item = None, None -                instance.point = None -                instance.point_source = None -                modified = True - -    if hasattr(instance, "need_update") and instance.need_update: -        instance.need_update = False -        modified = True - -    if modified: -        instance.skip_history_when_saving = True -        instance._post_saved_geo = True -        instance._cached_label_checked = False -        instance.save() -        if hasattr(instance, "cascade_update"): -            instance.cascade_update() -    cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk]) -    cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT)      return | 
