diff options
Diffstat (limited to 'ishtar_common/models_common.py')
-rw-r--r-- | ishtar_common/models_common.py | 245 |
1 files changed, 243 insertions, 2 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index 949cab832..bfbcb2748 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -16,6 +16,7 @@ import os import pyqrcode import re import shutil +import sys import tempfile import time from unidecode import unidecode @@ -46,6 +47,7 @@ from django.template.defaultfilters import slugify from django.utils.safestring import SafeText, mark_safe from django.utils.translation import activate, deactivate from ishtar_common.utils import ( + BColors, ugettext_lazy as _, pgettext_lazy, get_image_path, @@ -53,7 +55,8 @@ from ishtar_common.utils import ( human_date, HistoryError, SearchAltName, - SheetItem + SheetItem, + get_progress ) from simple_history.models import HistoricalRecords as BaseHistoricalRecords from simple_history.signals import ( @@ -3638,6 +3641,243 @@ class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model): return 0 return round(self.surface / 10000.0, 5) + @classmethod + def _geodata_import_update_town(cls, fields, geo_datas, verbose=False, limit=None): + """ + specific geodata_import method - import town + """ + values = fields.copy() + geos = [] + missing_lbl = _("INSEE code is missing") + missing_geo_lbl = _("Geo INSEE is missing") + for geo in values.pop("geodata"): + geo_id = geo[2] + if geo_id not in geo_datas: + if verbose: + sys.stdout.write(f"\n{BColors.FAIL}geodata: {missing_lbl} ") + sys.stdout.write(f"{geo_id}{BColors.ENDC}\n") + else: + geos.append(geo_datas[geo_id]) + main_geo = values["main_geodata"][2] + if main_geo not in geo_datas: + if verbose: + sys.stdout.write(f"\n{BColors.FAIL}main_geodata: {missing_geo_lbl}") + sys.stdout.write(f" {main_geo}{BColors.ENDC}\n") + values.pop("main_geodata") + else: + values["main_geodata"] = geo_datas[main_geo] + + q = Town.objects.filter(numero_insee=values["numero_insee"]) + created = False + if q.count(): + q.update(**values) + town = q.all()[0] + else: + created = True + cls.__town_created += 1 + town = Town.objects.create(**values) + for geo in geos: + town.geodata.add(geo) + return town, created + + @classmethod + def _geodata_import_update_geodata(cls, fields, geo_datas, limit=None): + """ + specific geodata_import method - import geodata + """ + if not getattr(cls, "_town_content_type", None): + cls._town_content_type = ContentType.objects.get( + app_label="ishtar_common", model="town" + ) + if not getattr(cls, "_town_town_data_type", None): + cls._town_data_type, __ = GeoDataType.objects.get_or_create( + txt_idx="town-limit", defaults={"label": _("Town limit")} + ) + + numero_insee = fields.pop('source') + if limit and not numero_insee.startswith(limit): + return + q = Town.objects.filter(numero_insee=numero_insee) + values = { + "provider": GeoProviderType.objects.get(txt_idx=fields["provider"]), + "comment": fields["comment"], + 'multi_polygon': fields["multi_polygon"] + } + if q.count(): + source_id = q.all()[0].pk + q2 = GeoVectorData.objects.filter( + source_id=source_id, + source_content_type=cls._town_content_type, + data_type=cls._town_data_type + ) + if q2.count(): + geo = q2.all()[0] + changed = False + for k in values: + if k == "multi_polygon": + if geo.multi_polygon.wkt != values[k]: + setattr(geo, k, values[k]) + changed = True + elif getattr(geo, k) != values[k]: + setattr(geo, k, values[k]) + changed = True + if changed: + cls.__geo_updated += 1 + geo.save() + geo_datas[numero_insee] = geo + return + values.update({ + "source_content_type": cls._town_content_type, + "data_type": cls._town_data_type + }) + cls.__geo_created += 1 + geo = GeoVectorData.objects.create(**values) + geo_datas[numero_insee] = geo + + @classmethod + def _geodata_import_get_children(cls, insee, towns, children, verbose=False): + """ + specific geodata_import method - make link between towns + """ + missing_lbl = _("INSEE code is missing") + if insee not in towns: + if verbose: + sys.stdout.write(f"\n{BColors.FAIL}children: {missing_lbl}") + sys.stdout.write(f" {insee}{BColors.ENDC}\n") + return + town = towns[insee] + current_children = list(town.children.values_list("id", flat=True)) + for child in children[insee]: + if child not in towns: + q = Town.objects.filter(numero_insee=child) + if not q.count(): + sys.stdout.write(f"\n{BColors.FAIL}children-child: {missing_lbl}") + sys.stdout.write(f" {child}{BColors.ENDC}\n") + continue + towns[child] = q.all()[0] + if towns[child].id in current_children: + continue + cls.__nb_rels += 1 + town.children.add(towns[child]) + + @classmethod + def geodata_import(cls, src, limit=None, log_path=None, verbose=False): + """ + Import custom geodata format + - src: geodata dict + - limit: if provided, only import town with code starting with this limit + - verbose: verbose output + """ + nb_lines = len(src) + started = datetime.datetime.now() + cls.__geo_updated, cls.__geo_created = 0, 0 + cls.__town_created = 0 + + towns, geo_datas, children = {}, {}, {} + for idx, values in enumerate(src): + if verbose: + sys.stdout.write(get_progress("processing", idx, nb_lines, started)) + sys.stdout.flush() + fields = values["fields"] + if values["model"] == "ishtar_common.town": + if limit and not values["numero_insee"].startswith(limit): + continue + c_children = fields.pop("children") + if c_children: + children[fields["numero_insee"]] = c_children + towns[fields["numero_insee"]], created = cls._geodata_import_update_town( + fields, geo_datas, verbose, limit + ) + if values["model"] == "ishtar_common.geovectordata": + cls._geodata_import_update_geodata(fields, geo_datas, limit) + + # manage geo sources + missing_lbl = _("INSEE code is missing") + for insee in geo_datas: + if insee not in towns: + if verbose: + sys.stdout.write( + f"\n{BColors.FAIL}geodata source: {missing_lbl} " + ) + sys.stdout.write( + f" {insee}{BColors.ENDC}\n" + ) + else: + g = geo_datas[insee] + if g.source_id != towns[insee].pk: + g.source_id = towns[insee].pk + g.save() + + nb_lines = len(children) + started = datetime.datetime.now() + cls.__nb_rels = 0 + if verbose: + print() + # management childrens + for idx, insee in enumerate(children): + if verbose: + sys.stdout.write(get_progress("update children", idx, nb_lines, started)) + sys.stdout.flush() + cls._geodata_import_get_children(insee, towns, children, verbose=verbose) + return cls.__town_created, cls.__geo_created, cls.__geo_updated, cls.__nb_rels + + @property + def geodata_export(self): + """ + Custom geodata export format to manage easily parent and main geodata + """ + main_geodata, geodata = None, [] + if self.main_geodata: + main_geodata = ["ishtar_common", "town", self.numero_insee] + geodata = [main_geodata] + geo = self.main_geodata + result = [ + { + "model": "ishtar_common.town", + "fields": { + "name": self.name, + "surface": self.surface, + "numero_insee": self.numero_insee, + "notice": self.notice, + "year": self.year, + "cached_label": self.cached_label, + "main_geodata": main_geodata, + "geodata": geodata, + "children": [ + t.numero_insee + for t in self.children.filter(numero_insee__isnull=False).all() + ] + } + } + ] + if not self.main_geodata: + return result + # put the geodata before the town + result = [ + { + "model": "ishtar_common.geovectordata", + "fields": { + "name": geo.name if geo.name and geo.name != "-" + else self.cached_label, + "source_content_type": [ + "ishtar_common", + "town" + ], + "source": self.numero_insee, + "data_type": [ + "town-limit" + ], + "provider": geo.provider.txt_idx, + "comment": geo.comment, + "cached_x": geo.cached_x, + "cached_y": geo.cached_y, + "spatial_reference_system": None, + "multi_polygon": geo.multi_polygon.wkt + } + } + ] + result + return result + def get_filename(self): if self.numero_insee: return f"{self.numero_insee} - {slugify(self.name)}" @@ -3768,7 +4008,8 @@ class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model): def _generate_cached_label(self): cached_label = self.name - if settings.COUNTRY == "fr" and self.numero_insee: + if settings.COUNTRY == "fr" and self.numero_insee \ + and "X" not in self.numero_insee: dpt_len = 2 if ( self.numero_insee.startswith("97") |