#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2017-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import os import re import sys from django.conf import settings from django.core.management.base import BaseCommand from ishtar_common.models import Town from ishtar_common.utils import BColors, get_log_time class Command(BaseCommand): help = 'Import INSEE csv' def add_arguments(self, parser): parser.add_argument('csv_file') parser.add_argument( '--year', type=int, default=2015, dest='year', help='Year to affect to new towns') parser.add_argument( '--quiet', dest='quiet', action='store_true', help='Quiet output') def handle(self, *args, **options): log_path = os.sep.join([settings.ROOT_PATH, "logs"]) if not os.path.exists(log_path): os.mkdir(log_path, mode=0o770) csv_file = options['csv_file'] default_year = options['year'] quiet = options['quiet'] if not quiet: sys.stdout.write(BColors.OKGREEN) sys.stdout.write(f'* Using year {default_year} for new towns\n') sys.stdout.write(f'* Opening file {csv_file}{BColors.ENDC}\n') r = re.compile(r"(.*)\((.*)\)") nb_created = 0 nb_link = 0 missing = [] strange = [] linked = set() log_filename = f"import_insee-{get_log_time().replace(':', '')}.csv" log_path = os.sep.join([log_path, log_filename]) with open(log_path, "w+") as fle: writer = csv.writer(fle) writer.writerow(["new insee", "new town"]) with open(csv_file, 'rt') as csvfile: reader = csv.DictReader(csvfile) for idx, row in enumerate(reader): new_insee = row['DepComN'] if len(new_insee) < 5: new_insee = '0' + new_insee if not idx: # test if first do not exist q = Town.objects.filter(numero_insee=new_insee, year=default_year) if q.count(): sys.stdout.write( f"{BColors.FAIL}First town already exists for this year{BColors.ENDC}\n") return if not quiet: sys.stdout.write('Processing town %d.\r' % (idx + 1)) sys.stdout.flush() old_insee = row['DepComA'] if len(old_insee) < 5: old_insee = '0' + old_insee q = Town.objects.filter(numero_insee=old_insee) if not q.count(): missing.append((old_insee, row['NomCA'])) continue if q.count() > 1: q = q.filter(year__lt=default_year).order_by('-year') if not q.count(): strange.append((old_insee, row['NomCA'])) continue old_town = q.all()[0] q = Town.objects.filter(numero_insee=new_insee, year=default_year) if not q.count(): nb_created += 1 name = row['NomCN'].strip() name = r.sub(r"\2 \1", name).strip() new_town = Town.objects.create(name=name, year=default_year, numero_insee=new_insee) writer.writerow([new_town.numero_insee, new_town.name]) else: new_town = q.all()[0] if new_town in old_town.children.all(): continue # link already created nb_link += 1 old_town.children.add(new_town) linked.add(new_town) nb_limit = 0 if not quiet: sys.stdout.write('\nGenerate limits...') for town in linked: if town.generate_geo(): nb_limit += 1 if quiet: return sys.stdout.write(BColors.OKGREEN) sys.stdout.write('\n* {} town created\n'.format(nb_created)) sys.stdout.write('* {} link created\n'.format(nb_link)) sys.stdout.write('* {} limit generated\n'.format(nb_limit)) if missing: sys.stdout.write('* these towns are missing:\n') for insee, name in missing: sys.stdout.write('* {} ({})\n'.format(name, insee)) if strange: sys.stdout.write('* these towns have newer version:\n') for insee, name in strange: sys.stdout.write('* {} ({})\n'.format(name, insee)) sys.stdout.write(f"\n[{get_log_time()}] log file:") sys.stdout.write(f"\n{BColors.WARNING}{log_path}{BColors.ENDC}\n") sys.stdout.flush()