#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Management command re-creating the default ``ItemKey`` rows of general types."""

import sys

from django.apps import apps
from django.db import connection
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from django.utils.text import slugify

from ishtar_common.models import ItemKey
from ishtar_common.utils import get_log_time, get_percent, BColors


def write_output(base_lbl, idx, total):
    """Rewrite the current terminal line with a coloured progress indicator.

    :param base_lbl: text displayed before the counter
    :param idx: zero-based index of the current item (displayed as ``idx + 1``)
    :param total: total number of items
    """
    # the leading "\r" moves the cursor back to the start of the line so each
    # call overwrites the previous progress text
    lbl = f"\r{BColors.OKBLUE}[{get_percent(idx, total)}] {base_lbl} {idx + 1}/{total}"
    lbl += f"{BColors.ENDC}"
    sys.stdout.write(lbl)
    sys.stdout.flush()


# NOTE: despite its name this query checks that a *table* exists in the
# "public" schema of the given database. It is filled with ``str.format``;
# the values used in this module come from the settings and from the model
# metadata, never from user input.
SQL_CHECK_DATABASE_EXISTS = (
    "SELECT 1 FROM information_schema.tables "
    "WHERE table_type='BASE TABLE' AND table_schema='public' "
    "AND table_catalog='{database}' AND table_name='{table_name}';"
)

# applications whose models are inspected by ``migrate_item_key``
_ITEM_KEY_APPS = (
    "ishtar_common",
    "archaeological_context_records",
    "archaeological_files",
    "archaeological_finds",
    "archaeological_operations",
    "archaeological_warehouse",
)

# a model is handled only when it exposes all of these attributes
_GENERAL_TYPE_ATTRS = ("available", "txt_idx", "comment")

# padding appended to the "OK" line to erase a longer progress line
_PADDING = " " * 30


def _is_general_type(model):
    """Return True when ``model`` exposes every attribute of a general type."""
    return all(hasattr(model, attr) for attr in _GENERAL_TYPE_ATTRS)


def _table_exists(model):
    """Return True when the table of ``model`` exists in the default database."""
    sql = SQL_CHECK_DATABASE_EXISTS.format(
        database=settings.DATABASES["default"]["NAME"],
        table_name=model._meta.db_table,
    )
    with connection.cursor() as cursor:
        cursor.execute(sql)
        return bool(cursor.fetchall())


def migrate_item_key(clean_old=False, quiet=False):
    """Ensure a default ``ItemKey`` exists for each item of every general type.

    For each handled model, every ``(id, label)`` pair gets an ``ItemKey``
    whose key is the unicode-aware slug of the label and which is not bound
    to any importer type, import, user or group.

    :param clean_old: set to True to migrate from non unicode to unicode
        keys: keys built from the ASCII slug of each label are deleted
        before the unicode key is created
    :param quiet: when True nothing is written to the standard output
    """
    for app in _ITEM_KEY_APPS:
        for model in apps.get_app_config(app).get_models():
            if not _is_general_type(model):
                continue  # not a general type
            content_type, __ = ContentType.objects.get_or_create(
                app_label=app, model=model._meta.model_name
            )
            if not _table_exists(model):
                # table is not created yet (database initialisation or
                # migration): give up on the remaining models of this app
                break
            verbose_name = model._meta.verbose_name
            # lookup shared by the deletion and the creation: keys that are
            # attached to no importer type, import, user or group
            default_scope = {
                "content_type": content_type,
                "importer_type": None,
                "ishtar_import": None,
                "user": None,
                "group": None,
            }
            q = model.objects
            nb = q.count()
            for idx, (pk, label) in enumerate(q.values_list("id", "label")):
                if not quiet:
                    write_output(verbose_name, idx, nb)
                if clean_old:
                    # no filter on object_id: every key matching the ASCII
                    # slug for this content type is removed
                    ItemKey.objects.filter(
                        key=slugify(label), **default_scope
                    ).delete()
                ItemKey.objects.get_or_create(
                    key=slugify(label, allow_unicode=True),
                    object_id=pk,
                    **default_scope
                )
            if not quiet:
                sys.stdout.write(
                    f"\r{BColors.OKGREEN}* {verbose_name} - OK{_PADDING}{BColors.ENDC}\n"
                )


class Command(BaseCommand):
    """Command line entry point wrapping ``migrate_item_key``."""

    help = "Re-initialize ItemKey"

    def add_arguments(self, parser):
        """Declare the ``--quiet`` and ``--clean-old`` flags."""
        parser.add_argument(
            "--quiet", dest="quiet", action="store_true", help="Quiet output"
        )
        parser.add_argument(
            "--clean-old",
            dest="clean_old",
            action="store_true",
            help="V4.4 migration - clean old non-utf-8 keys",
        )

    def handle(self, *args, **options):
        """Run the migration with the options given on the command line."""
        quiet = options["quiet"]
        if not quiet:
            # no trailing newline: the progress output that follows starts
            # with "\r" and overwrites this message
            sys.stdout.write(f"{BColors.OKGREEN}[{get_log_time()}] Processing{BColors.ENDC}")
        # NOTE(review): presumably prevents background tasks from being
        # scheduled while the keys are rebuilt - confirm against the users of
        # this setting
        settings.USE_BACKGROUND_TASK = False
        migrate_item_key(clean_old=options["clean_old"], quiet=quiet)