# -*- coding: utf-8 -*-
import datetime
import json
import os
import sys
import uuid
from django.core.files import File
from django.core.management import call_command
from django.db import connection
from django.utils.translation import ugettext_lazy
HOMEPAGE_TITLE = ugettext_lazy(
    "Welcome to Ishtar, open source software for management and inventory "
    "of archaeological data"
)
HOMEPAGE_DEFAULT = """{% load i18n %}
{% trans "Some useful links:" %}
"""
def migrations_load_data(module_name, migration_filename):
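    """Load a fixture located in the "migrations" directory of the given module."""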
json_path = os.sep.join(os.path.abspath(__file__).split(os.sep)[:-2] + [
module_name, "migrations", migration_filename])
call_command("loaddata", json_path)
def migrate_simple_image_to_m2m(base_model, image_model, rel_model, verbose=False):
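    """
    Move the single "image"/"thumbnail" fields of base_model items to
    dedicated image_model instances linked through rel_model (marked as the
    main image). Items whose files are missing on disk are simply cleaned.
    """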
missing, moved = 0, 0
for item in base_model.objects.all():
if not item.image:
continue
image_instance = image_model.objects.create()
        try:
            # open the files in binary mode and close them once copied
            with open(item.image.path, "rb") as image_file:
                image_instance.image.save(
                    os.path.basename(item.image.path), File(image_file)
                )
            with open(item.thumbnail.path, "rb") as thumbnail_file:
                image_instance.thumbnail.save(
                    os.path.basename(item.thumbnail.path), File(thumbnail_file)
                )
except IOError:
# image not on hard-drive
item.image = None
item.thumbnail = None
item.save()
image_instance.delete()
missing += 1
continue
image_instance.save()
rel_model.objects.create(item=item, image=image_instance, is_main=True)
# clean
item.image = None
item.thumbnail = None
item.save()
moved += 1
if verbose:
print("")
print(base_model)
print("{} missing".format(missing))
print("{} moved".format(moved))
def migrate_images(apps, base_model, rel_model):
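    """
    Turn images linked to items through rel_model into Document instances
    attached to the items' "documents" relation.
    """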
IshtarImage = apps.get_model("ishtar_common", "IshtarImage")
Document = apps.get_model("ishtar_common", "Document")
for image_rel in rel_model.objects.order_by("is_main").all():
image = IshtarImage.objects.get(pk=image_rel.image.pk)
doc = Document.objects.create(image=image.image, thumbnail=image.thumbnail)
item = base_model.objects.get(pk=image_rel.item.pk)
item.documents.add(doc)
def migrate_sources(apps, base_model, source_model, item_attr):
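    """
    Turn old source_model instances into Document instances: copy the common
    descriptive fields and the authors, then attach each new document to the
    related item (reached through the item_attr attribute).
    """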
Document = apps.get_model("ishtar_common", "Document")
for source in source_model.objects.all():
doc = Document.objects.create()
for attr in [
"title",
"index",
"external_id",
"reference",
"internal_reference",
"source_type",
"support_type",
"format_type",
"scale",
"associated_url",
"receipt_date",
"creation_date",
"receipt_date_in_documentation",
"item_number",
"description",
"comment",
"additional_information",
"duplicate",
]:
setattr(doc, attr, getattr(source, attr))
doc.save()
for author in source.authors.all():
doc.authors.add(author)
item = base_model.objects.get(pk=getattr(source, item_attr).pk)
item.documents.add(doc)
def reinit_last_modified(apps, app_name, models):
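    """Set last_modified of each item from its latest historical record."""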
for model_name in models:
model = apps.get_model(app_name, model_name)
try:
historical_model = apps.get_model(app_name, "Historical" + model_name)
except LookupError:
continue
for item in model.objects.all():
q = historical_model.objects.filter(id=item.pk).order_by("-history_date")
            if not q.count():
                # no historical record for this item: skip it instead of
                # aborting the whole helper
                continue
            edit_date = q.all()[0].history_date
            if not edit_date:
                continue
item.last_modified = edit_date
item.skip_history_when_saving = True
item.save()
def migrate_main_image(apps, app_name, model_name, verbose=False):
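    """
    Set main_image on items that have image documents but no main image yet,
    using the document with the lowest pk.
    """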
model = apps.get_model(app_name, model_name)
q = model.objects.filter(documents__image__isnull=False).exclude(
main_image__isnull=False
)
ln = q.count()
for idx, item in enumerate(q.all()):
if verbose:
if not idx:
sys.stdout.write("\n")
sys.stdout.write(
" * {}.{}: {}/{}\r".format(app_name, model_name, idx + 1, ln)
)
sys.stdout.flush()
        image_documents = item.documents.filter(image__isnull=False).exclude(
            image=""
        )
        if not image_documents.count():  # no image attached to this item
            continue
        # by default take the document with the lowest pk as the main image
        item.main_image = image_documents.order_by("pk").all()[0]
item.skip_history_when_saving = True
item._no_move = True
item.save()
if verbose and ln:
sys.stdout.write("\n")
def m2m_historization_init(obj):
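    """
    Build the history_m2m value of obj from its current HISTORICAL_M2M
    relations, copy it to every historical record and write it back on the
    object's own row with a raw SQL update.
    """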
hist_values = obj.history_m2m or {}
for attr in obj.HISTORICAL_M2M:
values = []
for value in getattr(obj, attr).all():
if not hasattr(value, "history_compress"):
continue
values.append(value.history_compress())
hist_values[attr] = values
for hist in obj.history.all():
hist.history_m2m = hist_values
d = hist.history_date
date = datetime.datetime(
year=d.year,
month=d.month,
day=d.day,
hour=d.hour,
minute=d.minute,
second=d.second,
microsecond=d.microsecond,
)
hist.history_date = date
hist.last_modified = date
hist.save()
with connection.cursor() as cursor:
        # not clean... but JSON fields do not seem to be handled well by
        # cursor.execute
cursor.execute(
'UPDATE "' + obj.__class__._meta.db_table + '" SET '
"history_m2m = '"
+ json.dumps(hist_values).replace("'", "''")
+ "'::json WHERE id = %s",
[obj.pk],
)
def migrate_uuid(model):
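    """Assign a new UUID to every instance of the model, skipping history."""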
for obj in model.objects.all():
obj.uuid = uuid.uuid4()
obj.skip_history_when_saving = True
obj.save()
def set_uuid_helper(module, model_name):
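    """Return a RunPython-style callable setting UUIDs on the given model."""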
def set_uuid(apps, schema_editor):
model = apps.get_model(module, model_name)
migrate_uuid(model)
return set_uuid
def migrate_created_field(apps, app_name, model_names):
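    """Set the created field of each item from its oldest historical record."""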
for model_name in model_names:
model = apps.get_model(app_name, model_name)
try:
model_history = apps.get_model(app_name, "Historical" + model_name)
except LookupError:
continue
for item in model.objects.all():
q = model_history.objects.filter(id=item.pk).order_by("history_date")
if not q.count():
continue
history_date = q.all()[0].history_date
item.__class__.objects.filter(pk=item.pk).update(created=history_date)
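# A minimal sketch of how set_uuid_helper could be wired into a migration,
# assuming a model named "Town" in the "ishtar_common" app and a
# "0001_initial" dependency (the model name and dependency are illustrative,
# not taken from this file); set_uuid_helper would be imported from wherever
# this module lives in the project:
#
#     from django.db import migrations
#
#     class Migration(migrations.Migration):
#         dependencies = [("ishtar_common", "0001_initial")]
#         operations = [
#             migrations.RunPython(
#                 set_uuid_helper("ishtar_common", "Town"),
#                 reverse_code=migrations.RunPython.noop,
#             ),
#         ]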