#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Django management command launching Ishtar maintenance tasks.

Each task is a module-level function taking the command ``options`` dict.
Tasks are registered in ``TASKS`` and dispatched by ``Command.handle``.
"""

from argparse import RawTextHelpFormatter
import csv
import datetime
import json
import os
import tempfile
import shutil
import sys

from django.apps import apps
from django.conf import settings
from django.contrib.auth.management import create_permissions
from django.core.exceptions import FieldDoesNotExist
from django.core.management.base import BaseCommand, CommandError
from django.template.defaultfilters import slugify

from ishtar_common import models_common
from ishtar_common.utils import create_default_areas


APPS = (
    "ishtar_common",
    "archaeological_operations",
    "archaeological_context_records",
    "archaeological_files",
    "archaeological_finds",
    "archaeological_warehouse",
)

# Directory receiving the CSV logs written by _end_task.
log_path = os.sep.join([settings.ROOT_PATH, "logs"])
# exist_ok avoids the check-then-create race of exists() + mkdir() and
# makedirs also creates a missing parent directory.
os.makedirs(log_path, mode=0o770, exist_ok=True)


def _is_selected_model(options, model):
    """Return True when ``model`` passes the optional ``--model`` name filter."""
    wanted = options.get("model", None)
    return not wanted or model.__name__ == wanted


def _get_queryset(model, filtr):
    """Return the queryset of ``model``, restricted by ``filtr`` when given."""
    if filtr:
        return model.objects.filter(**filtr)
    return model.objects.all()


def _write_progress(msg, idx, ln):
    """Rewrite the current terminal line with a ``idx + 1``/``ln`` counter."""
    sys.stdout.write("\r{} {}/{}".format(msg, idx + 1, ln))
    sys.stdout.flush()


def task_update_search_vectors(options):
    """Call ``update_search_vector`` on every item of the searchable models.

    A model is processed when it has an ``update_search_vector`` attribute
    and at least one non-empty ``*_SEARCH_VECTORS`` attribute.

    :param options: command options; reads "quiet", "log", "filter", "model"
    """
    quiet = options.get("quiet", False)
    log = options.get("log", False)
    filtr = options.get("filter", "")
    vector_attrs = (
        "BASE_SEARCH_VECTORS",
        "INT_SEARCH_VECTORS",
        "M2M_SEARCH_VECTORS",
        "PARENT_SEARCH_VECTORS",
    )
    for model in apps.get_models():
        if not _is_selected_model(options, model):
            continue
        if not hasattr(model, "update_search_vector"):
            continue
        if not any(getattr(model, attr, None) for attr in vector_attrs):
            continue
        store_results = []
        msg = "-> processing {}: ".format(model._meta.verbose_name)
        q = _get_queryset(model, filtr)
        ln = q.count()
        changed_nb = 0
        # Only primary keys are listed up front; each object is then
        # fetched individually.
        for idx, obj_id in enumerate(q.values("pk").all()):
            obj = model.objects.get(pk=obj_id["pk"])
            if not quiet:
                _write_progress(msg, idx, ln)
            if not obj.update_search_vector():
                continue
            if log:
                store_results.append((obj.pk, str(obj)))
            changed_nb += 1
        log_name = "update-search-vector-" + slugify(model.__name__)
        csv_cols = ["id", "name"]
        _end_task(changed_nb, msg, quiet, store_results, log, log_name,
                  csv_cols)


def task_check_cached_label(options):
    """Regenerate cached labels and save the items whose label changed.

    The attributes to refresh come from the model ``CACHED_LABELS`` when it
    is set, otherwise ``cached_label`` is used (except for models with
    "Basket" in their name). "Historical*" models are skipped.

    :param options: command options; reads "quiet", "log", "filter", "model"
    """
    quiet = options.get("quiet", False)
    log = options.get("log", False)
    filtr = options.get("filter", "")
    for model in apps.get_models():
        if not _is_selected_model(options, model):
            continue
        if model.__name__.startswith("Historical"):
            continue
        if hasattr(model, "CACHED_LABELS") and model.CACHED_LABELS:
            cached_keys = model.CACHED_LABELS
        elif hasattr(model, "cached_label") and "Basket" not in model.__name__:
            cached_keys = ["cached_label"]
        else:
            continue
        store_results = []
        msg = "-> processing {}: ".format(model._meta.verbose_name)
        q = _get_queryset(model, filtr)
        ln = q.count()
        changed_nb = 0
        for idx, obj_id in enumerate(q.values("pk").all()):
            obj = model.objects.get(pk=obj_id["pk"])
            if not quiet:
                _write_progress(msg, idx, ln)
            changed = False
            for cached_key in cached_keys:
                gen_func = "_generate_" + cached_key
                if not hasattr(obj, gen_func):
                    continue
                lbl = getattr(obj, gen_func)()
                # Read the stored value BEFORE overwriting it, otherwise the
                # "old" column of the log would contain the new label.
                old_lbl = getattr(obj, cached_key)
                if lbl == old_lbl:
                    continue
                setattr(obj, cached_key, lbl)
                if log:
                    store_results.append(
                        (obj.pk, str(obj), cached_key, old_lbl, lbl)
                    )
                changed = True
                changed_nb += 1
            if changed:
                # Flags read by the project's save() — presumably they skip
                # history, moves and geo checks; confirm against the models.
                obj.skip_history_when_saving = True
                obj._no_move = True
                obj._no_geo_check = True
                obj.save()
        log_name = "update-cached-label-" + slugify(model.__name__)
        csv_cols = ["id", "name", "attribute", "old", "new"]
        _end_task(changed_nb, msg, quiet, store_results, log, log_name,
                  csv_cols)


def task_update_external_id(options):
    """Blank ``external_id`` and ``complete_identifier`` then save each item.

    Regeneration of the identifiers is presumably done by the model
    ``save()`` — this function only empties them. "Historical*" models and
    models without an ``external_id`` attribute are skipped.

    :param options: command options; reads "quiet", "filter", "model"
    """
    quiet = options.get("quiet", False)
    filtr = options.get("filter", "")
    for model in apps.get_models():
        if not _is_selected_model(options, model):
            continue
        if model.__name__.startswith("Historical"):
            continue
        if "external_id" not in dir(model):
            continue
        msg = "-> processing {}: ".format(model._meta.verbose_name)
        q = _get_queryset(model, filtr)
        ln = q.count()
        for idx, item in enumerate(q.all()):
            item.skip_history_when_saving = True
            item.external_id = ""
            item.complete_identifier = ""
            item._no_move = True
            if not quiet:
                _write_progress(msg, idx, ln)
            item.save()
        if not quiet:
            sys.stdout.write("\n")


def _end_task(changed_nb, msg, quiet, store_results, log, log_name, csv_cols):
    """Print the summary of a model run and optionally write its CSV log.

    :param changed_nb: number of changes made
    :param msg: progress prefix already displayed for the model
    :param quiet: when true nothing is written on stdout
    :param store_results: rows to write in the CSV log
    :param log: when true (and something changed) write the CSV log
    :param log_name: suffix of the log file name
    :param csv_cols: header row of the CSV log
    """
    if not quiet:
        if changed_nb:
            cmsg = f"\r{msg} {changed_nb} updated" + 20 * " " + "\n"
        else:
            cmsg = " " * 80
        sys.stdout.write(cmsg)
    if not (log and changed_nb):
        return
    # ":" is stripped from the timestamp used in the file name.
    filename = f"{get_time().replace(':', '')}-{log_name}.csv"
    path = os.sep.join([log_path, filename])
    # newline="" is required by the csv module; utf-8 keeps non-ASCII labels
    # writable whatever the locale is.
    with open(path, "w", newline="", encoding="utf-8") as fle:
        writer = csv.writer(fle)
        writer.writerow(csv_cols)
        writer.writerows(store_results)
    if not quiet:
        sys.stdout.write(f"log: {path} written.\n")


def task_main_image(options):
    """Set a main image on items having image documents but no main image.

    The document with the lowest pk among those with a non-empty image is
    used. Only ``DocumentItem`` subclasses with a ``main_image`` attribute
    are processed.

    :param options: command options; reads "quiet", "filter", "model"
    """
    quiet = options.get("quiet", False)
    filtr = options.get("filter", "")
    for model in apps.get_models():
        # Honour --model like the other tasks: --filter requires it, and the
        # filter is unlikely to be valid for every model.
        if not _is_selected_model(options, model):
            continue
        if not issubclass(model, models_common.DocumentItem):
            continue
        if not hasattr(model, "main_image"):
            continue
        # distinct(): the join on documents returns one row per matching
        # document, which would inflate the count and save items repeatedly.
        q = model.objects.filter(
            main_image__isnull=True, documents__image__isnull=False
        ).exclude(documents__image="").distinct()
        if filtr:
            q = q.filter(**filtr)
        nb = q.count()
        if not nb:  # no image attached
            continue
        for item in q.all():
            q_docs = item.documents.filter(image__isnull=False).exclude(
                image="")
            item.main_image = q_docs.order_by("pk").all()[0]
            item.skip_history_when_saving = True
            item.save()
        if not quiet:
            sys.stdout.write(f"{nb} main image fixed for {model.__name__}\n")


def task_regenerate_qrcodes(options):
    """Call ``generate_qrcode`` on every item of the models with a qrcode field.

    :param options: command options; reads "quiet", "no-https", "filter",
        "model"
    """
    quiet = options.get("quiet", False)
    secure = not options.get("no-https", False)
    filtr = options.get("filter", "")
    for model in apps.get_models():
        if not _is_selected_model(options, model):
            continue
        try:
            model._meta.get_field("qrcode")
        except FieldDoesNotExist:
            continue
        msg = "-> processing {}: ".format(model._meta.verbose_name)
        q = _get_queryset(model, filtr)
        ln = q.count()
        tmpdir = tempfile.mkdtemp("-qrcode")
        try:
            for idx, obj in enumerate(q.all()):
                obj.skip_history_when_saving = True
                obj._no_move = True
                if not quiet:
                    _write_progress(msg, idx, ln)
                obj.generate_qrcode(secure=secure, tmpdir=tmpdir)
        finally:
            # Always remove the working directory, even when a generation
            # fails, so that no temporary directory is leaked.
            shutil.rmtree(tmpdir)
        if not quiet:
            sys.stdout.write("\n")


def task_regenerate_permissions(options):
    """Run Django ``create_permissions`` on a fixed list of Ishtar apps.

    :param options: command options (unused)
    """
    app_labels = [
        "ishtar_common",
        "archaeological_operations",
        "archaeological_context_records",
        "archaeological_finds",
        "archaeological_warehouse",
    ]
    for app in app_labels:
        create_permissions(apps.get_app_config(app))


def task_default_areas(options):
    """Call ``create_default_areas``, verbose unless "quiet" is set.

    :param options: command options; reads "quiet"
    """
    verbose = not options.get("quiet", False)
    create_default_areas(verbose=verbose)


def task_missing_parcels(options):
    """Attach parcels without operation to their context record operation.

    With the "clean" option, parcels having neither an associated file nor
    an operation are deleted afterwards.

    :param options: command options; reads "quiet", "clean"
    """
    quiet = options.get("quiet", False)
    Parcel = apps.get_model("archaeological_operations", "Parcel")
    # distinct(): a parcel with several context records would otherwise be
    # returned once per context record.
    q = Parcel.objects.filter(
        context_record__isnull=False, operation=None
    ).distinct()
    nb = q.count()
    for idx, parcel in enumerate(q.all()):
        if not quiet:
            sys.stdout.write(f"\r[{percent(idx, nb)}] {idx + 1}/{nb}")
        # assume all context record associated to this parcel are of the
        # same operation
        cr = parcel.context_record.all()[0]
        if parcel.operation:
            continue
        parcel.operation = cr.operation
        parcel.skip_history_when_saving = True
        parcel.save()
    if not quiet:
        sys.stdout.write("\n")
    if not options.get("clean", False):
        return
    q = Parcel.objects.filter(associated_file=None, operation=None)
    nb = q.count()
    q.delete()
    if not quiet:
        sys.stdout.write(f"{nb} orphan parcel deleted.\n")


def percent(current, total):
    """Return the progress of the 0-based index ``current`` over ``total``.

    The value is formatted with one decimal, left-padded with "0" up to
    four characters, and suffixed with "%".
    """
    return f"{(current + 1) / total * 100:.1f}".rjust(4, "0") + "%"


def get_time():
    """Return the current local time in ISO format, without microseconds."""
    return datetime.datetime.now().isoformat().split(".")[0]


def get_filter(filter_str):
    """Decode the JSON ``filter_str``; an empty value gives an empty dict.

    :raises json.decoder.JSONDecodeError: when the string is not valid JSON
    """
    if not filter_str:
        return {}
    return json.loads(filter_str)


# Task registry: task name -> help text and function called with the options.
TASKS = {
    "admin_permissions": {
        "help": "regenerate basic model permissions",
        "action": task_regenerate_permissions,
    },
    "admin_default_areas": {
        "help": "create default areas from department and states",
        "action": task_default_areas,
    },
    "fix_main_image": {
        "help": "for items with images and no main image, put the first one "
                "created as a main image",
        "action": task_main_image,
    },
    "fix_missing_parcels": {
        "help": "fix lost parcel association on operation from context "
                "records. "
                "With --clean option delete orphan parcels.",
        "action": task_missing_parcels,
    },
    "update_cached_label": {
        "help": "regenerate cached label",
        "action": task_check_cached_label,
    },
    "update_qrcodes": {
        "help": "regenerate qrcodes",
        "action": task_regenerate_qrcodes,
    },
    "update_search_vector": {
        "help": "regenerate search vectors",
        "action": task_update_search_vectors,
    },
    "update_external_ids": {
        "help": "regenerate external and complete ID",
        "action": task_update_external_id,
    },
}


class Command(BaseCommand):
    """Management command dispatching to one of the ``TASKS`` functions."""

    help = "Launch maintenance tasks"

    def parser_error(self, message=""):
        """Write ``message`` on stderr, print the help and exit with code 2."""
        sys.stderr.write(f"{message}\n")
        self.parser.print_help()
        sys.exit(2)

    def create_parser(self, *args, **kwargs):
        """Build the parser with raw-text help and the custom error handler."""
        parser = super().create_parser(*args, **kwargs)
        # Raw formatter: keeps the line breaks of the per-task help.
        parser.formatter_class = RawTextHelpFormatter
        self.parser = parser
        parser.error = self.parser_error
        return parser

    def add_arguments(self, parser):
        """Declare the positional task name and the optional flags."""
        task_help = "\n".join(
            [f"{k} - {TASKS[k]['help']}" for k in sorted(TASKS.keys())]
        )
        parser.add_argument("task", help=task_help)
        parser.add_argument(
            "--model", type=str, default="", dest="model",
            help="Specific model to update")
        parser.add_argument(
            "--filter", type=str, default="", dest="filter",
            help="Filter query")
        parser.add_argument(
            "--quiet", dest="quiet", action="store_true", help="Quiet output"
        )
        parser.add_argument(
            "--log", dest="log", action="store_true", help="Log into a file"
        )
        parser.add_argument(
            "--no-https", dest="no-https", action="store_true", default=False,
            help="[update_qrcodes] Use this parameter if https is not "
                 "available")
        parser.add_argument(
            "--clean", dest="clean", action="store_true", default=False,
            help="[fix_missing_parcels] Delete orphan parcel after fix")
        parser.add_argument(
            "--test", dest="test", action="store_true", default=False,
            help="Used on tests: no sys.exit")

    def handle(self, *args, **options):
        """Validate the options and run the requested task.

        Unless the "test" option is set, the process exits with status 0 on
        success and 1 when the task returns errors.

        :raises CommandError: unknown task, filter without model, or filter
            which is not a JSON dict
        """
        settings.USE_BACKGROUND_TASK = False
        if options["test"]:
            options["quiet"] = True
        if options["task"] not in TASKS:
            msg = (
                f"{options['task']} is not a valid task. "
                "Available tasks are:\n"
            )
            msg += "\n".join(TASKS.keys())
            raise CommandError(msg)
        if options["filter"] and not options["model"]:
            msg = "model option is mandatory if you use filter option"
            raise CommandError(msg)
        if options["filter"]:
            raw_filter = options["filter"]
            try:
                parsed_filter = get_filter(raw_filter)
            except json.decoder.JSONDecodeError:
                parsed_filter = None
            # Valid JSON which is not a dict (list, number...) cannot be
            # expanded with ** into QuerySet.filter: reject it here too.
            if not isinstance(parsed_filter, dict):
                msg = (
                    f"\"{raw_filter}\" is not a valid filter: "
                    "it must be a valid JSON dict string"
                )
                raise CommandError(msg)
            options["filter"] = parsed_filter
        quiet = options["quiet"]
        if not quiet:
            sys.stdout.write(
                f"[{get_time()}] Processing task {options['task']}\n")
        errors = TASKS[options["task"]]["action"](options)
        if not errors:
            if not quiet:
                sys.stdout.write(
                    f"\n[{get_time()}] Task {options['task']} finished\n")
            if options["test"]:
                return
            sys.exit()
        if not quiet:
            sys.stdout.write("\n".join(errors))
        if options["test"]:
            return
        sys.exit(1)