#!/usr/bin/env python # -*- coding: utf-8 -*- import csv import datetime import os import sys from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.core.management.base import BaseCommand from ishtar_common.utils import ugettext_lazy as _, get_log_time, get_percent, get_eta from ishtar_common import models_common, models from archaeological_operations.models import Operation, ArchaeologicalSite from archaeological_context_records.models import ContextRecord from archaeological_finds.models import BaseFind from archaeological_warehouse.models import Warehouse, Container log_path = os.sep.join([settings.ROOT_PATH, "logs"]) if not os.path.exists(log_path): os.mkdir(log_path, mode=0o770) def write_output(model_name, idx, nb, ref_time=None): lbl = f"\r[{get_percent(idx, nb)}] Migrate {model_name}s {idx + 1}/{nb}" if ref_time: lbl += f" ({get_eta(idx, nb, ref_time, datetime.datetime.now())} left)" sys.stdout.write(lbl) sys.stdout.flush() def migrate(quiet=False, log=True): changed = [] # create towns q = models_common.Town.objects.exclude( center__isnull=True, limit__isnull=True ).exclude(main_geodata__isnull=False) nb = q.count() town_content_type = ContentType.objects.get(app_label="ishtar_common", model="town") data_type, __ = models_common.GeoDataType.objects.get_or_create( txt_idx="town-limit", defaults={"label": "Limites commune"} ) provider, __ = models_common.GeoProviderType.objects.get_or_create( txt_idx="france-ign", defaults={"label": "IGN"} ) ref_time = datetime.datetime.now() for idx, town in enumerate(q.all()): if not quiet: write_output("town", idx, nb, ref_time) attrs = { "name": town._generate_cached_label(), "source_content_type": town_content_type, "source_id": town.pk, "data_type": data_type, "provider": provider, } if town.limit: attrs["multi_polygon"] = town.limit else: attrs["point_2d"] = town.center data, created = models_common.GeoVectorData.objects.get_or_create(**attrs) if created: changed.append(["geovectordata", data.name, data.pk, "Création commune"]) town.main_geodata = data town.save() if not quiet and nb: sys.stdout.write(f"\r[{get_log_time()}] Towns migrated \n") sys.stdout.flush() model_list = [ ("operation", "opération", "de l'opération", Operation), ("archaeologicalsite", "site", "du site", ArchaeologicalSite), ] for model_slug, model_name, model_full_name, model in model_list: # manage operation vector sources model_content_type = ContentType.objects.get( app_label="archaeological_operations", model=model_slug ) q = model.objects.exclude(main_geodata__isnull=False) nb = q.count() data_type_area, __ = models_common.GeoDataType.objects.get_or_create( txt_idx=f"{model_slug}-area", defaults={"label": f"Emprise {model_full_name}"}, ) data_type_center, __ = models_common.GeoDataType.objects.get_or_create( txt_idx=f"{model_slug}-center", defaults={"label": f"Centre {model_full_name}"}, ) ref_time = datetime.datetime.now() for idx, obj in enumerate(q.all()): if not quiet: write_output(model_name, idx, nb, ref_time) obj._no_move = True obj.skip_history_when_saving = True obj.save() # auto manage geo town association q_towns = obj.towns.filter(main_geodata__multi_polygon__isnull=False) if q_towns.count() > 1: changed.append( [model_slug, str(obj), obj.pk, "Association géo de zone communale"] ) elif q_towns.count() == 1: changed.append( [model_slug, str(obj), obj.pk, "Association géo de commune"] ) if obj.multi_polygon_source == "P" and obj.multi_polygon: attrs = { "name": f"{_(model_name.capitalize())}{_(':')} {str(obj)}", "source_content_type": model_content_type, "source_id": obj.pk, "multi_polygon": obj.multi_polygon, "data_type": data_type_area, } data = models_common.GeoVectorData.objects.create(**attrs) obj.main_geodata = data obj.save() changed.append( [ "geovectordata", data.name, data.pk, f"Multi-polygone {model_name}", ] ) if obj.point_source == "P" and obj.point_2d: if obj.x and obj.y: attrs = { "name": f"{_(model_name.capitalize())}{_(':')} {str(obj)}", "source_content_type": model_content_type, "source_id": obj.pk, "data_type": data_type_center, "x": obj.x, "y": obj.y, "z": obj.z, } data = models_common.GeoVectorData.objects.create(**attrs) obj.main_geodata = data obj.save() changed.append( [ "geovectordata", data.name, data.pk, f"Coordonnées {model_name}", ] ) elif obj.point_2d: attrs = { "name": f"{_(model_name.capitalize())}{_(':')} {str(obj)}", "source_content_type": model_content_type, "source_id": obj.pk, "data_type": data_type_center, } if obj.point: attrs["point_3d"] = obj.point else: attrs["point_2d"] = obj.point_2d data = models_common.GeoVectorData.objects.create(**attrs) obj.main_geodata = data obj.save() changed.append( ["geovectordata", data.name, data.pk, f"Point {model_name}"] ) if not quiet and nb: sys.stdout.write( f"\r[{get_log_time()}] {model_name.capitalize()} migrated" + " " * 20 + "\n" ) sys.stdout.flush() model_list = [ ( "archaeological_context_records", "contextrecord", "unité d'enregistrement", "de l'unité d'enregistrement", ContextRecord, ), ( "archaeological_finds", "basefind", "mobilier d'origine", "du mobilier d'origine", BaseFind, ), ] for app, model_slug, model_name, model_full_name, model in model_list: model_content_type = ContentType.objects.get(app_label=app, model=model_slug) q = model.objects.exclude(main_geodata__isnull=False) nb = q.count() data_type_outline, __ = models_common.GeoDataType.objects.get_or_create( txt_idx=f"{model_slug}-outline", defaults={"label": f"Contour d'{model_name}"}, ) data_type_center, __ = models_common.GeoDataType.objects.get_or_create( txt_idx=f"{model_slug}-center", defaults={"label": f"Centre {model_full_name}"}, ) ref_time = datetime.datetime.now() for idx, obj in enumerate(q.all()): if not quiet: write_output(model_name, idx, nb, ref_time) obj._no_move = True obj.skip_history_when_saving = True obj.save() # auto manage geo town association if obj.main_geodata: changed.append( [model_slug, str(obj), obj.pk, "Association géo de zone communale"] ) if obj.multi_polygon_source == "P" and obj.multi_polygon: attrs = { "name": f"{_(model_name.capitalize())}{_(':')} {str(obj)}", "source_content_type": model_content_type, "source_id": obj.pk, "multi_polygon": obj.multi_polygon, "data_type": data_type_outline, } data = models_common.GeoVectorData.objects.create(**attrs) obj.main_geodata = data obj.save() changed.append( [ "geovectordata", data.name, data.pk, f"Multi-polygone {model_name}", ] ) if obj.point_source == "P" and obj.point_2d: if obj.x and obj.y: attrs = { "name": f"{_(model_name.capitalize())}{_(':')} {str(obj)}", "source_content_type": model_content_type, "source_id": obj.pk, "data_type": data_type_center, "x": obj.x, "y": obj.y, "z": obj.z, } data = models_common.GeoVectorData.objects.create(**attrs) obj.main_geodata = data obj.save() changed.append( [ "geovectordata", data.name, data.pk, f"Coordonnées {model_name}", ] ) elif obj.point_2d: attrs = { "name": f"{_(model_name.capitalize())}{_(':')} {str(obj)}", "source_content_type": model_content_type, "source_id": obj.pk, "data_type": data_type_center, } if obj.point: attrs["point_3d"] = obj.point else: attrs["point_2d"] = obj.point_2d data = models_common.GeoVectorData.objects.create(**attrs) obj.main_geodata = data obj.save() changed.append( ["geovectordata", data.name, data.pk, f"Point {model_name}"] ) if not quiet and nb: sys.stdout.write( f"\r[{get_log_time()}] {model_name.capitalize()} migrated" + " " * 20 + "\n" ) sys.stdout.flush() model_list = [Warehouse, Container] for model in model_list: ref_time = datetime.datetime.now() q = model.objects.exclude(main_geodata__isnull=False) nb = q.count() for idx, obj in enumerate(q.all()): if not quiet: write_output(model.__name__, idx, nb, ref_time) obj.save() if not quiet and nb: sys.stdout.write( f"\r[{get_log_time()}] {model.__name__.capitalize()} migrated" + " " * 20 + "\n" ) sys.stdout.flush() if log and changed: filename = f"geo_migration-created-{get_log_time().replace(':', '')}.csv" path = os.sep.join([log_path, filename]) with open(path, "w+") as fle: writer = csv.writer(fle) writer.writerow(["model", "name", "id", "context"]) for change in changed: writer.writerow(change) if not quiet: sys.stdout.write(f"[{get_log_time()}] Log: {path} written\n") class Command(BaseCommand): help = "Migrate to new geo data management" def add_arguments(self, parser): parser.add_argument( "--quiet", dest="quiet", action="store_true", help="Quiet output" ) parser.add_argument( "--log", dest="log", action="store_false", help="Log into a file" ) def handle(self, *args, **options): log = options["log"] quiet = options["quiet"] if not quiet: sys.stdout.write(f"[{get_log_time()}] Processing migration\n") errors = migrate(quiet=quiet, log=log) if not errors: if not quiet: sys.stdout.write(f"[{get_log_time()}] Migration finished\n") sys.exit() if not quiet: sys.stdout.write("\n".join(errors)) sys.exit(1)