# -*- coding: utf-8 -*-
import datetime
import json
import os
import sys
import uuid

from django.core.files import File
from django.core.management import call_command
from django.db import connection


def migrations_load_data(module_name, migration_filename):
    """Load a JSON fixture located in <module_name>/migrations/."""
    json_path = os.sep.join(os.path.abspath(__file__).split(os.sep)[:-2] + [
        module_name, "migrations", migration_filename])
    call_command("loaddata", json_path)
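
# Usage sketch (migration and fixture names below are hypothetical): a data
# migration can call migrations_load_data from a RunPython forward function.
#
#     from django.db import migrations
#     from ishtar_common.utils_migrations import migrations_load_data
#
#     def load_types(apps, schema_editor):
#         migrations_load_data("ishtar_common", "initial_types.json")
#
#     class Migration(migrations.Migration):
#         dependencies = [("ishtar_common", "0001_initial")]
#         operations = [migrations.RunPython(load_types, migrations.RunPython.noop)]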


def migrate_simple_image_to_m2m(base_model, image_model, rel_model, verbose=False):
    """Move image/thumbnail fields of base_model items to image_model
    instances linked through rel_model."""
    missing, moved = 0, 0
    for item in base_model.objects.all():
        if not item.image:
            continue

        image_instance = image_model.objects.create()

        try:
            # open source files in binary mode and close them once copied
            with open(item.image.path, "rb") as image_file:
                image_instance.image.save(
                    os.path.basename(item.image.path), File(image_file)
                )
            with open(item.thumbnail.path, "rb") as thumbnail_file:
                image_instance.thumbnail.save(
                    os.path.basename(item.thumbnail.path), File(thumbnail_file)
                )
        except IOError:
            # image not on hard-drive
            item.image = None
            item.thumbnail = None
            item.save()
            image_instance.delete()
            missing += 1
            continue

        image_instance.save()

        rel_model.objects.create(item=item, image=image_instance, is_main=True)

        # clean
        item.image = None
        item.thumbnail = None
        item.save()
        moved += 1
    if verbose:
        print("")
        print(base_model)
        print("{} missing".format(missing))
        print("{} moved".format(moved))
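
# Usage sketch (app and model names below are hypothetical): a data migration
# can fetch the frozen models from the app registry and hand them to
# migrate_simple_image_to_m2m.
#
#     def move_images(apps, schema_editor):
#         migrate_simple_image_to_m2m(
#             apps.get_model("myapp", "Item"),
#             apps.get_model("ishtar_common", "IshtarImage"),
#             apps.get_model("myapp", "ItemImage"),
#             verbose=True,
#         )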


def migrate_images(apps, base_model, rel_model):
    """Turn image relations into Document objects attached to the items."""
    IshtarImage = apps.get_model("ishtar_common", "IshtarImage")
    Document = apps.get_model("ishtar_common", "Document")
    for image_rel in rel_model.objects.order_by("is_main").all():
        image = IshtarImage.objects.get(pk=image_rel.image.pk)
        doc = Document.objects.create(image=image.image, thumbnail=image.thumbnail)
        item = base_model.objects.get(pk=image_rel.item.pk)
        item.documents.add(doc)


def migrate_sources(apps, base_model, source_model, item_attr):
    """Turn sources into Document objects attached to their items."""
    Document = apps.get_model("ishtar_common", "Document")
    for source in source_model.objects.all():
        doc = Document.objects.create()
        for attr in [
            "title",
            "index",
            "external_id",
            "reference",
            "internal_reference",
            "source_type",
            "support_type",
            "format_type",
            "scale",
            "associated_url",
            "receipt_date",
            "creation_date",
            "receipt_date_in_documentation",
            "item_number",
            "description",
            "comment",
            "additional_information",
            "duplicate",
        ]:
            setattr(doc, attr, getattr(source, attr))
        doc.save()
        for author in source.authors.all():
            doc.authors.add(author)
        item = base_model.objects.get(pk=getattr(source, item_attr).pk)
        item.documents.add(doc)


def reinit_last_modified(apps, app_name, models):
    """Reset last_modified from each item's most recent historical record."""
    for model_name in models:
        model = apps.get_model(app_name, model_name)
        try:
            historical_model = apps.get_model(app_name, "Historical" + model_name)
        except LookupError:
            continue
        for item in model.objects.all():
            q = historical_model.objects.filter(id=item.pk).order_by("-history_date")
            if not q.count():
                continue
            edit_date = q.all()[0].history_date
            if not edit_date:
                continue
            item.last_modified = edit_date
            item.skip_history_when_saving = True
            item.save()


def migrate_main_image(apps, app_name, model_name, verbose=False):
    """Set main_image on items with image documents but no main image yet."""
    model = apps.get_model(app_name, model_name)
    q = model.objects.filter(documents__image__isnull=False).exclude(
        main_image__isnull=False
    )
    ln = q.count()
    for idx, item in enumerate(q.all()):
        if verbose:
            if not idx:
                sys.stdout.write("\n")
            sys.stdout.write(
                "  * {}.{}: {}/{}\r".format(app_name, model_name, idx + 1, ln)
            )
            sys.stdout.flush()
        images = item.documents.filter(image__isnull=False).exclude(image="")
        if not images.count():  # no image
            continue
        # by default get the image document with the lowest pk
        item.main_image = images.order_by("pk").all()[0]
        item.skip_history_when_saving = True
        item._no_move = True
        item.save()
    if verbose and ln:
        sys.stdout.write("\n")


def m2m_historization_init(obj):
    """Snapshot the current m2m values of obj into history_m2m, for the object
    itself and all of its historical records."""
    hist_values = obj.history_m2m or {}
    for attr in obj.HISTORICAL_M2M:
        values = []
        for value in getattr(obj, attr).all():
            if not hasattr(value, "history_compress"):
                continue
            values.append(value.history_compress())
        hist_values[attr] = values
    for hist in obj.history.all():
        hist.history_m2m = hist_values
        d = hist.history_date
        date = datetime.datetime(
            year=d.year,
            month=d.month,
            day=d.day,
            hour=d.hour,
            minute=d.minute,
            second=d.second,
            microsecond=d.microsecond,
        )
        hist.history_date = date
        hist.last_modified = date
        hist.save()
    with connection.cursor() as cursor:
        # not clean... but json fields do not seem to be well managed by
        # cursor.execute
        cursor.execute(
            'UPDATE "' + obj.__class__._meta.db_table + '" SET '
            "history_m2m = '"
            + json.dumps(hist_values).replace("'", "''")
            + "'::json WHERE id = %s",
            [obj.pk],
        )
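
# Usage sketch: m2m_historization_init expects the concrete model (with its
# history manager and HISTORICAL_M2M attribute), not the frozen model returned
# by apps.get_model. The model name below is hypothetical.
#
#     def init_m2m_history(apps, schema_editor):
#         from myapp.models import Item
#         for obj in Item.objects.all():
#             m2m_historization_init(obj)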


def migrate_uuid(model):
    """Give each instance of model a fresh UUID without touching its history."""
    for obj in model.objects.all():
        obj.uuid = uuid.uuid4()
        obj.skip_history_when_saving = True
        obj.save()


def set_uuid_helper(module, model_name):
    """Return a RunPython forward function that sets UUIDs on model_name."""
    def set_uuid(apps, schema_editor):
        model = apps.get_model(module, model_name)
        migrate_uuid(model)

    return set_uuid
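
# Usage sketch (app, model and migration names below are hypothetical):
#
#     class Migration(migrations.Migration):
#         dependencies = [("ishtar_common", "00XX_add_uuid_field")]
#         operations = [
#             migrations.RunPython(
#                 set_uuid_helper("ishtar_common", "MyModel"),
#                 migrations.RunPython.noop,
#             )
#         ]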


def migrate_created_field(apps, app_name, model_names):
    """Set the created field from each item's earliest historical record."""
    for model_name in model_names:
        model = apps.get_model(app_name, model_name)
        try:
            model_history = apps.get_model(app_name, "Historical" + model_name)
        except LookupError:
            continue
        for item in model.objects.all():
            q = model_history.objects.filter(id=item.pk).order_by("history_date")
            if not q.count():
                continue
            history_date = q.all()[0].history_date
            item.__class__.objects.filter(pk=item.pk).update(created=history_date)
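
# Usage sketch (app, model and migration names below are hypothetical):
# migrate_created_field can handle several models of one application at once.
#
#     def set_created(apps, schema_editor):
#         migrate_created_field(apps, "ishtar_common", ["Person", "Organization"])
#
#     class Migration(migrations.Migration):
#         dependencies = [("ishtar_common", "00XX_add_created_field")]
#         operations = [migrations.RunPython(set_created, migrations.RunPython.noop)]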