# -*- coding: utf-8 -*-
"""Helper functions for data migrations (images, sources, history, UUIDs)."""

import datetime
import json
import os
import sys
import uuid

from django.core.files import File
from django.db import connection

# Attributes copied verbatim from a legacy source object onto its new Document.
_SOURCE_DOCUMENT_ATTRS = (
    "title",
    "index",
    "external_id",
    "reference",
    "internal_reference",
    "source_type",
    "support_type",
    "format_type",
    "scale",
    "associated_url",
    "receipt_date",
    "creation_date",
    "receipt_date_in_documentation",
    "item_number",
    "description",
    "comment",
    "additional_information",
    "duplicate",
)


def migrate_simple_image_to_m2m(base_model, image_model, rel_model, verbose=False):
    """Move the ``image``/``thumbnail`` of each item to a related image row.

    For every ``base_model`` item with an image, an ``image_model`` instance
    is created, the image and thumbnail files are copied onto it, and a
    ``rel_model`` row linking both (``is_main=True``) is created. The item's
    own ``image`` and ``thumbnail`` fields are then cleared.

    When a file cannot be read (``IOError``), the item's fields are cleared
    anyway, the freshly created image instance is deleted and the item is
    counted as missing.

    :param base_model: model class holding the legacy image fields
    :param image_model: model class receiving the files
    :param rel_model: model class linking an item to an image
    :param verbose: print the missing/moved counters when true
    """
    missing, moved = 0, 0
    for item in base_model.objects.all():
        if not item.image:
            continue

        image_instance = image_model.objects.create()
        try:
            # Binary mode: image data is not text. The context managers make
            # sure the handles are released once the content has been saved.
            with open(item.image.path, "rb") as image_file:
                image_instance.image.save(
                    os.path.basename(item.image.path), File(image_file)
                )
            with open(item.thumbnail.path, "rb") as thumbnail_file:
                image_instance.thumbnail.save(
                    os.path.basename(item.thumbnail.path), File(thumbnail_file)
                )
        except IOError:
            # image not on hard-drive
            item.image = None
            item.thumbnail = None
            item.save()
            image_instance.delete()
            missing += 1
            continue

        image_instance.save()
        rel_model.objects.create(item=item, image=image_instance, is_main=True)

        # clean the legacy fields now that the files live on the image row
        item.image = None
        item.thumbnail = None
        item.save()
        moved += 1

    if verbose:
        print("")
        print(base_model)
        print("{} missing".format(missing))
        print("{} moved".format(moved))


def migrate_images(apps, base_model, rel_model):
    """Create a ``Document`` for each image relation and attach it to its item.

    Relations are processed ordered by ``is_main``.

    :param apps: app registry providing ``get_model``
    :param base_model: model class of the items owning the images
    :param rel_model: model class linking an item to an ``IshtarImage``
    """
    IshtarImage = apps.get_model("ishtar_common", "IshtarImage")
    Document = apps.get_model("ishtar_common", "Document")
    for image_rel in rel_model.objects.order_by("is_main").all():
        image = IshtarImage.objects.get(pk=image_rel.image.pk)
        doc = Document.objects.create(image=image.image, thumbnail=image.thumbnail)
        item = base_model.objects.get(pk=image_rel.item.pk)
        item.documents.add(doc)


def migrate_sources(apps, base_model, source_model, item_attr):
    """Convert every ``source_model`` row into a ``Document``.

    The descriptive attributes and the authors are copied to the new
    document, which is then added to the ``documents`` of the item referenced
    by the source.

    :param apps: app registry providing ``get_model``
    :param base_model: model class of the items the sources point to
    :param source_model: legacy source model class
    :param item_attr: name of the source attribute referencing the item
    """
    Document = apps.get_model("ishtar_common", "Document")
    for source in source_model.objects.all():
        doc = Document.objects.create()
        for attr in _SOURCE_DOCUMENT_ATTRS:
            setattr(doc, attr, getattr(source, attr))
        doc.save()
        for author in source.authors.all():
            doc.authors.add(author)
        item = base_model.objects.get(pk=getattr(source, item_attr).pk)
        item.documents.add(doc)


def reinit_last_modified(apps, app_name, models):
    """Reset ``last_modified`` of each item to its latest history date.

    Models without a ``Historical<Name>`` counterpart are skipped, and so are
    items without any history entry or whose latest entry has no date.

    :param apps: app registry providing ``get_model``
    :param app_name: label of the application owning the models
    :param models: iterable of model names
    """
    for model_name in models:
        model = apps.get_model(app_name, model_name)
        try:
            historical_model = apps.get_model(app_name, "Historical" + model_name)
        except LookupError:
            # no historical model registered for this model
            continue
        for item in model.objects.all():
            latest = (
                historical_model.objects.filter(id=item.pk)
                .order_by("-history_date")
                .first()
            )
            # Skip only this item: one item without usable history must not
            # stop the processing of the remaining items and models.
            if latest is None or not latest.history_date:
                continue
            item.last_modified = latest.history_date
            item.skip_history_when_saving = True
            item.save()


def migrate_main_image(apps, app_name, model_name, verbose=False):
    """Set ``main_image`` on items having image documents but no main image.

    The document with the lowest primary key among those with a non-empty
    image is selected.

    :param apps: app registry providing ``get_model``
    :param app_name: label of the application owning the model
    :param model_name: name of the model to process
    :param verbose: write a progress counter to stdout when true
    """
    model = apps.get_model(app_name, model_name)
    items = model.objects.filter(documents__image__isnull=False).exclude(
        main_image__isnull=False
    )
    total = items.count()
    for idx, item in enumerate(items.all()):
        if verbose:
            if not idx:
                sys.stdout.write("\n")
            sys.stdout.write(
                " * {}.{}: {}/{}\r".format(app_name, model_name, idx + 1, total)
            )
            sys.stdout.flush()
        image_docs = item.documents.filter(image__isnull=False).exclude(image="")
        # by default get the lowest pk
        main_image = image_docs.order_by("pk").first()
        if main_image is None:  # no image
            continue
        item.main_image = main_image
        item.skip_history_when_saving = True
        item._no_move = True
        item.save()
    if verbose and total:
        sys.stdout.write("\n")


def m2m_historization_init(obj):
    """Initialise the ``history_m2m`` value of an object and of its history.

    For each attribute listed in ``obj.HISTORICAL_M2M``, the related values
    exposing ``history_compress`` are serialised. The result is stored on
    every history entry (whose ``history_date`` is rebuilt without timezone
    information and copied to ``last_modified``) and written to the object's
    own row with a direct SQL ``UPDATE``.

    :param obj: model instance with ``history_m2m``, ``HISTORICAL_M2M`` and
        ``history`` attributes
    """
    hist_values = obj.history_m2m or {}
    for attr in obj.HISTORICAL_M2M:
        hist_values[attr] = [
            value.history_compress()
            for value in getattr(obj, attr).all()
            if hasattr(value, "history_compress")
        ]

    for hist in obj.history.all():
        hist.history_m2m = hist_values
        d = hist.history_date
        # rebuild the date field by field: the result carries no tzinfo
        date = datetime.datetime(
            year=d.year,
            month=d.month,
            day=d.day,
            hour=d.hour,
            minute=d.minute,
            second=d.second,
            microsecond=d.microsecond,
        )
        hist.history_date = date
        hist.last_modified = date
        hist.save()

    # Direct UPDATE of the object's row. The JSON document is passed as a
    # bound string parameter and cast to json by the database, so quotes and
    # "%" characters inside the values cannot break the statement. Only the
    # table name (taken from the model metadata) is interpolated.
    query = 'UPDATE "{}" SET history_m2m = %s::json WHERE id = %s'.format(
        obj.__class__._meta.db_table
    )
    with connection.cursor() as cursor:
        cursor.execute(query, [json.dumps(hist_values), obj.pk])


def migrate_uuid(model):
    """Assign a fresh random UUID to every instance of ``model``.

    :param model: model class whose instances have a ``uuid`` attribute
    """
    for obj in model.objects.all():
        obj.uuid = uuid.uuid4()
        obj.skip_history_when_saving = True
        obj.save()


def set_uuid_helper(module, model_name):
    """Return an ``(apps, schema_editor)`` callable running ``migrate_uuid``.

    :param module: label of the application owning the model
    :param model_name: name of the model to process
    """

    def set_uuid(apps, schema_editor):
        model = apps.get_model(module, model_name)
        migrate_uuid(model)

    return set_uuid


def migrate_created_field(apps, app_name, model_names):
    """Fill ``created`` of each item with its oldest history date.

    Models without a ``Historical<Name>`` counterpart and items without any
    history entry are skipped. The value is written with a queryset
    ``update`` rather than by saving the instance.

    :param apps: app registry providing ``get_model``
    :param app_name: label of the application owning the models
    :param model_names: iterable of model names
    """
    for model_name in model_names:
        model = apps.get_model(app_name, model_name)
        try:
            model_history = apps.get_model(app_name, "Historical" + model_name)
        except LookupError:
            # no historical model registered for this model
            continue
        for item in model.objects.all():
            oldest = (
                model_history.objects.filter(id=item.pk)
                .order_by("history_date")
                .first()
            )
            if oldest is None:
                continue
            model.objects.filter(pk=item.pk).update(created=oldest.history_date)