summaryrefslogtreecommitdiff
path: root/ishtar_common/models_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/models_common.py')
-rw-r--r--ishtar_common/models_common.py245
1 files changed, 243 insertions, 2 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
index 949cab832..bfbcb2748 100644
--- a/ishtar_common/models_common.py
+++ b/ishtar_common/models_common.py
@@ -16,6 +16,7 @@ import os
import pyqrcode
import re
import shutil
+import sys
import tempfile
import time
from unidecode import unidecode
@@ -46,6 +47,7 @@ from django.template.defaultfilters import slugify
from django.utils.safestring import SafeText, mark_safe
from django.utils.translation import activate, deactivate
from ishtar_common.utils import (
+ BColors,
ugettext_lazy as _,
pgettext_lazy,
get_image_path,
@@ -53,7 +55,8 @@ from ishtar_common.utils import (
human_date,
HistoryError,
SearchAltName,
- SheetItem
+ SheetItem,
+ get_progress
)
from simple_history.models import HistoricalRecords as BaseHistoricalRecords
from simple_history.signals import (
@@ -3638,6 +3641,243 @@ class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model):
return 0
return round(self.surface / 10000.0, 5)
+ @classmethod
+ def _geodata_import_update_town(cls, fields, geo_datas, verbose=False, limit=None):
+ """
+ specific geodata_import method - import town
+ """
+ values = fields.copy()
+ geos = []
+ missing_lbl = _("INSEE code is missing")
+ missing_geo_lbl = _("Geo INSEE is missing")
+ for geo in values.pop("geodata"):
+ geo_id = geo[2]
+ if geo_id not in geo_datas:
+ if verbose:
+ sys.stdout.write(f"\n{BColors.FAIL}geodata: {missing_lbl} ")
+ sys.stdout.write(f"{geo_id}{BColors.ENDC}\n")
+ else:
+ geos.append(geo_datas[geo_id])
+ main_geo = values["main_geodata"][2]
+ if main_geo not in geo_datas:
+ if verbose:
+ sys.stdout.write(f"\n{BColors.FAIL}main_geodata: {missing_geo_lbl}")
+ sys.stdout.write(f" {main_geo}{BColors.ENDC}\n")
+ values.pop("main_geodata")
+ else:
+ values["main_geodata"] = geo_datas[main_geo]
+
+ q = Town.objects.filter(numero_insee=values["numero_insee"])
+ created = False
+ if q.count():
+ q.update(**values)
+ town = q.all()[0]
+ else:
+ created = True
+ cls.__town_created += 1
+ town = Town.objects.create(**values)
+ for geo in geos:
+ town.geodata.add(geo)
+ return town, created
+
+ @classmethod
+ def _geodata_import_update_geodata(cls, fields, geo_datas, limit=None):
+ """
+ specific geodata_import method - import geodata
+ """
+ if not getattr(cls, "_town_content_type", None):
+ cls._town_content_type = ContentType.objects.get(
+ app_label="ishtar_common", model="town"
+ )
+ if not getattr(cls, "_town_town_data_type", None):
+ cls._town_data_type, __ = GeoDataType.objects.get_or_create(
+ txt_idx="town-limit", defaults={"label": _("Town limit")}
+ )
+
+ numero_insee = fields.pop('source')
+ if limit and not numero_insee.startswith(limit):
+ return
+ q = Town.objects.filter(numero_insee=numero_insee)
+ values = {
+ "provider": GeoProviderType.objects.get(txt_idx=fields["provider"]),
+ "comment": fields["comment"],
+ 'multi_polygon': fields["multi_polygon"]
+ }
+ if q.count():
+ source_id = q.all()[0].pk
+ q2 = GeoVectorData.objects.filter(
+ source_id=source_id,
+ source_content_type=cls._town_content_type,
+ data_type=cls._town_data_type
+ )
+ if q2.count():
+ geo = q2.all()[0]
+ changed = False
+ for k in values:
+ if k == "multi_polygon":
+ if geo.multi_polygon.wkt != values[k]:
+ setattr(geo, k, values[k])
+ changed = True
+ elif getattr(geo, k) != values[k]:
+ setattr(geo, k, values[k])
+ changed = True
+ if changed:
+ cls.__geo_updated += 1
+ geo.save()
+ geo_datas[numero_insee] = geo
+ return
+ values.update({
+ "source_content_type": cls._town_content_type,
+ "data_type": cls._town_data_type
+ })
+ cls.__geo_created += 1
+ geo = GeoVectorData.objects.create(**values)
+ geo_datas[numero_insee] = geo
+
+ @classmethod
+ def _geodata_import_get_children(cls, insee, towns, children, verbose=False):
+ """
+ specific geodata_import method - make link between towns
+ """
+ missing_lbl = _("INSEE code is missing")
+ if insee not in towns:
+ if verbose:
+ sys.stdout.write(f"\n{BColors.FAIL}children: {missing_lbl}")
+ sys.stdout.write(f" {insee}{BColors.ENDC}\n")
+ return
+ town = towns[insee]
+ current_children = list(town.children.values_list("id", flat=True))
+ for child in children[insee]:
+ if child not in towns:
+ q = Town.objects.filter(numero_insee=child)
+ if not q.count():
+ sys.stdout.write(f"\n{BColors.FAIL}children-child: {missing_lbl}")
+ sys.stdout.write(f" {child}{BColors.ENDC}\n")
+ continue
+ towns[child] = q.all()[0]
+ if towns[child].id in current_children:
+ continue
+ cls.__nb_rels += 1
+ town.children.add(towns[child])
+
+ @classmethod
+ def geodata_import(cls, src, limit=None, log_path=None, verbose=False):
+ """
+ Import custom geodata format
+ - src: geodata dict
+ - limit: if provided, only import town with code starting with this limit
+ - verbose: verbose output
+ """
+ nb_lines = len(src)
+ started = datetime.datetime.now()
+ cls.__geo_updated, cls.__geo_created = 0, 0
+ cls.__town_created = 0
+
+ towns, geo_datas, children = {}, {}, {}
+ for idx, values in enumerate(src):
+ if verbose:
+ sys.stdout.write(get_progress("processing", idx, nb_lines, started))
+ sys.stdout.flush()
+ fields = values["fields"]
+ if values["model"] == "ishtar_common.town":
+ if limit and not values["numero_insee"].startswith(limit):
+ continue
+ c_children = fields.pop("children")
+ if c_children:
+ children[fields["numero_insee"]] = c_children
+ towns[fields["numero_insee"]], created = cls._geodata_import_update_town(
+ fields, geo_datas, verbose, limit
+ )
+ if values["model"] == "ishtar_common.geovectordata":
+ cls._geodata_import_update_geodata(fields, geo_datas, limit)
+
+ # manage geo sources
+ missing_lbl = _("INSEE code is missing")
+ for insee in geo_datas:
+ if insee not in towns:
+ if verbose:
+ sys.stdout.write(
+ f"\n{BColors.FAIL}geodata source: {missing_lbl} "
+ )
+ sys.stdout.write(
+ f" {insee}{BColors.ENDC}\n"
+ )
+ else:
+ g = geo_datas[insee]
+ if g.source_id != towns[insee].pk:
+ g.source_id = towns[insee].pk
+ g.save()
+
+ nb_lines = len(children)
+ started = datetime.datetime.now()
+ cls.__nb_rels = 0
+ if verbose:
+ print()
+ # management childrens
+ for idx, insee in enumerate(children):
+ if verbose:
+ sys.stdout.write(get_progress("update children", idx, nb_lines, started))
+ sys.stdout.flush()
+ cls._geodata_import_get_children(insee, towns, children, verbose=verbose)
+ return cls.__town_created, cls.__geo_created, cls.__geo_updated, cls.__nb_rels
+
+ @property
+ def geodata_export(self):
+ """
+ Custom geodata export format to manage easily parent and main geodata
+ """
+ main_geodata, geodata = None, []
+ if self.main_geodata:
+ main_geodata = ["ishtar_common", "town", self.numero_insee]
+ geodata = [main_geodata]
+ geo = self.main_geodata
+ result = [
+ {
+ "model": "ishtar_common.town",
+ "fields": {
+ "name": self.name,
+ "surface": self.surface,
+ "numero_insee": self.numero_insee,
+ "notice": self.notice,
+ "year": self.year,
+ "cached_label": self.cached_label,
+ "main_geodata": main_geodata,
+ "geodata": geodata,
+ "children": [
+ t.numero_insee
+ for t in self.children.filter(numero_insee__isnull=False).all()
+ ]
+ }
+ }
+ ]
+ if not self.main_geodata:
+ return result
+ # put the geodata before the town
+ result = [
+ {
+ "model": "ishtar_common.geovectordata",
+ "fields": {
+ "name": geo.name if geo.name and geo.name != "-"
+ else self.cached_label,
+ "source_content_type": [
+ "ishtar_common",
+ "town"
+ ],
+ "source": self.numero_insee,
+ "data_type": [
+ "town-limit"
+ ],
+ "provider": geo.provider.txt_idx,
+ "comment": geo.comment,
+ "cached_x": geo.cached_x,
+ "cached_y": geo.cached_y,
+ "spatial_reference_system": None,
+ "multi_polygon": geo.multi_polygon.wkt
+ }
+ }
+ ] + result
+ return result
+
def get_filename(self):
if self.numero_insee:
return f"{self.numero_insee} - {slugify(self.name)}"
@@ -3768,7 +4008,8 @@ class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model):
def _generate_cached_label(self):
cached_label = self.name
- if settings.COUNTRY == "fr" and self.numero_insee:
+ if settings.COUNTRY == "fr" and self.numero_insee \
+ and "X" not in self.numero_insee:
dpt_len = 2
if (
self.numero_insee.startswith("97")