1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
# -*- coding: utf-8 -*-
import datetime
import json
import os
import sys
import uuid
from django.core.files import File
from django.core.management import call_command
from django.db import connection
from django.utils.translation import ugettext_lazy
HOMEPAGE_TITLE = ugettext_lazy("Welcome in Ishtar, open source software for management and inventory of archaeological data")
def migrations_load_data(module_name, migration_filename):
json_path = os.sep.join(os.path.abspath(__file__).split(os.sep)[:-2] + [
module_name, "migrations", migration_filename])
call_command("loaddata", json_path)
def migrate_simple_image_to_m2m(base_model, image_model, rel_model, verbose=False):
missing, moved = 0, 0
for item in base_model.objects.all():
if not item.image:
continue
image_instance = image_model.objects.create()
try:
image_instance.image.save(
os.path.basename(item.image.path), File(open(item.image.path))
)
image_instance.thumbnail.save(
os.path.basename(item.thumbnail.path), File(open(item.thumbnail.path))
)
except IOError:
# image not on hard-drive
item.image = None
item.thumbnail = None
item.save()
image_instance.delete()
missing += 1
continue
image_instance.save()
rel_model.objects.create(item=item, image=image_instance, is_main=True)
# clean
item.image = None
item.thumbnail = None
item.save()
moved += 1
if verbose:
print("")
print(base_model)
print("{} missing".format(missing))
print("{} moved".format(moved))
def migrate_images(apps, base_model, rel_model):
IshtarImage = apps.get_model("ishtar_common", "IshtarImage")
Document = apps.get_model("ishtar_common", "Document")
for image_rel in rel_model.objects.order_by("is_main").all():
image = IshtarImage.objects.get(pk=image_rel.image.pk)
doc = Document.objects.create(image=image.image, thumbnail=image.thumbnail)
item = base_model.objects.get(pk=image_rel.item.pk)
item.documents.add(doc)
def migrate_sources(apps, base_model, source_model, item_attr):
Document = apps.get_model("ishtar_common", "Document")
for source in source_model.objects.all():
doc = Document.objects.create()
for attr in [
"title",
"index",
"external_id",
"reference",
"internal_reference",
"source_type",
"support_type",
"format_type",
"scale",
"associated_url",
"receipt_date",
"creation_date",
"receipt_date_in_documentation",
"item_number",
"description",
"comment",
"additional_information",
"duplicate",
]:
setattr(doc, attr, getattr(source, attr))
doc.save()
for author in source.authors.all():
doc.authors.add(author)
item = base_model.objects.get(pk=getattr(source, item_attr).pk)
item.documents.add(doc)
def reinit_last_modified(apps, app_name, models):
for model_name in models:
model = apps.get_model(app_name, model_name)
try:
historical_model = apps.get_model(app_name, "Historical" + model_name)
except LookupError:
continue
for item in model.objects.all():
q = historical_model.objects.filter(id=item.pk).order_by("-history_date")
if not q.count():
return
edit_date = q.all()[0].history_date
if not edit_date:
return
item.last_modified = edit_date
item.skip_history_when_saving = True
item.save()
def migrate_main_image(apps, app_name, model_name, verbose=False):
model = apps.get_model(app_name, model_name)
q = model.objects.filter(documents__image__isnull=False).exclude(
main_image__isnull=False
)
ln = q.count()
for idx, item in enumerate(q.all()):
if verbose:
if not idx:
sys.stdout.write("\n")
sys.stdout.write(
" * {}.{}: {}/{}\r".format(app_name, model_name, idx + 1, ln)
)
sys.stdout.flush()
q = item.documents.filter(image__isnull=False).exclude(image="")
if not q.count(): # no image
continue
# by default get the lowest pk
item.main_image = q.order_by("pk").all()[0]
item.skip_history_when_saving = True
item._no_move = True
item.save()
if verbose and ln:
sys.stdout.write("\n")
def m2m_historization_init(obj):
hist_values = obj.history_m2m or {}
for attr in obj.HISTORICAL_M2M:
values = []
for value in getattr(obj, attr).all():
if not hasattr(value, "history_compress"):
continue
values.append(value.history_compress())
hist_values[attr] = values
for hist in obj.history.all():
hist.history_m2m = hist_values
d = hist.history_date
date = datetime.datetime(
year=d.year,
month=d.month,
day=d.day,
hour=d.hour,
minute=d.minute,
second=d.second,
microsecond=d.microsecond,
)
hist.history_date = date
hist.last_modified = date
hist.save()
with connection.cursor() as cursor:
# not clean... but json fields seems to be not well managed by
# cursor.execute
cursor.execute(
'UPDATE "' + obj.__class__._meta.db_table + '" SET '
"history_m2m = '"
+ json.dumps(hist_values).replace("'", "''")
+ "'::json WHERE id = %s",
[obj.pk],
)
def migrate_uuid(model):
for obj in model.objects.all():
obj.uuid = uuid.uuid4()
obj.skip_history_when_saving = True
obj.save()
def set_uuid_helper(module, model_name):
def set_uuid(apps, schema_editor):
model = apps.get_model(module, model_name)
migrate_uuid(model)
return set_uuid
def migrate_created_field(apps, app_name, model_names):
for model_name in model_names:
model = apps.get_model(app_name, model_name)
try:
model_history = apps.get_model(app_name, "Historical" + model_name)
except LookupError:
continue
for item in model.objects.all():
q = model_history.objects.filter(id=item.pk).order_by("history_date")
if not q.count():
continue
history_date = q.all()[0].history_date
item.__class__.objects.filter(pk=item.pk).update(created=history_date)
|