diff options
Diffstat (limited to 'ishtar_common')
| -rw-r--r-- | ishtar_common/models_common.py | 222 | ||||
| -rw-r--r-- | ishtar_common/utils.py | 60 | 
2 files changed, 281 insertions, 1 deletions
| diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 921b5d59b..5cbb5da1e 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -22,6 +22,7 @@ from django.apps import apps  from django.conf import settings  from django.contrib.auth.models import User, Group  from django.contrib.contenttypes.models import ContentType +from django.contrib.contenttypes.fields import GenericForeignKey  from django.contrib.gis.db import models  from django.contrib.gis.geos import Point  from django.contrib.gis.gdal.error import GDALException @@ -65,6 +66,7 @@ from ishtar_common.utils import (      merge_tsvectors,      cached_label_changed,      post_save_geo, +    post_save_geodata,      task,      duplicate_item,      get_generated_id, @@ -1230,6 +1232,7 @@ class HistoricalRecords(BaseHistoricalRecords):                      item = q.all()[0]                      if attr in item.history_m2m:                          return item.history_m2m[attr] +              return _get_history_m2m          def get_serialize_call(attr): @@ -1237,6 +1240,7 @@ class HistoricalRecords(BaseHistoricalRecords):                  q = model.objects.filter(pk=getattr(self, model._meta.pk.attname))                  if q.count():                      return getattr(q.all()[0], attr)() +              return _get_serialize_call          def get_serialize_properties(attr): @@ -1244,6 +1248,7 @@ class HistoricalRecords(BaseHistoricalRecords):                  q = model.objects.filter(pk=getattr(self, model._meta.pk.attname))                  if q.count():                      return getattr(q.all()[0], attr) +              return _get_serialize_properties          extra_fields = super().get_extra_fields(model, fields) @@ -2889,7 +2894,221 @@ post_save.connect(post_save_cache, sender=SpatialReferenceSystem)  post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) +class GeoVectorData(models.Model): +    name = models.CharField(_("Name"), default=_("Default"), max_length=200) +    source_content_type = models.ForeignKey( +        ContentType, related_name="content_type_geovectordata", +        on_delete=models.CASCADE) +    source_id = models.PositiveIntegerField() +    source = GenericForeignKey('content_type', 'object_id') + +    x = models.FloatField(_("X"), blank=True, null=True, help_text=_("User input")) +    y = models.FloatField(_("Y"), blank=True, null=True, help_text=_("User input")) +    z = models.FloatField(_("Z"), blank=True, null=True, help_text=_("User input")) +    # x == cached_x if user input else get it from point_2d +    # cached is converted to the display SRID +    cached_x = models.FloatField(_("X (cached)"), blank=True, null=True) +    cached_y = models.FloatField(_("Y (cached)"), blank=True, null=True) +    cached_z = models.FloatField(_("Z (cached)"), blank=True, null=True) +    estimated_error_x = models.FloatField( +        _("Estimated error for X"), blank=True, null=True +    ) +    estimated_error_y = models.FloatField( +        _("Estimated error for Y"), blank=True, null=True +    ) +    estimated_error_z = models.FloatField( +        _("Estimated error for Z"), blank=True, null=True +    ) +    spatial_reference_system = models.ForeignKey( +        SpatialReferenceSystem, +        verbose_name=_("Spatial Reference System"), +        blank=True, +        null=True, +        on_delete=models.PROTECT, +    ) +    point = models.PointField(_("Point"), blank=True, null=True, dim=3) +    point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) +    multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True) +    need_update = models.BooleanField(_("Need update"), default=False) + +    class Meta: +        verbose_name = _("Geographic vector data") +        verbose_name_plural = _("Geographic vector data") + +    def display_coordinates(self, rounded=5, dim=2, cache=True): +        srid = None +        profile = get_current_profile() +        if profile.display_srs and profile.display_srs.srid: +            srid = profile.display_srs.srid +        return self.get_coordinates(rounded=rounded, srid=srid, dim=dim, cache=cache) + +    def get_coordinates(self, rounded=5, srid: int = None, dim=2, cache=False): +        if dim not in (2, 3): +            raise ValueError(_("Only 2 or 3 dimension")) +        if cache: +            coordinates = [self.cached_x, self.cached_y] +            if dim == 3: +                coordinates.append(self.cached_z) +        else: +            if self.x and self.y:  # user input +                if not srid or srid == self.spatial_reference_system.srid: +                    coordinates = [self.x, self.y] +                    if dim == 3: +                        coordinates.append(self.z) +                else: +                    args = {"x": self.x, "y": self.y, +                            "srid": self.spatial_reference_system.srid} +                    if dim == 3: +                        args["z"] = self.z +                    point = Point(**args).transform(srid, clone=True) +                    coordinates = [point.x, point.y] +                    if dim == 3: +                        coordinates.append(point.z) +            elif self.point_2d and dim == 2: +                point = self.point_2d.transform(srid, clone=True) +                coordinates = [point.x, point.y] +            elif self.point and dim == 3: +                point = self.point.transform(srid, clone=True) +                coordinates = [point.x, point.y, point.z] +            else: +                return +        if not rounded: +            return coordinates +        return [round(coord, rounded) for coord in coordinates] + +    def get_coordinates_from_polygon(self, rounded=5, srid: int = None): +        if self.multi_polygon: +            return self.convert_coordinates(self.multi_polygon.centroid, +                                            rounded=rounded, srid=srid) + +    def get_x(self, srid: int = None) -> float: +        coord = self.get_coordinates(srid) +        if coord: +            return coord[0] + +    def get_y(self, srid: int = None) -> float: +        coord = self.get_coordinates(srid) +        if coord: +            return coord[1] + +    def get_z(self, srid: int = None) -> float: +        coord = self.get_coordinates(srid, dim=3) +        if coord: +            return coord[2] + +    @property +    def display_spatial_reference_system(self): +        profile = get_current_profile() +        if not profile.display_srs or not profile.display_srs.srid: +            return self.spatial_reference_system +        return profile.display_srs + +    def get_geo_items(self, get_polygons, rounded=5): +        label = self.label if hasattr(self, "label") else self.short_label +        dct = {"type": "Feature", "geometry": {}, "properties": {"label": label}} +        if get_polygons: +            list_coords = [] +            if self.multi_polygon: +                for polygon in self.multi_polygon: +                    list_coords.append([]) +                    for linear_ring in range(len(polygon)): +                        list_coords[-1].append([]) +                        for coords in polygon[linear_ring].coords: +                            point_2d = Point( +                                coords[0], coords[1], srid=self.multi_polygon.srid +                            ) +                            list_coords[-1][linear_ring].append( +                                self.convert_coordinates(point_2d, rounded) +                            ) +            dct["geometry"]["type"] = "MultiPolygon" +            dct["geometry"]["coordinates"] = list_coords +        else: +            dct["geometry"]["type"] = "Point" +            coords = self.display_coordinates() +            if coords: +                dct["geometry"]["coordinates"] = coords +            elif self.multi_polygon: +                dct["geometry"]["coordinates"] = self.convert_coordinates( +                    self.multi_polygon.centroid, rounded +                ) +            else: +                return {} +        return dct + +    def convert_coordinates(self, point_2d, rounded=5, srid=None): +        if not srid: +            profile = get_current_profile() +            if profile.display_srs and profile.display_srs.srid: +                srid = profile.display_srs.srid +        if not srid: +            x, y = point_2d.x, point_2d.y +        else: +            point = point_2d.transform(srid, clone=True) +            x, y = point.x, point.y +        if rounded: +            return [round(x, rounded), round(y, rounded)] +        return [x, y] + +    def most_precise_geo(self): +        if self.multi_polygon: +            return "multi_polygon" +        if self.point_2d: +            return "point" + +    def _geojson_serialize(self, geom_attr): +        if not hasattr(self, geom_attr): +            return "" +        geojson = serialize( +            "geojson", +            self.__class__.objects.filter(pk=self.pk), +            geometry_field=geom_attr, +            fields=("name",), +        ) +        geojson_dct = json.loads(geojson) +        profile = get_current_profile() +        precision = profile.point_precision + +        features = geojson_dct.pop("features") +        for idx in range(len(features)): +            feature = features[idx] +            lbl = feature["properties"].pop("name") +            feature["properties"]["name"] = lbl +            feature["properties"]["id"] = self.pk +            if precision is not None: +                geom_type = feature["geometry"].get("type", None) +                if geom_type == "Point": +                    feature["geometry"]["coordinates"] = [ +                        round(coord, precision) +                        for coord in feature["geometry"]["coordinates"] +                    ] +        geojson_dct["features"] = features +        geojson_dct["link_template"] = simple_link_to_window(self).replace( +            "999999", "<pk>" +        ) +        geojson = json.dumps(geojson_dct) +        return geojson + +    @property +    def point_2d_geojson(self): +        return self._geojson_serialize("point_2d") + +    @property +    def multi_polygon_geojson(self): +        return self._geojson_serialize("multi_polygon") + + +post_save.connect(post_save_geodata, sender=GeoVectorData) + +  class GeoItem(models.Model): +    main_geodata = models.ForeignKey( +        GeoVectorData, on_delete=models.SET_NULL, blank=True, null=True, +        related_name="main_related_items_%(app_label)s_%(class)s" +    ) +    geodata = models.ManyToManyField( +        GeoVectorData, null=True, related_name="related_items_%(app_label)s_%(class)s" +    ) +      GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon")))      # gis @@ -3390,7 +3609,8 @@ class SerializeItem:                      ):                          # print(field.name, self.__class__, self)                          values = [ -                            v.full_serialize(search_model, recursion=True) for v in values.all() +                            v.full_serialize(search_model, recursion=True) +                            for v in values.all()                          ]                      else:                          if first_value in self.SERIALIZATION_FILES: diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index 80d5af9d6..eab25a56f 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -740,6 +740,66 @@ def get_srid_obj_from_point(point):          ) +def post_save_geodata(sender, **kwargs): +    instance = kwargs.get("instance", None) +    if not instance: +        return +    if hasattr(instance, "_no_geo_check") and instance._no_geo_check: +        return +    if not settings.USE_BACKGROUND_TASK: +        return _post_save_geodata(sender, **kwargs) +    sender, kwargs = serialize_args_for_tasks( +        sender, instance, kwargs, EXTRA_KWARGS_TRIGGER +    ) +    task_item = _post_save_geodata.delay(sender, **kwargs) +    revoke_old_task(kwargs, "post_save_geodata", task_item.id, instance.__class__) +    return task_item + + +@task() +def _post_save_geodata(sender, **kwargs): +    """ +    Save cached_x, cached_y, cached_z using display srid +    """ +    sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) +    if not instance: +        return + +    if getattr(instance, "_post_saved_geo", False): +        return + +    modified = False + +    cached_x, cached_y, cached_z = None, None, None +    coords = instance.display_coordinates(rounded=False, dim=3) +    if coords: +        cached_x, cached_y, cached_z = coords +    else: +        coords = instance.display_coordinates(rounded=False, dim=2) +        if not coords: +            coords = instance.get_coordinates_from_polygon(rounded=False) +        if coords: +            cached_x, cached_y = coords + +    if instance.cached_x != cached_x or instance.cached_y != cached_y \ +            or instance.cached_z != cached_z: +        modified = True +        instance.cached_x = cached_x +        instance.cached_y = cached_y +        instance.cached_z = cached_z + +    if hasattr(instance, "need_update") and instance.need_update: +        instance.need_update = False +        modified = True + +    if modified: +        instance._post_saved_geo = True +        instance.save() +    cache_key, __ = get_cache(sender, ("post_save_geo", instance.pk)) +    cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) +    return + +  def post_save_geo(sender, **kwargs):      """      Convert raw x, y, z point to real geo field | 
