#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import datetime import json import os import sys from django.conf import settings from django.core.management.base import BaseCommand from django.contrib.contenttypes.models import ContentType from django.db import transaction from ishtar_common.utils import BColors, get_log_time, get_progress from ishtar_common.models import Town, GeoVectorData, GeoDataType, GeoProviderType town_content_type = ContentType.objects.get(app_label="ishtar_common", model="town") town_data_type, __ = GeoDataType.objects.get_or_create( txt_idx="town-limit", defaults={"label": "Limites commune"} ) class Command(BaseCommand): help = 'Import towns fixtures' def add_arguments(self, parser): parser.add_argument('towns_file') parser.add_argument( '--limit', type=str, default="", dest='limit', help='Limit import town to town code begining with the string provided.' 'Example: --limit=35') parser.add_argument( '--quiet', dest='quiet', action='store_true', help='Quiet output') @transaction.atomic def handle(self, *args, **options): log_path = os.sep.join([settings.ROOT_PATH, "logs"]) if not os.path.exists(log_path): os.mkdir(log_path, mode=0o770) quiet = options['quiet'] self.limit = options['limit'] towns_file = options['towns_file'] with open(towns_file, "r") as t: src = json.loads(t.read()) nb_lines = len(src) started = datetime.datetime.now() self.geo_updated, self.geo_created = 0, 0 self.town_created = 0 log_filename = f"load_towns-{get_log_time().replace(':', '')}.csv" log_path = os.sep.join([log_path, log_filename]) towns, geo_datas, children = {}, {}, {} for idx, values in enumerate(src): sys.stdout.write(get_progress("processing", idx, nb_lines, started)) sys.stdout.flush() fields = values["fields"] if values["model"] == "ishtar_common.town": if self.limit and not values["numero_insee"].startswith(self.limit): continue c_children = fields.pop("children") if c_children: children[fields["numero_insee"]] = c_children towns[fields["numero_insee"]], created = self.update_town( fields, geo_datas ) if values["model"] == "ishtar_common.geovectordata": self.update_geodata(fields, geo_datas) # manage geo sources for insee in geo_datas: if insee not in towns: sys.stdout.write( f"\n{BColors.FAIL}geodata source : INSEE manquant {insee}{BColors.ENDC}\n" ) else: g = geo_datas[insee] if g.source_id != towns[insee].pk: g.source_id = towns[insee].pk g.save() nb_lines = len(children) started = datetime.datetime.now() self.nb_rels = 0 print() # management childrens for idx, insee in enumerate(children): sys.stdout.write(get_progress("update children", idx, nb_lines, started)) sys.stdout.flush() self.get_children(insee, towns, children) sys.stdout.write(BColors.OKGREEN) if self.town_created: sys.stdout.write(f'\n* {self.town_created} town created') if self.geo_created: sys.stdout.write(f'\n* {self.geo_created} geo created') if self.geo_updated: sys.stdout.write(f'\n* {self.geo_updated} geo updated') if self.nb_rels: sys.stdout.write(f'\n* {self.nb_rels} relations updated') sys.stdout.write(BColors.ENDC + "\n") sys.stdout.flush() def update_town(self, fields, geo_datas): values = fields.copy() geos = [] for geo in values.pop("geodata"): geo_id = geo[2] if geo_id not in geo_datas: sys.stdout.write(f"\n{BColors.FAIL}geodata : Geo INSEE manquant {geo_id}{BColors.ENDC}\n") else: geos.append(geo_datas[geo_id]) main_geo = values["main_geodata"][2] if main_geo not in geo_datas: sys.stdout.write(f"\n{BColors.FAIL}main_geodata : Geo INSEE manquant {main_geo}{BColors.ENDC}\n") values.pop(main_geo) else: values["main_geodata"] = geo_datas[main_geo] q = Town.objects.filter(numero_insee=values["numero_insee"]) created = False if q.count(): q.update(**values) town = q.all()[0] else: created = True self.town_created += 1 town = Town.objects.create(**values) for geo in geos: town.geodata.add(geo) return town, created def update_geodata(self, fields, geo_datas): numero_insee = fields.pop('source') if self.limit and not numero_insee.startswith(self.limit): return q = Town.objects.filter(numero_insee=numero_insee) values = { "provider": GeoProviderType.objects.get(txt_idx=fields["provider"]), "comment": fields["comment"], 'multi_polygon': fields["multi_polygon"] } if q.count(): source_id = q.all()[0].pk q2 = GeoVectorData.objects.filter( source_id=source_id, source_content_type=town_content_type, data_type=town_data_type ) if q2.count(): geo = q2.all()[0] changed = False for k in values: if k == "multi_polygon": if geo.multi_polygon.wkt != values[k]: setattr(geo, k, values[k]) changed = True elif getattr(geo, k) != values[k]: setattr(geo, k, values[k]) changed = True if changed: self.geo_updated += 1 geo.save() geo_datas[numero_insee] = geo return values.update({ "source_content_type": town_content_type, "data_type": town_data_type }) self.geo_created += 1 geo = GeoVectorData.objects.create(**values) geo_datas[numero_insee] = geo def get_children(self, insee, towns, children): if insee not in towns: sys.stdout.write(f"\n{BColors.FAIL}children : INSEE manquant {insee}{BColors.ENDC}\n") return town = towns[insee] current_children = list(town.children.values_list("id", flat=True)) for child in children[insee]: if child not in towns: sys.stdout.write(f"\n{BColors.FAIL}children-child : INSEE manquant {insee}{BColors.ENDC}\n") continue if towns[child].id in current_children: continue self.nb_rels += 1 town.children.add(towns[child])