#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Management command migrating towns and operations to geo vector data."""

import csv
import datetime
import os
import sys

from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand

from ishtar_common.utils import ugettext_lazy as _

from ishtar_common import models_common, models
from archaeological_operations.models import Operation


# Directory receiving the CSV report written by ``migrate``; it is created
# when this module is imported.
log_path = os.sep.join([settings.ROOT_PATH, "logs"])
if not os.path.exists(log_path):
    os.mkdir(log_path, mode=0o770)


def migrate(quiet=False, log=True):
    """Attach geo vector data to towns and re-save town-sourced operations.

    For every town that has a ``limit`` or a ``center`` but no
    ``main_geodata``, a ``GeoVectorData`` is fetched or created and set as
    the town's ``main_geodata``. Operations without ``main_geodata`` whose
    ``multi_polygon_source`` is ``"T"`` are then re-saved.

    :param quiet: when true, nothing is written to stdout.
    :param log: when true and at least one item was created, a CSV report
        is written under ``log_path``.
    :return: None. The caller treats a falsy result as "no error"; no error
        list is collected here yet.
    """
    changed = []

    # create towns
    q = models_common.Town.objects.exclude(
        center__isnull=True, limit__isnull=True
    ).exclude(main_geodata__isnull=False)
    nb = q.count()
    town_content_type = ContentType.objects.get(
        app_label="ishtar_common", model="town"
    )
    data_type, __ = models_common.GeoDataType.objects.get_or_create(
        txt_idx="town-limit", defaults={"label": "Limites commune"}
    )
    provider, __ = models_common.GeoProviderType.objects.get_or_create(
        txt_idx="france-ign", defaults={"label": "IGN"}
    )
    for idx, town in enumerate(q.all()):
        if not quiet:
            sys.stdout.write(
                f"\r[{percent(idx, nb)}] Migrate towns {idx + 1}/{nb}"
            )
            sys.stdout.flush()
        attrs = {
            "name": town._generate_cached_label(),
            "source_content_type": town_content_type,
            "source_id": town.pk,
            "data_type": data_type,
            "provider": provider,
        }
        # The limit polygon is preferred; the center point is the fallback.
        if town.limit:
            attrs["multi_polygon"] = town.limit
        else:
            attrs["point_2d"] = town.center
        data, created = models_common.GeoVectorData.objects.get_or_create(
            **attrs
        )
        if created:
            changed.append(
                ["geovectordata", data.name, data.pk, "Création commune"]
            )
        # Every town selected above has no main geodata yet, so link it
        # whether the vector data was just created or already existed.
        town.main_geodata = data
        town.save()

    # manage operation vector sources
    # NOTE(review): this content type is not used by the code below; the
    # lookup is kept so a missing content type still fails here as before.
    operation_content_type = ContentType.objects.get(
        app_label="archaeological_operations", model="operation"
    )
    q = Operation.objects.exclude(main_geodata__isnull=False)
    for operation in q.all():
        if operation.multi_polygon_source == "T":
            operation._no_move = True
            operation.skip_history_when_saving = True
            operation.save()  # auto managed
        elif operation.multi_polygon_source == "P":
            # TODO: not implemented yet. Unfinished sketch kept for
            # reference (``name`` and ``poly`` are not defined anywhere):
            #
            # q = operation.towns.filter(limit__isnull=False)
            # nb = q.count
            # if not nb:
            #     break
            # elif nb == 1:
            #     town = q.all()[0]
            #     geo = models_common.GeoVectorData.objects.get(
            #         source_content_type=town_content_type,
            #         source_id=town.pk
            #     )
            # else:
            #     attrs = {
            #         "name": name,
            #         "source_content_type": town_content_type,
            #         "source_id": town.pk,
            #         "data_type": data_type,
            #         "provider": provider,
            #         "multi_polygon": poly,
            #     }
            #     geo, created = (
            #         models_common.GeoVectorData.objects.get_or_create(
            #             **attrs)
            #     )
            # operation.main_geodata = geo
            # operation.skip_history_when_saving = True
            # operation._no_move = True
            # operation.save()
            pass

    if log and changed:
        filename = f"geo_migration-created-{get_time().replace(':', '')}.txt"
        path = os.sep.join([log_path, filename])
        # newline="" is what the csv module expects from its file object;
        # the explicit encoding keeps the accented labels independent of
        # the locale.
        with open(path, "w", newline="", encoding="utf-8") as fle:
            writer = csv.writer(fle)
            writer.writerow(["model", "name", "id", "context"])
            writer.writerows(changed)
        if not quiet:
            sys.stdout.write(f"log: {path} written.")
    if not quiet:
        sys.stdout.write("\n")


def percent(current, total):
    """Return the progress of the 0-based item ``current`` out of ``total``.

    The result has one decimal, is left-padded with zeros to at least four
    characters and ends with ``%`` (e.g. ``"05.0%"``, ``"100.0%"``).
    """
    if not total:
        # Nothing to process: report completion instead of dividing by zero.
        return "100.0%"
    return f"{(current + 1) / total * 100:.1f}".rjust(4, "0") + "%"


def get_time():
    """Return the current local time in ISO format without microseconds."""
    return datetime.datetime.now().isoformat().split(".")[0]


class Command(BaseCommand):
    """Command-line entry point running ``migrate``."""

    help = "Migrate to new geo data management"

    def add_arguments(self, parser):
        """Declare the ``--quiet`` and ``--log`` switches."""
        parser.add_argument(
            "--quiet", dest="quiet", action="store_true", help="Quiet output"
        )
        # ``store_false``: the option defaults to True and passing ``--log``
        # turns the report off, so the help text has to say so.
        parser.add_argument(
            "--log",
            dest="log",
            action="store_false",
            help="Do not log into a file (logging is enabled by default)",
        )

    def handle(self, *args, **options):
        """Run the migration and exit with 0 on success, 1 on errors."""
        log = options["log"]
        quiet = options["quiet"]
        if not quiet:
            sys.stdout.write(f"[{get_time()}] Processing migration\n")
        errors = migrate(quiet=quiet, log=log)
        if not errors:
            if not quiet:
                sys.stdout.write(f"[{get_time()}] Migration finished\n")
            sys.exit()
        # Only reached if ``migrate`` returns a non-empty list of messages.
        if not quiet:
            sys.stdout.write("\n".join(errors))
        sys.exit(1)