diff options
Diffstat (limited to 'ishtar_common/management/commands/load_towns.py')
| -rw-r--r-- | ishtar_common/management/commands/load_towns.py | 203 | 
1 files changed, 203 insertions, 0 deletions
diff --git a/ishtar_common/management/commands/load_towns.py b/ishtar_common/management/commands/load_towns.py new file mode 100644 index 000000000..1ab73d677 --- /dev/null +++ b/ishtar_common/management/commands/load_towns.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (C) 2025  Étienne Loks  <etienne.loks_AT_peacefrogsDOTnet> + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. + +# See the file COPYING for details. + +import datetime +import json +import os +import sys + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.contrib.contenttypes.models import ContentType +from django.db import transaction + +from ishtar_common.utils import BColors, get_log_time, get_progress +from ishtar_common.models import Town, GeoVectorData, GeoDataType, GeoProviderType + + +town_content_type = ContentType.objects.get(app_label="ishtar_common", model="town") +town_data_type, __ = GeoDataType.objects.get_or_create( +    txt_idx="town-limit", defaults={"label": "Limites commune"} +) + + +class Command(BaseCommand): +    help = 'Import towns fixtures' + +    def add_arguments(self, parser): +        parser.add_argument('towns_file') +        parser.add_argument( +            '--limit', type=str, default="", dest='limit', +            help='Limit import town to town code begining with the string provided.' +            'Example: --limit=35') +        parser.add_argument( +            '--quiet', dest='quiet', action='store_true', +            help='Quiet output') + +    @transaction.atomic +    def handle(self, *args, **options): +        log_path = os.sep.join([settings.ROOT_PATH, "logs"]) +        if not os.path.exists(log_path): +            os.mkdir(log_path, mode=0o770) +        quiet = options['quiet'] +        self.limit = options['limit'] +        towns_file = options['towns_file'] + +        with open(towns_file, "r") as t: +            src = json.loads(t.read()) +        nb_lines = len(src) +        started = datetime.datetime.now() +        self.geo_updated, self.geo_created = 0, 0 +        self.town_created = 0 + +        log_filename = f"load_towns-{get_log_time().replace(':', '')}.csv" +        log_path = os.sep.join([log_path, log_filename]) +        towns, geo_datas, children = {}, {}, {} +        for idx, values in enumerate(src): +            sys.stdout.write(get_progress("processing", idx, nb_lines, started)) +            sys.stdout.flush() +            fields = values["fields"] +            if values["model"] == "ishtar_common.town": +                if self.limit and not values["numero_insee"].startswith(self.limit): +                    continue +                c_children = fields.pop("children") +                if c_children: +                    children[fields["numero_insee"]] = c_children +                towns[fields["numero_insee"]], created = self.update_town( +                    fields, geo_datas +                ) +            if values["model"] == "ishtar_common.geovectordata": +                self.update_geodata(fields, geo_datas) +        # manage geo sources +        for insee in geo_datas: +            if insee not in towns: +                sys.stdout.write( +                    f"\n{BColors.FAIL}geodata source : INSEE manquant {insee}{BColors.ENDC}\n" +                ) +            else: +                g = geo_datas[insee] +                if g.source_id != towns[insee].pk: +                    g.source_id = towns[insee].pk +                    g.save() + +        nb_lines = len(children) +        started = datetime.datetime.now() +        self.nb_rels = 0 +        print() +        # management childrens +        for idx, insee in enumerate(children): +            sys.stdout.write(get_progress("update children", idx, nb_lines, started)) +            sys.stdout.flush() +            self.get_children(insee, towns, children) +        sys.stdout.write(BColors.OKGREEN) +        if self.town_created: +            sys.stdout.write(f'\n* {self.town_created} town created') +        if self.geo_created: +            sys.stdout.write(f'\n* {self.geo_created} geo created') +        if self.geo_updated: +            sys.stdout.write(f'\n* {self.geo_updated} geo updated') +        if self.nb_rels: +            sys.stdout.write(f'\n* {self.nb_rels} relations updated') +        sys.stdout.write(BColors.ENDC + "\n") +        sys.stdout.flush() + +    def update_town(self, fields, geo_datas): +        values = fields.copy() +        geos = [] +        for geo in values.pop("geodata"): +            geo_id = geo[2] +            if geo_id not in geo_datas: +                sys.stdout.write(f"\n{BColors.FAIL}geodata : Geo INSEE manquant {geo_id}{BColors.ENDC}\n") +            else: +                geos.append(geo_datas[geo_id]) +        main_geo = values["main_geodata"][2] +        if main_geo not in geo_datas: +            sys.stdout.write(f"\n{BColors.FAIL}main_geodata : Geo INSEE manquant {main_geo}{BColors.ENDC}\n") +            values.pop(main_geo) +        else: +            values["main_geodata"] = geo_datas[main_geo] + +        q = Town.objects.filter(numero_insee=values["numero_insee"]) +        created = False +        if q.count(): +            q.update(**values) +            town = q.all()[0] +        else: +            created = True +            self.town_created += 1 +            town = Town.objects.create(**values) +        for geo in geos: +            town.geodata.add(geo) +        return town, created + +    def update_geodata(self, fields, geo_datas): +        numero_insee = fields.pop('source') +        if self.limit and not numero_insee.startswith(self.limit): +            return +        q = Town.objects.filter(numero_insee=numero_insee) +        values = { +            "provider": GeoProviderType.objects.get(txt_idx=fields["provider"]), +            "comment": fields["comment"], +            'multi_polygon': fields["multi_polygon"] +        } +        if q.count(): +            source_id = q.all()[0].pk +            q2 = GeoVectorData.objects.filter( +                source_id=source_id, +                source_content_type=town_content_type, +                data_type=town_data_type +            ) +            if q2.count(): +                geo = q2.all()[0] +                changed = False +                for k in values: +                    if k == "multi_polygon": +                        if geo.multi_polygon.wkt != values[k]: +                            setattr(geo, k, values[k]) +                            changed = True +                    elif getattr(geo, k) != values[k]: +                        setattr(geo, k, values[k]) +                        changed = True +                if changed: +                    self.geo_updated += 1 +                    geo.save() +                geo_datas[numero_insee] = geo +                return +        values.update({ +            "source_content_type": town_content_type, +            "data_type": town_data_type +        }) +        self.geo_created += 1 +        geo = GeoVectorData.objects.create(**values) +        geo_datas[numero_insee] = geo + +    def get_children(self, insee, towns, children): +        if insee not in towns: +            sys.stdout.write(f"\n{BColors.FAIL}children : INSEE manquant {insee}{BColors.ENDC}\n") +            return +        town = towns[insee] +        current_children = list(town.children.values_list("id", flat=True)) +        for child in children[insee]: +            if child not in towns: +                sys.stdout.write(f"\n{BColors.FAIL}children-child : INSEE manquant {insee}{BColors.ENDC}\n") +                continue +            if towns[child].id in current_children: +                continue +            self.nb_rels += 1 +            town.children.add(towns[child])  | 
