From 986511b2572acb369086ed613adc32ff1d1863a9 Mon Sep 17 00:00:00 2001 From: Étienne Loks Date: Tue, 19 Feb 2019 17:36:25 +0100 Subject: Geo: manage container and warehouse - manage geo save dependencies --- ishtar_common/models.py | 528 +++++++++++++++++++++++++++--------------------- 1 file changed, 298 insertions(+), 230 deletions(-) (limited to 'ishtar_common/models.py') diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 778461562..28e469e31 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -1612,23 +1612,96 @@ class DocumentItem(object): return actions -class GeoItem(object): +class SpatialReferenceSystem(GeneralType): + order = models.IntegerField(_(u"Order"), default=10) + auth_name = models.CharField( + _(u"Authority name"), default=u'EPSG', max_length=256) + srid = models.IntegerField(_(u"Authority SRID")) + + class Meta: + verbose_name = _(u"Spatial reference system") + verbose_name_plural = _(u"Spatial reference systems") + ordering = ('label',) + + +post_save.connect(post_save_cache, sender=SpatialReferenceSystem) +post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) + + +class GeoItem(models.Model): GEO_SOURCE = ('T', _(u"Town")), ('P', _(u"Precise")) + # gis + x = models.FloatField(_(u'X'), blank=True, null=True) + y = models.FloatField(_(u'Y'), blank=True, null=True) + z = models.FloatField(_(u'Z'), blank=True, null=True) + estimated_error_x = models.FloatField(_(u'Estimated error for X'), + blank=True, null=True) + estimated_error_y = models.FloatField(_(u'Estimated error for Y'), + blank=True, null=True) + estimated_error_z = models.FloatField(_(u'Estimated error for Z'), + blank=True, null=True) + spatial_reference_system = models.ForeignKey( + SpatialReferenceSystem, verbose_name=_(u"Spatial Reference System"), + blank=True, null=True) + point = models.PointField(_(u"Point"), blank=True, null=True, dim=3) + point_2d = models.PointField(_(u"Point (2D)"), blank=True, null=True) + point_source = models.CharField( + _(u"Point source"), choices=GEO_SOURCE, max_length=1, blank=True, + null=True) + point_source_item = models.CharField( + _(u"Point source item"), max_length=100, blank=True, null=True) + multi_polygon = models.MultiPolygonField(_(u"Multi polygon"), blank=True, + null=True) + multi_polygon_source = models.CharField( + _(u"Multi-polygon source"), choices=GEO_SOURCE, max_length=1, + blank=True, null=True) + multi_polygon_source_item = models.CharField( + _(u"Multi polygon source item"), max_length=100, blank=True, null=True) + + GEO_LABEL = "" + + class Meta: + abstract = True + + def get_town_centroid(self): + raise NotImplementedError + + def get_town_polygons(self): + raise NotImplementedError + + def get_precise_points(self): + if self.point_source == 'P' and self.point_2d: + return self.point_2d, self.point, self.point_source_item + + def get_precise_polygons(self): + if self.multi_polygon_source == 'P' and self.multi_polygon: + return self.multi_polygon, self.multi_polygon_source_item + def geo_point_source(self): if not self.point_source: return "" - return dict(self.GEO_SOURCE)[self.point_source] + src = u"{} - {}".format( + dict(self.GEO_SOURCE)[self.point_source], + self.point_source_item + ) + return src def geo_polygon_source(self): if not self.multi_polygon_source: return "" - return dict(self.GEO_SOURCE)[self.multi_polygon_source] + src = u"{} - {}".format( + dict(self.GEO_SOURCE)[self.multi_polygon_source], + self.multi_polygon_source_item + ) + return src def _geojson_serialize(self, geom_attr): if not hasattr(self, geom_attr): return "" cached_label_key = 'cached_label' + if self.GEO_LABEL: + cached_label_key = self.GEO_LABEL if getattr(self, "CACHED_LABELS", None): cached_label_key = self.CACHED_LABELS[-1] geojson = serialize( @@ -1658,7 +1731,7 @@ class GeoItem(object): return self._geojson_serialize('multi_polygon') -class BaseHistorizedItem(DocumentItem, FullSearch, Imported, JsonData, GeoItem, +class BaseHistorizedItem(DocumentItem, FullSearch, Imported, JsonData, FixAssociated): """ Historized item with external ID management. @@ -2883,13 +2956,225 @@ class Department(models.Model): return res +class Arrondissement(models.Model): + name = models.CharField(u"Nom", max_length=30) + department = models.ForeignKey(Department, verbose_name=u"Département") + + def __unicode__(self): + return settings.JOINT.join((self.name, unicode(self.department))) + + +class Canton(models.Model): + name = models.CharField(u"Nom", max_length=30) + arrondissement = models.ForeignKey(Arrondissement, + verbose_name=u"Arrondissement") + + def __unicode__(self): + return settings.JOINT.join( + (self.name, unicode(self.arrondissement))) + + +class TownManager(models.GeoManager): + def get_by_natural_key(self, numero_insee, year): + return self.get(numero_insee=numero_insee, year=year) + + +class Town(Imported, models.Model): + name = models.CharField(_(u"Name"), max_length=100) + surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True) + center = models.PointField(_(u"Localisation"), srid=settings.SRID, + blank=True, null=True) + limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True) + numero_insee = models.CharField(u"Code commune (numéro INSEE)", + max_length=120) + departement = models.ForeignKey( + Department, verbose_name=_(u"Department"), + on_delete=models.SET_NULL, null=True, blank=True) + year = models.IntegerField( + _("Year of creation"), null=True, blank=True, + help_text=_(u"Filling this field is relevant to distinguish old towns " + u"from new towns.")) + children = models.ManyToManyField( + 'Town', verbose_name=_(u"Town children"), blank=True, + related_name='parents') + cached_label = models.CharField(_(u"Cached name"), max_length=500, + null=True, blank=True, db_index=True) + objects = TownManager() + + class Meta: + verbose_name = _(u"Town") + verbose_name_plural = _(u"Towns") + if settings.COUNTRY == 'fr': + ordering = ['numero_insee'] + unique_together = (('numero_insee', 'year'),) + + def natural_key(self): + return (self.numero_insee, self.year) + + def history_compress(self): + values = {'numero_insee': self.numero_insee, + 'year': self.year or ""} + return values + + @classmethod + def history_decompress(cls, full_value, create=False): + if not full_value: + return [] + res = [] + for value in full_value: + try: + res.append( + cls.objects.get(numero_insee=value['numero_insee'], + year=value['year'] or None)) + except cls.DoesNotExist: + continue + return res + + def __unicode__(self): + if self.cached_label: + return self.cached_label + self.save() + return self.cached_label + + @property + def label_with_areas(self): + label = [self.name] + if self.numero_insee: + label.append(u"({})".format(self.numero_insee)) + for area in self.areas.all(): + label.append(u" - ") + label.append(area.full_label) + return u" ".join(label) + + def generate_geo(self, force=False): + force = self.generate_limit(force=force) + self.generate_center(force=force) + self.generate_area(force=force) + + def generate_limit(self, force=False): + if not force and self.limit: + return + parents = None + if not self.parents.count(): + return + for parent in self.parents.all(): + if not parent.limit: + return + if not parents: + parents = parent.limit + else: + parents = parents.union(parent.limit) + # if union is a simple polygon make it a multi + if 'MULTI' not in parents.wkt: + parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" + if not parents: + return + self.limit = parents + self.save() + return True + + def generate_center(self, force=False): + if not force and (self.center or not self.limit): + return + self.center = self.limit.centroid + if not self.center: + return False + self.save() + return True + + def generate_area(self, force=False): + if not force and (self.surface or not self.limit): + return + self.surface = self.limit.transform(settings.SURFACE_SRID, + clone=True).area + if not self.surface: + return False + self.save() + return True + + def update_town_code(self): + if not self.numero_insee or not self.children.count() or not self.year: + return + old_num = self.numero_insee[:] + numero = old_num.split('-')[0] + self.numero_insee = u"{}-{}".format(numero, self.year) + if self.numero_insee != old_num: + return True + + def _generate_cached_label(self): + cached_label = self.name + if settings.COUNTRY == "fr" and self.numero_insee: + dpt_len = 2 + if self.numero_insee.startswith('97') or \ + self.numero_insee.startswith('98') or \ + self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', + '6', '7', '8', '9'): + dpt_len = 3 + cached_label = u"%s - %s" % (self.name, self.numero_insee[:dpt_len]) + if self.year and self.children.count(): + cached_label += u" ({})".format(self.year) + return cached_label + + +def post_save_town(sender, **kwargs): + cached_label_changed(sender, **kwargs) + town = kwargs['instance'] + town.generate_geo() + if town.update_town_code(): + town.save() + + +post_save.connect(post_save_town, sender=Town) + + +def town_child_changed(sender, **kwargs): + town = kwargs['instance'] + if town.update_town_code(): + town.save() + + +m2m_changed.connect(town_child_changed, sender=Town.children.through) + + +class Area(HierarchicalType): + towns = models.ManyToManyField(Town, verbose_name=_(u"Towns"), blank=True, + related_name='areas') + reference = models.CharField(_(u"Reference"), max_length=200, blank=True, + null=True) + parent = models.ForeignKey( + 'self', blank=True, null=True, verbose_name=_(u"Parent"), + help_text=_(u"Only four level of parent are managed."), + related_name='children', on_delete=models.SET_NULL + ) + + class Meta: + verbose_name = _(u"Area") + verbose_name_plural = _(u"Areas") + ordering = ('label',) + + def __unicode__(self): + if not self.reference: + return self.label + return u"{} ({})".format(self.label, self.reference) + + @property + def full_label(self): + label = [unicode(self)] + if self.parent: + label.append(self.parent.full_label) + return u" / ".join(label) + + class Address(BaseHistorizedItem): address = models.TextField(_(u"Address"), null=True, blank=True) address_complement = models.TextField(_(u"Address complement"), null=True, blank=True) postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True, blank=True) - town = models.CharField(_(u"Town"), max_length=70, null=True, blank=True) + town = models.CharField(_(u"Town (freeform)"), max_length=150, null=True, + blank=True) + precise_town = models.ForeignKey( + Town, verbose_name=_(u"Town (precise)"), null=True, blank=True) country = models.CharField(_(u"Country"), max_length=30, null=True, blank=True) alt_address = models.TextField(_(u"Other address: address"), null=True, @@ -2925,6 +3210,14 @@ class Address(BaseHistorizedItem): class Meta: abstract = True + def get_town_centroid(self): + if self.precise_town: + return self.precise_town.center, self._meta.verbose_name + + def get_town_polygons(self): + if self.precise_town: + return self.precise_town.limit, self._meta.verbose_name + def simple_lbl(self): return unicode(self) @@ -4400,215 +4693,6 @@ def document_attached_changed(sender, **kwargs): post_save.connect(cached_label_changed, sender=Document) -class Arrondissement(models.Model): - name = models.CharField(u"Nom", max_length=30) - department = models.ForeignKey(Department, verbose_name=u"Département") - - def __unicode__(self): - return settings.JOINT.join((self.name, unicode(self.department))) - - -class Canton(models.Model): - name = models.CharField(u"Nom", max_length=30) - arrondissement = models.ForeignKey(Arrondissement, - verbose_name=u"Arrondissement") - - def __unicode__(self): - return settings.JOINT.join( - (self.name, unicode(self.arrondissement))) - - -class TownManager(models.GeoManager): - def get_by_natural_key(self, numero_insee, year): - return self.get(numero_insee=numero_insee, year=year) - - -class Town(Imported, models.Model): - name = models.CharField(_(u"Name"), max_length=100) - surface = models.IntegerField(_(u"Surface (m2)"), blank=True, null=True) - center = models.PointField(_(u"Localisation"), srid=settings.SRID, - blank=True, null=True) - limit = models.MultiPolygonField(_(u"Limit"), blank=True, null=True) - numero_insee = models.CharField(u"Code commune (numéro INSEE)", - max_length=120) - departement = models.ForeignKey( - Department, verbose_name=_(u"Department"), - on_delete=models.SET_NULL, null=True, blank=True) - year = models.IntegerField( - _("Year of creation"), null=True, blank=True, - help_text=_(u"Filling this field is relevant to distinguish old towns " - u"from new towns.")) - children = models.ManyToManyField( - 'Town', verbose_name=_(u"Town children"), blank=True, - related_name='parents') - cached_label = models.CharField(_(u"Cached name"), max_length=500, - null=True, blank=True, db_index=True) - objects = TownManager() - - class Meta: - verbose_name = _(u"Town") - verbose_name_plural = _(u"Towns") - if settings.COUNTRY == 'fr': - ordering = ['numero_insee'] - unique_together = (('numero_insee', 'year'),) - - def natural_key(self): - return (self.numero_insee, self.year) - - def history_compress(self): - values = {'numero_insee': self.numero_insee, - 'year': self.year or ""} - return values - - @classmethod - def history_decompress(cls, full_value, create=False): - if not full_value: - return [] - res = [] - for value in full_value: - try: - res.append( - cls.objects.get(numero_insee=value['numero_insee'], - year=value['year'] or None)) - except cls.DoesNotExist: - continue - return res - - def __unicode__(self): - if self.cached_label: - return self.cached_label - self.save() - return self.cached_label - - @property - def label_with_areas(self): - label = [self.name] - if self.numero_insee: - label.append(u"({})".format(self.numero_insee)) - for area in self.areas.all(): - label.append(u" - ") - label.append(area.full_label) - return u" ".join(label) - - def generate_geo(self, force=False): - force = self.generate_limit(force=force) - self.generate_center(force=force) - self.generate_area(force=force) - - def generate_limit(self, force=False): - if not force and self.limit: - return - parents = None - if not self.parents.count(): - return - for parent in self.parents.all(): - if not parent.limit: - return - if not parents: - parents = parent.limit - else: - parents = parents.union(parent.limit) - # if union is a simple polygon make it a multi - if 'MULTI' not in parents.wkt: - parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")" - if not parents: - return - self.limit = parents - self.save() - return True - - def generate_center(self, force=False): - if not force and (self.center or not self.limit): - return - self.center = self.limit.centroid - if not self.center: - return False - self.save() - return True - - def generate_area(self, force=False): - if not force and (self.surface or not self.limit): - return - self.surface = self.limit.transform(settings.SURFACE_SRID, - clone=True).area - if not self.surface: - return False - self.save() - return True - - def update_town_code(self): - if not self.numero_insee or not self.children.count() or not self.year: - return - old_num = self.numero_insee[:] - numero = old_num.split('-')[0] - self.numero_insee = u"{}-{}".format(numero, self.year) - if self.numero_insee != old_num: - return True - - def _generate_cached_label(self): - cached_label = self.name - if settings.COUNTRY == "fr" and self.numero_insee: - dpt_len = 2 - if self.numero_insee.startswith('97') or \ - self.numero_insee.startswith('98') or \ - self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5', - '6', '7', '8', '9'): - dpt_len = 3 - cached_label = u"%s - %s" % (self.name, self.numero_insee[:dpt_len]) - if self.year and self.children.count(): - cached_label += u" ({})".format(self.year) - return cached_label - - -def post_save_town(sender, **kwargs): - cached_label_changed(sender, **kwargs) - town = kwargs['instance'] - town.generate_geo() - if town.update_town_code(): - town.save() - - -post_save.connect(post_save_town, sender=Town) - - -def town_child_changed(sender, **kwargs): - town = kwargs['instance'] - if town.update_town_code(): - town.save() - - -m2m_changed.connect(town_child_changed, sender=Town.children.through) - - -class Area(HierarchicalType): - towns = models.ManyToManyField(Town, verbose_name=_(u"Towns"), blank=True, - related_name='areas') - reference = models.CharField(_(u"Reference"), max_length=200, blank=True, - null=True) - parent = models.ForeignKey( - 'self', blank=True, null=True, verbose_name=_(u"Parent"), - help_text=_(u"Only four level of parent are managed."), - related_name='children', on_delete=models.SET_NULL - ) - - class Meta: - verbose_name = _(u"Area") - verbose_name_plural = _(u"Areas") - ordering = ('label',) - - def __unicode__(self): - if not self.reference: - return self.label - return u"{} ({})".format(self.label, self.reference) - - @property - def full_label(self): - label = [unicode(self)] - if self.parent: - label.append(self.parent.full_label) - return u" / ".join(label) - - class OperationType(GeneralType): order = models.IntegerField(_(u"Order"), default=1) preventive = models.BooleanField(_(u"Is preventive"), default=True) @@ -4691,22 +4775,6 @@ post_save.connect(post_save_cache, sender=OperationType) post_delete.connect(post_save_cache, sender=OperationType) -class SpatialReferenceSystem(GeneralType): - order = models.IntegerField(_(u"Order"), default=10) - auth_name = models.CharField( - _(u"Authority name"), default=u'EPSG', max_length=256) - srid = models.IntegerField(_(u"Authority SRID")) - - class Meta: - verbose_name = _(u"Spatial reference system") - verbose_name_plural = _(u"Spatial reference systems") - ordering = ('label',) - - -post_save.connect(post_save_cache, sender=SpatialReferenceSystem) -post_delete.connect(post_save_cache, sender=SpatialReferenceSystem) - - class AdministrationScript(models.Model): path = models.CharField(_(u"Filename"), max_length=30) name = models.TextField(_(u"Name"), -- cgit v1.2.3