#!/usr/bin/env python # -*- coding: utf-8 -*- from argparse import RawTextHelpFormatter import csv import datetime import os import sys from django.apps import apps from django.conf import settings from django.core.management.base import BaseCommand, CommandError from django.template.defaultfilters import slugify from ishtar_common import models_common APPS = ( "ishtar_common", "archaeological_operations", "archaeological_context_records", "archaeological_files", "archaeological_finds", "archaeological_warehouse", ) log_path = os.sep.join([settings.ROOT_PATH, "logs"]) if not os.path.exists(log_path): os.mkdir(log_path, mode=0o770) def task_check_cached_label(quiet=False, log=False): for model in apps.get_models(): if model.__name__.startswith("Historical"): continue if hasattr(model, "CACHED_LABELS") and model.CACHED_LABELS: cached_keys = model.CACHED_LABELS elif hasattr(model, "cached_label") and "Basket" not in model.__name__: cached_keys = ["cached_label"] else: continue store_results = [] msg = "-> processing {}: ".format(model._meta.verbose_name) q = model.objects ln = q.count() changed_nb = 0 for idx, obj_id in enumerate(q.values("pk").all()): obj = model.objects.get(pk=obj_id["pk"]) if not quiet: cmsg = "\r{} {}/{}".format(msg, idx + 1, ln) sys.stdout.write(cmsg) sys.stdout.flush() changed = False for cached_key in cached_keys: gen_func = "_generate_" + cached_key if not hasattr(obj, gen_func): continue lbl = getattr(obj, gen_func)() if lbl != getattr(obj, cached_key): setattr(obj, cached_key, lbl) if log: store_results.append( ( obj.pk, str(obj), cached_key, getattr(obj, cached_key), lbl, ) ) changed = True changed_nb += 1 if changed: obj.skip_history_when_saving = True obj._no_move = True obj._no_geo_check = True obj.save() if not quiet: if changed_nb: cmsg = f"\r{msg} {changed_nb} updated" + 20 * " " + "\n" else: cmsg = " " * 80 sys.stdout.write(cmsg) if log and changed_nb: filename = f"{slugify(model.__name__)}-{get_time().replace(':', '')}.csv" path = os.sep.join([log_path, filename]) with open(path, 'w+') as fle: writer = csv.writer(fle) writer.writerow(["id", "name", "attribute", "old", "new"]) writer.writerows(store_results) if not quiet: sys.stdout.write(f"log: {path} written.") def task_main_image(quiet=False, log=False): for model in apps.get_models(): if not issubclass(model, models_common.DocumentItem): continue if not hasattr(model, "main_image"): continue q = model.objects.filter( main_image__isnull=True, documents__image__isnull=False ).exclude(documents__image="") nb = q.count() if not nb: # no image attached continue for item in q.all(): q_docs = item.documents.filter(image__isnull=False).exclude(image="") item.main_image = q_docs.order_by("pk").all()[0] item.skip_history_when_saving = True item.save() if not quiet: sys.stdout.write(f"{nb} main image fixed for {model.__name__}\n") def get_time(): return datetime.datetime.now().isoformat().split(".")[0] TASKS = { "main_image": { "help": "for items with images and no main image, put the first one created as a main image.", "action": task_main_image, }, "cached_label": { "help": "regenerate cached label on all tables if necessary", "action": task_check_cached_label, }, } class Command(BaseCommand): help = "Launch maintenance tasks: \n" + "\n".join( [f"* {t}: {TASKS[t]['help']}" for t in TASKS.keys()] ) def create_parser(self, *args, **kwargs): parser = super(Command, self).create_parser(*args, **kwargs) parser.formatter_class = RawTextHelpFormatter return parser def add_arguments(self, parser): parser.add_argument("task") parser.add_argument( "--quiet", dest="quiet", action="store_true", help="Quiet output" ) parser.add_argument( "--log", dest="log", action="store_true", help="Log into a file" ) def handle(self, *args, **options): if options["task"] not in TASKS.keys(): msg = f"{options['task']} is not a valid task. Available tasks are:\n" msg += "\n".join(TASKS.keys()) raise CommandError(msg) log = options["log"] quiet = options["quiet"] if not quiet: sys.stdout.write(f"[{get_time()}] Processing task {options['task']}\n") errors = TASKS[options["task"]]["action"](quiet=quiet, log=log) if not errors: if not quiet: sys.stdout.write(f"[{get_time()}] Task {options['task']} finished\n") sys.exit() if not quiet: sys.stdout.write("\n".join(errors)) sys.exit(1)