#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2017-2025 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

import csv
import datetime
import os
import sys

from django.core.management.base import BaseCommand
from django.contrib.gis.geos import GEOSGeometry, Point
from django.db import transaction
from django.db.utils import DataError
from django.conf import settings
from django.contrib.contenttypes.models import ContentType

from ishtar_common.models import Town, GeoVectorData, GeoDataType, GeoProviderType
from ishtar_common.utils import BColors, fast_line_count, get_log_time, get_progress


# NOTE: these lookups hit the database when the module is imported.
town_content_type = ContentType.objects.get(app_label="ishtar_common", model="town")
data_type, __ = GeoDataType.objects.get_or_create(
    txt_idx="town-limit", defaults={"label": "Limites commune"}
)
provider, __ = GeoProviderType.objects.get_or_create(
    txt_idx="france-ign", defaults={"label": "IGN"}
)


class Command(BaseCommand):
    """Import towns and their geometry from a GEOFLA CSV file.

    For every CSV row the matching ``Town`` is fetched or created, saved,
    and linked to a ``GeoVectorData`` holding its limit (multi-polygon) or,
    failing that, its centroid. A CSV log (one line per processed town) is
    written under ``settings.ROOT_PATH/logs``.
    """
    help = 'Import GEOFLA csv'

    def add_arguments(self, parser):
        """Declare the command line arguments of the command."""
        parser.add_argument('csv_file')
        parser.add_argument(
            '--year', type=int, default=2014, dest='year',
            help='Default year to affect to the town')
        parser.add_argument(
            '--quiet', dest='quiet', action='store_true',
            help='Quiet output')
        parser.add_argument(
            '--create-only', dest='create_only', action='store_true',
            help='No update')
        parser.add_argument(
            '--create-only-geo', dest='create_only_geo', action='store_true',
            help='Create only missing geo')
        parser.add_argument(
            '--srid', type=int, default=2154, dest='srid',
            help='SRID uses. Default: 2154.')

    def get_town(self, num_insee, name, default_year):
        """Return ``(town, created)`` for the given INSEE number.

        An existing town for ``default_year`` is preferred; otherwise the
        town with the greatest year is used. When no town matches, a new
        *unsaved* ``Town`` is returned and ``created`` is True.
        """
        q = Town.objects.filter(numero_insee=num_insee)
        if not q.exists():
            return Town(name=name, numero_insee=num_insee), True
        q_year = q.filter(year=default_year)
        if q_year.exists():
            return q_year.all()[0], False
        return q.order_by('-year').all()[0], False

    @transaction.atomic
    def handle(self, *args, **options):
        """Run the import inside a single database transaction.

        Options read: ``csv_file``, ``year``, ``srid``, ``quiet``,
        ``create_only`` and ``create_only_geo``.
        """
        log_path = os.sep.join([settings.ROOT_PATH, "logs"])
        if not os.path.exists(log_path):
            os.mkdir(log_path, mode=0o770)
        # geometry columns can be far larger than the default csv field limit
        csv.field_size_limit(sys.maxsize)
        csv_file = options['csv_file']
        default_year = options['year']
        srid = options['srid']
        quiet = options['quiet']
        create_only = options["create_only"]
        create_only_geo = options["create_only_geo"]
        if not quiet:
            sys.stdout.write(BColors.OKGREEN)
            sys.stdout.write(f'* using year {default_year} as a default\n')
            sys.stdout.write(f'* opening file {csv_file}{BColors.ENDC}\n')
        nb_created, nb_error, nb_geo = 0, 0, 0
        # minus one for the header line
        nb_csv_lines = fast_line_count(csv_file) - 1
        started = datetime.datetime.now()
        log_filename = f"import_ign-{get_log_time().replace(':', '')}.csv"
        log_path = os.sep.join([log_path, log_filename])
        # newline="" is what the csv module expects for its file objects
        with open(log_path, "w+", newline="") as fle, \
                open(csv_file, 'rt', newline="") as csvfile:
            writer = csv.writer(fle)
            writer.writerow(["insee", "town", "created", "error"])
            reader = csv.DictReader(csvfile)
            # The geometry is expected in the first column. Let the csv
            # module parse the header so that quoting and the line ending
            # are handled properly.
            fieldnames = reader.fieldnames
            geom_colum = fieldnames[0] if fieldnames else None
            for idx, row in enumerate(reader):
                if not quiet:
                    sys.stdout.write(
                        get_progress("processing town", idx, nb_csv_lines,
                                     started)
                    )
                    sys.stdout.flush()
                num_insee = row['INSEE_COM']
                # one leading zero is restored on codes shorter than 5 chars
                if len(num_insee) < 5:
                    num_insee = '0' + num_insee
                if 'NOM_COM_M' in row:
                    name = row['NOM_COM_M']
                elif 'NOM_M' in row:
                    name = row['NOM_M']
                elif 'NOM' in row:
                    name = row['NOM'].upper()
                else:
                    name = row['NOM_COM'].upper()
                town, created = self.get_town(num_insee, name, default_year)
                if created:
                    if create_only_geo:
                        continue
                    nb_created += 1
                elif (create_only or create_only_geo) and town.main_geodata:
                    continue
                geom = row[geom_colum].upper()
                if 'MULTI' not in geom:
                    # wrap a simple polygon into a multi-polygon
                    geom = geom.replace('POLYGON', 'MULTIPOLYGON(') + ')'
                limit = GEOSGeometry(geom, srid=srid)
                if 'X_CENTROID' in row:
                    center = Point(
                        float(row['X_CENTROID']), float(row['Y_CENTROID']),
                        srid=srid)
                else:
                    center = None
                # The town's own "center" attribute is always cleared: the
                # centroid computed above is only used for the geodata.
                values = {'center': None}
                if not town.year and default_year:
                    values['year'] = default_year
                if 'SUPERFICIE' in row:
                    values['surface'] = row['SUPERFICIE']
                else:
                    values['surface'] = None
                for k in values:
                    setattr(town, k, values[k])
                error = False
                try:
                    # savepoint: a failure must not break the outer
                    # transaction
                    with transaction.atomic():
                        town.save()
                except DataError:
                    nb_error += 1
                    town, created = self.get_town(
                        num_insee, name, default_year)
                    for k in values:
                        setattr(town, k, values[k])
                    town.save()
                    error = True
                writer.writerow(
                    [town.numero_insee, town.name, created, error])
                attrs = {
                    "name": town._generate_cached_label(),
                    "source_content_type": town_content_type,
                    "source_id": town.pk,
                    "data_type": data_type,
                    "provider": provider,
                }
                if limit:
                    attrs["multi_polygon"] = limit
                else:
                    attrs["point_2d"] = center
                try:
                    # savepoint: keep the outer transaction usable when the
                    # database rejects the geometry
                    with transaction.atomic():
                        data, geo_created = \
                            GeoVectorData.objects.get_or_create(**attrs)
                except DataError:
                    if not limit:
                        print(f"\nError {town} center\n")
                        # No geodata could be stored for this town: skip it
                        # instead of attaching the geodata of a previous
                        # row (or failing on an undefined name).
                        continue
                    print(f"\nError {town} polygon\n")
                    # fall back on the centroid
                    attrs.pop("multi_polygon")
                    attrs["point_2d"] = center
                    with transaction.atomic():
                        data, geo_created = \
                            GeoVectorData.objects.get_or_create(**attrs)
                town.main_geodata = data
                # NOTE(review): presumably tells the save machinery not to
                # skip geo post-processing - confirm against Town.save
                town._post_save_geo_ok = False
                town.save()
                if geo_created:
                    nb_geo += 1
        if quiet:
            return
        sys.stdout.write(BColors.OKGREEN)
        if nb_created:
            sys.stdout.write('\n* {} town created'.format(nb_created))
        if nb_geo:
            sys.stdout.write('\n* {} geo created'.format(nb_geo))
        if nb_error:
            sys.stdout.write(
                '\n* {} town with geometry error\n'.format(nb_error))
        sys.stdout.write(f"\n[{get_log_time()}] log file:")
        sys.stdout.write(f"\n{BColors.WARNING}{log_path}{BColors.ENDC}\n")
        sys.stdout.flush()