diff options
| author | Étienne Loks <etienne.loks@iggdrasil.net> | 2019-02-19 17:36:25 +0100 | 
|---|---|---|
| committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2019-04-24 19:38:56 +0200 | 
| commit | 986511b2572acb369086ed613adc32ff1d1863a9 (patch) | |
| tree | 1ac2de99841b7a28fee128a68700f59458594f14 /ishtar_common/models.py | |
| parent | a6b1ad5c73d2a51ab2bf6ba1a3451d7298993722 (diff) | |
| download | Ishtar-986511b2572acb369086ed613adc32ff1d1863a9.tar.bz2 Ishtar-986511b2572acb369086ed613adc32ff1d1863a9.zip | |
Geo: manage container and warehouse - manage geo save dependencies
Diffstat (limited to 'ishtar_common/models.py')
| -rw-r--r-- | ishtar_common/models.py | 528 | 
1 files changed, 298 insertions, 230 deletions
| diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 778461562..28e469e31 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -1612,23 +1612,96 @@ class DocumentItem(object):          return actions -class GeoItem(object): +class SpatialReferenceSystem(GeneralType): +    order = models.IntegerField(_(u"Order"), default=10) +    auth_name = models.CharField( +        _(u"Authority name"), default=u'EPSG', max_length=256) +    srid = models.IntegerField(_(u"Authority SRID")) + +    class Meta: +        verbose_name = _(u"Spatial reference system") +        verbose_name_plural = _(u"Spatial reference systems") +        ordering = ('label',) + + +post_save.connect(post_save_cache, sender=SpatialReferenceSystem) +post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) + + +class GeoItem(models.Model):      GEO_SOURCE = ('T', _(u"Town")), ('P', _(u"Precise")) +    # gis +    x = models.FloatField(_(u'X'), blank=True, null=True) +    y = models.FloatField(_(u'Y'), blank=True, null=True) +    z = models.FloatField(_(u'Z'), blank=True, null=True) +    estimated_error_x = models.FloatField(_(u'Estimated error for X'), +                                          blank=True, null=True) +    estimated_error_y = models.FloatField(_(u'Estimated error for Y'), +                                          blank=True, null=True) +    estimated_error_z = models.FloatField(_(u'Estimated error for Z'), +                                          blank=True, null=True) +    spatial_reference_system = models.ForeignKey( +        SpatialReferenceSystem, verbose_name=_(u"Spatial Reference System"), +        blank=True, null=True) +    point = models.PointField(_(u"Point"), blank=True, null=True, dim=3) +    point_2d = models.PointField(_(u"Point (2D)"), blank=True, null=True) +    point_source = models.CharField( +        _(u"Point source"), choices=GEO_SOURCE, max_length=1, blank=True, +        null=True) +    point_source_item = models.CharField( +        _(u"Point source item"), max_length=100, blank=True, null=True) +    multi_polygon = models.MultiPolygonField(_(u"Multi polygon"), blank=True, +                                             null=True) +    multi_polygon_source = models.CharField( +        _(u"Multi-polygon source"), choices=GEO_SOURCE, max_length=1, +        blank=True, null=True) +    multi_polygon_source_item = models.CharField( +        _(u"Multi polygon source item"), max_length=100, blank=True, null=True) + +    GEO_LABEL = "" + +    class Meta: +        abstract = True + +    def get_town_centroid(self): +        raise NotImplementedError + +    def get_town_polygons(self): +        raise NotImplementedError + +    def get_precise_points(self): +        if self.point_source == 'P' and self.point_2d: +            return self.point_2d, self.point, self.point_source_item + +    def get_precise_polygons(self): +        if self.multi_polygon_source == 'P' and self.multi_polygon: +            return self.multi_polygon, self.multi_polygon_source_item +      def geo_point_source(self):          if not self.point_source:              return "" -        return dict(self.GEO_SOURCE)[self.point_source] +        src = u"{} - {}".format( +            dict(self.GEO_SOURCE)[self.point_source], +            self.point_source_item +        ) +        return src      def geo_polygon_source(self):          if not self.multi_polygon_source:              return "" -        return dict(self.GEO_SOURCE)[self.multi_polygon_source] +        src = u"{} - {}".format( +            dict(self.GEO_SOURCE)[self.multi_polygon_source], +            self.multi_polygon_source_item +        ) +        return src      def _geojson_serialize(self, geom_attr):          if not hasattr(self, geom_attr):              return ""          cached_label_key = 'cached_label' +        if self.GEO_LABEL: +            cached_label_key = self.GEO_LABEL          if getattr(self, "CACHED_LABELS", None):              cached_label_key = self.CACHED_LABELS[-1]          geojson = serialize( @@ -1658,7 +1731,7 @@ class GeoItem(object):          return self._geojson_serialize('multi_polygon') -class BaseHistorizedItem(DocumentItem, FullSearch, Imported, JsonData, GeoItem, +class BaseHistorizedItem(DocumentItem, FullSearch, Imported, JsonData,                           FixAssociated):      """      Historized item with external ID management. @@ -2883,13 +2956,225 @@ class Department(models.Model):          return res +class Arrondissement(models.Model): +    name = models.CharField(u"Nom", max_length=30) +    department = models.ForeignKey(Department, verbose_name=u"Département") + +    def __unicode__(self): +        return settings.JOINT.join((self.name, unicode(self.department))) + + +class Canton(models.Model): +    name = models.CharField(u"Nom", max_length=30) +    arrondissement = models.ForeignKey(Arrondissement, +                                       verbose_name=u"Arrondissement") + +    def __unicode__(self): +        return settings.JOINT.join( +            (self.name, unicode(self.arrondissement))) + + +class TownManager(models.GeoManager): +    def get_by_natural_key(self, numero_insee, year): +        return self.get(numero_insee=numero_insee, year=year) + + +class Town(Imported, models.Model): +    name = models.CharField(_(u"Name"), max_length=100) +    surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True) +    center = models.PointField(_(u"Localisation"), srid=settings.SRID, +                               blank=True, null=True) +    limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True) +    numero_insee = models.CharField(u"Code commune (numéro INSEE)", +                                    max_length=120) +    departement = models.ForeignKey( +        Department, verbose_name=_(u"Department"), +        on_delete=models.SET_NULL, null=True, blank=True) +    year = models.IntegerField( +        _("Year of creation"), null=True, blank=True, +        help_text=_(u"Filling this field is relevant to distinguish old towns " +                    u"from new towns.")) +    children = models.ManyToManyField( +        'Town', verbose_name=_(u"Town children"), blank=True, +        related_name='parents') +    cached_label = models.CharField(_(u"Cached name"), max_length=500, +                                    null=True, blank=True, db_index=True) +    objects = TownManager() + +    class Meta: +        verbose_name = _(u"Town") +        verbose_name_plural = _(u"Towns") +        if settings.COUNTRY == 'fr': +            ordering = ['numero_insee'] +            unique_together = (('numero_insee', 'year'),) + +    def natural_key(self): +        return (self.numero_insee, self.year) + +    def history_compress(self): +        values = {'numero_insee': self.numero_insee, +                  'year': self.year or ""} +        return values + +    @classmethod +    def history_decompress(cls, full_value, create=False): +        if not full_value: +            return [] +        res = [] +        for value in full_value: +            try: +                res.append( +                    cls.objects.get(numero_insee=value['numero_insee'], +                                    year=value['year'] or None)) +            except cls.DoesNotExist: +                continue +        return res + +    def __unicode__(self): +        if self.cached_label: +            return self.cached_label +        self.save() +        return self.cached_label + +    @property +    def label_with_areas(self): +        label = [self.name] +        if self.numero_insee: +            label.append(u"({})".format(self.numero_insee)) +        for area in self.areas.all(): +            label.append(u" - ") +            label.append(area.full_label) +        return u" ".join(label) + +    def generate_geo(self, force=False): +        force = self.generate_limit(force=force) +        self.generate_center(force=force) +        self.generate_area(force=force) + +    def generate_limit(self, force=False): +        if not force and self.limit: +            return +        parents = None +        if not self.parents.count(): +            return +        for parent in self.parents.all(): +            if not parent.limit: +                return +            if not parents: +                parents = parent.limit +            else: +                parents = parents.union(parent.limit) +        # if union is a simple polygon make it a multi +        if 'MULTI' not in parents.wkt: +            parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" +        if not parents: +            return +        self.limit = parents +        self.save() +        return True + +    def generate_center(self, force=False): +        if not force and (self.center or not self.limit): +            return +        self.center = self.limit.centroid +        if not self.center: +            return False +        self.save() +        return True + +    def generate_area(self, force=False): +        if not force and (self.surface or not self.limit): +            return +        self.surface = self.limit.transform(settings.SURFACE_SRID, +                                            clone=True).area +        if not self.surface: +            return False +        self.save() +        return True + +    def update_town_code(self): +        if not self.numero_insee or not self.children.count() or not self.year: +            return +        old_num = self.numero_insee[:] +        numero = old_num.split('-')[0] +        self.numero_insee = u"{}-{}".format(numero, self.year) +        if self.numero_insee != old_num: +            return True + +    def _generate_cached_label(self): +        cached_label = self.name +        if settings.COUNTRY == "fr" and self.numero_insee: +            dpt_len = 2 +            if self.numero_insee.startswith('97') or \ +                    self.numero_insee.startswith('98') or \ +                    self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', +                                                 '6', '7', '8', '9'): +                dpt_len = 3 +            cached_label = u"%s - %s" % (self.name, self.numero_insee[:dpt_len]) +        if self.year and self.children.count(): +            cached_label += u" ({})".format(self.year) +        return cached_label + + +def post_save_town(sender, **kwargs): +    cached_label_changed(sender, **kwargs) +    town = kwargs['instance'] +    town.generate_geo() +    if town.update_town_code(): +        town.save() + + +post_save.connect(post_save_town, sender=Town) + + +def town_child_changed(sender, **kwargs): +    town = kwargs['instance'] +    if town.update_town_code(): +        town.save() + + +m2m_changed.connect(town_child_changed, sender=Town.children.through) + + +class Area(HierarchicalType): +    towns = models.ManyToManyField(Town, verbose_name=_(u"Towns"), blank=True, +                                   related_name='areas') +    reference = models.CharField(_(u"Reference"), max_length=200, blank=True, +                                 null=True) +    parent = models.ForeignKey( +        'self', blank=True, null=True, verbose_name=_(u"Parent"), +        help_text=_(u"Only four level of parent are managed."), +        related_name='children', on_delete=models.SET_NULL +    ) + +    class Meta: +        verbose_name = _(u"Area") +        verbose_name_plural = _(u"Areas") +        ordering = ('label',) + +    def __unicode__(self): +        if not self.reference: +            return self.label +        return u"{} ({})".format(self.label, self.reference) + +    @property +    def full_label(self): +        label = [unicode(self)] +        if self.parent: +            label.append(self.parent.full_label) +        return u" / ".join(label) + +  class Address(BaseHistorizedItem):      address = models.TextField(_(u"Address"), null=True, blank=True)      address_complement = models.TextField(_(u"Address complement"), null=True,                                            blank=True)      postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True,                                     blank=True) -    town = models.CharField(_(u"Town"), max_length=70, null=True, blank=True) +    town = models.CharField(_(u"Town (freeform)"), max_length=150, null=True, +                            blank=True) +    precise_town = models.ForeignKey( +        Town, verbose_name=_(u"Town (precise)"), null=True, blank=True)      country = models.CharField(_(u"Country"), max_length=30, null=True,                                 blank=True)      alt_address = models.TextField(_(u"Other address: address"), null=True, @@ -2925,6 +3210,14 @@ class Address(BaseHistorizedItem):      class Meta:          abstract = True +    def get_town_centroid(self): +        if self.precise_town: +            return self.precise_town.center, self._meta.verbose_name + +    def get_town_polygons(self): +        if self.precise_town: +            return self.precise_town.limit, self._meta.verbose_name +      def simple_lbl(self):          return unicode(self) @@ -4400,215 +4693,6 @@ def document_attached_changed(sender, **kwargs):  post_save.connect(cached_label_changed, sender=Document) -class Arrondissement(models.Model): -    name = models.CharField(u"Nom", max_length=30) -    department = models.ForeignKey(Department, verbose_name=u"Département") - -    def __unicode__(self): -        return settings.JOINT.join((self.name, unicode(self.department))) - - -class Canton(models.Model): -    name = models.CharField(u"Nom", max_length=30) -    arrondissement = models.ForeignKey(Arrondissement, -                                       verbose_name=u"Arrondissement") - -    def __unicode__(self): -        return settings.JOINT.join( -            (self.name, unicode(self.arrondissement))) - - -class TownManager(models.GeoManager): -    def get_by_natural_key(self, numero_insee, year): -        return self.get(numero_insee=numero_insee, year=year) - - -class Town(Imported, models.Model): -    name = models.CharField(_(u"Name"), max_length=100) -    surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True) -    center = models.PointField(_(u"Localisation"), srid=settings.SRID, -                               blank=True, null=True) -    limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True) -    numero_insee = models.CharField(u"Code commune (numéro INSEE)", -                                    max_length=120) -    departement = models.ForeignKey( -        Department, verbose_name=_(u"Department"), -        on_delete=models.SET_NULL, null=True, blank=True) -    year = models.IntegerField( -        _("Year of creation"), null=True, blank=True, -        help_text=_(u"Filling this field is relevant to distinguish old towns " -                    u"from new towns.")) -    children = models.ManyToManyField( -        'Town', verbose_name=_(u"Town children"), blank=True, -        related_name='parents') -    cached_label = models.CharField(_(u"Cached name"), max_length=500, -                                    null=True, blank=True, db_index=True) -    objects = TownManager() - -    class Meta: -        verbose_name = _(u"Town") -        verbose_name_plural = _(u"Towns") -        if settings.COUNTRY == 'fr': -            ordering = ['numero_insee'] -            unique_together = (('numero_insee', 'year'),) - -    def natural_key(self): -        return (self.numero_insee, self.year) - -    def history_compress(self): -        values = {'numero_insee': self.numero_insee, -                  'year': self.year or ""} -        return values - -    @classmethod -    def history_decompress(cls, full_value, create=False): -        if not full_value: -            return [] -        res = [] -        for value in full_value: -            try: -                res.append( -                    cls.objects.get(numero_insee=value['numero_insee'], -                                    year=value['year'] or None)) -            except cls.DoesNotExist: -                continue -        return res - -    def __unicode__(self): -        if self.cached_label: -            return self.cached_label -        self.save() -        return self.cached_label - -    @property -    def label_with_areas(self): -        label = [self.name] -        if self.numero_insee: -            label.append(u"({})".format(self.numero_insee)) -        for area in self.areas.all(): -            label.append(u" - ") -            label.append(area.full_label) -        return u" ".join(label) - -    def generate_geo(self, force=False): -        force = self.generate_limit(force=force) -        self.generate_center(force=force) -        self.generate_area(force=force) - -    def generate_limit(self, force=False): -        if not force and self.limit: -            return -        parents = None -        if not self.parents.count(): -            return -        for parent in self.parents.all(): -            if not parent.limit: -                return -            if not parents: -                parents = parent.limit -            else: -                parents = parents.union(parent.limit) -        # if union is a simple polygon make it a multi -        if 'MULTI' not in parents.wkt: -            parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" -        if not parents: -            return -        self.limit = parents -        self.save() -        return True - -    def generate_center(self, force=False): -        if not force and (self.center or not self.limit): -            return -        self.center = self.limit.centroid -        if not self.center: -            return False -        self.save() -        return True - -    def generate_area(self, force=False): -        if not force and (self.surface or not self.limit): -            return -        self.surface = self.limit.transform(settings.SURFACE_SRID, -                                            clone=True).area -        if not self.surface: -            return False -        self.save() -        return True - -    def update_town_code(self): -        if not self.numero_insee or not self.children.count() or not self.year: -            return -        old_num = self.numero_insee[:] -        numero = old_num.split('-')[0] -        self.numero_insee = u"{}-{}".format(numero, self.year) -        if self.numero_insee != old_num: -            return True - -    def _generate_cached_label(self): -        cached_label = self.name -        if settings.COUNTRY == "fr" and self.numero_insee: -            dpt_len = 2 -            if self.numero_insee.startswith('97') or \ -                    self.numero_insee.startswith('98') or \ -                    self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', -                                                 '6', '7', '8', '9'): -                dpt_len = 3 -            cached_label = u"%s - %s" % (self.name, self.numero_insee[:dpt_len]) -        if self.year and self.children.count(): -            cached_label += u" ({})".format(self.year) -        return cached_label - - -def post_save_town(sender, **kwargs): -    cached_label_changed(sender, **kwargs) -    town = kwargs['instance'] -    town.generate_geo() -    if town.update_town_code(): -        town.save() - - -post_save.connect(post_save_town, sender=Town) - - -def town_child_changed(sender, **kwargs): -    town = kwargs['instance'] -    if town.update_town_code(): -        town.save() - - -m2m_changed.connect(town_child_changed, sender=Town.children.through) - - -class Area(HierarchicalType): -    towns = models.ManyToManyField(Town, verbose_name=_(u"Towns"), blank=True, -                                   related_name='areas') -    reference = models.CharField(_(u"Reference"), max_length=200, blank=True, -                                 null=True) -    parent = models.ForeignKey( -        'self', blank=True, null=True, verbose_name=_(u"Parent"), -        help_text=_(u"Only four level of parent are managed."), -        related_name='children', on_delete=models.SET_NULL -    ) - -    class Meta: -        verbose_name = _(u"Area") -        verbose_name_plural = _(u"Areas") -        ordering = ('label',) - -    def __unicode__(self): -        if not self.reference: -            return self.label -        return u"{} ({})".format(self.label, self.reference) - -    @property -    def full_label(self): -        label = [unicode(self)] -        if self.parent: -            label.append(self.parent.full_label) -        return u" / ".join(label) - -  class OperationType(GeneralType):      order = models.IntegerField(_(u"Order"), default=1)      preventive = models.BooleanField(_(u"Is preventive"), default=True) @@ -4691,22 +4775,6 @@ post_save.connect(post_save_cache, sender=OperationType)  post_delete.connect(post_save_cache, sender=OperationType) -class SpatialReferenceSystem(GeneralType): -    order = models.IntegerField(_(u"Order"), default=10) -    auth_name = models.CharField( -        _(u"Authority name"), default=u'EPSG', max_length=256) -    srid = models.IntegerField(_(u"Authority SRID")) - -    class Meta: -        verbose_name = _(u"Spatial reference system") -        verbose_name_plural = _(u"Spatial reference systems") -        ordering = ('label',) - - -post_save.connect(post_save_cache, sender=SpatialReferenceSystem) -post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) - -  class AdministrationScript(models.Model):      path = models.CharField(_(u"Filename"), max_length=30)      name = models.TextField(_(u"Name"), | 
