#!/usr/bin/env python # -*- coding: utf-8 -*- import csv import datetime import os import sys from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.core.management.base import BaseCommand from ishtar_common.utils import ugettext_lazy as _ from ishtar_common import models_common, models from archaeological_operations.models import Operation log_path = os.sep.join([settings.ROOT_PATH, "logs"]) if not os.path.exists(log_path): os.mkdir(log_path, mode=0o770) def migrate(quiet=False, log=True): changed = [] # create towns q = models_common.Town.objects.exclude( center__isnull=True, limit__isnull=True ).exclude(main_geodata__isnull=False) nb = q.count() town_content_type = ContentType.objects.get(app_label="ishtar_common", model="town") data_type, __ = models_common.GeoDataType.objects.get_or_create( txt_idx="town-limit", defaults={"label": "Limites commune"} ) provider, __ = models_common.GeoProviderType.objects.get_or_create( txt_idx="france-ign", defaults={"label": "IGN"} ) for idx, town in enumerate(q.all()): if not quiet: sys.stdout.write(f"\r[{percent(idx, nb)}] Migrate towns {idx + 1}/{nb}") sys.stdout.flush() attrs = { "name": town._generate_cached_label(), "source_content_type": town_content_type, "source_id": town.pk, "data_type": data_type, "provider": provider, } if town.limit: attrs["multi_polygon"] = town.limit else: attrs["point_2d"] = town.center data, created = models_common.GeoVectorData.objects.get_or_create(**attrs) if created: changed.append(["geovectordata", data.name, data.pk, "Création commune"]) town.main_geodata = data town.save() # manage operation vector sources operation_content_type = ContentType.objects.get( app_label="archaeological_operations", model="operation" ) q = Operation.objects.exclude(main_geodata__isnull=False) nb = q.count() data_type_area, __ = models_common.GeoDataType.objects.get_or_create( txt_idx="operation-area", defaults={"label": "Emprise de l'opération"} ) data_type_center, __ = models_common.GeoDataType.objects.get_or_create( txt_idx="operation-center", defaults={"label": "Centre de l'opération"} ) for idx, operation in enumerate(q.all()): if not quiet: sys.stdout.write(f"\r[{percent(idx, nb)}] Migrate operations {idx + 1}/{nb}") sys.stdout.flush() operation._no_move = True operation.skip_history_when_saving = True operation.save() # auto manage geo town association q_towns = operation.towns.filter(main_geodata__multi_polygon__isnull=False) if q_towns.count() > 1: changed.append( ["operation", str(operation), operation.pk, "Association géo de zone communale"]) elif q_towns.count() == 1: changed.append( ["operation", str(operation), operation.pk, "Association géo de commune"]) if operation.multi_polygon_source == "P" and operation.multi_polygon: attrs = { "name": f"{_('Operation')}{_(':')} {str(operation)}", "source_content_type": operation_content_type, "source_id": operation.pk, "multi_polygon": operation.multi_polygon, "data_type": data_type_area, } data = models_common.GeoVectorData.objects.create(**attrs) operation.main_geodata = data operation.save() changed.append( ["geovectordata", data.name, data.pk, "Multi-polygone opération"]) if operation.point_source == "P" and operation.point_2d: if operation.x and operation.y: attrs = { "name": f"{_('Operation')}{_(':')} {str(operation)}", "source_content_type": operation_content_type, "source_id": operation.pk, "data_type": data_type_center, "x": operation.x, "y": operation.y, "z": operation.z, } data = models_common.GeoVectorData.objects.create(**attrs) operation.main_geodata = data operation.save() changed.append( ["geovectordata", data.name, data.pk, "Coordonnées opération"]) elif operation.point_2d: attrs = { "name": f"{_('Operation')}{_(':')} {str(operation)}", "source_content_type": operation_content_type, "source_id": operation.pk, "data_type": data_type_center, } if operation.point: attrs["point_3d"] = operation.point else: attrs["point_2d"] = operation.point_2d data = models_common.GeoVectorData.objects.create(**attrs) operation.main_geodata = data operation.save() changed.append( ["geovectordata", data.name, data.pk, "Point opération"]) if log and changed: filename = f"geo_migration-created-{get_time().replace(':', '')}.txt" path = os.sep.join([log_path, filename]) with open(path, "w+") as fle: writer = csv.writer(fle) writer.writerow(["model", "name", "id", "context"]) for change in changed: writer.writerow(change) if not quiet: sys.stdout.write(f"log: {path} written.") if not quiet: sys.stdout.write("\n") def percent(current, total): return f"{(current + 1) / total * 100:.1f}".rjust(4, "0") + "%" def get_time(): return datetime.datetime.now().isoformat().split(".")[0] class Command(BaseCommand): help = "Migrate to new geo data management" def add_arguments(self, parser): parser.add_argument( "--quiet", dest="quiet", action="store_true", help="Quiet output" ) parser.add_argument( "--log", dest="log", action="store_false", help="Log into a file" ) def handle(self, *args, **options): log = options["log"] quiet = options["quiet"] if not quiet: sys.stdout.write(f"[{get_time()}] Processing migration\n") errors = migrate(quiet=quiet, log=log) if not errors: if not quiet: sys.stdout.write(f"[{get_time()}] Migration finished\n") sys.exit() if not quiet: sys.stdout.write("\n".join(errors)) sys.exit(1)