#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2013-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import datetime import os import sys from django.conf import settings from django.core.files import File from django.db.models import Q from django.utils.translation import ugettext_lazy as _ from ishtar_common.models_common import Town, Department from ishtar_common.utils import task from ishtar_common.serializers import full_serialization, restore_serialized from ishtar_common.models import ImportTask, ExportTask from archaeological_operations.models import Operation, ArchaeologicalSite from archaeological_context_records.models import ContextRecord from archaeological_finds.models import Find from archaeological_warehouse.models import Warehouse from archaeological_operations.views import get_operation, get_site from archaeological_context_records.views import get_contextrecord from archaeological_finds.views import get_find from archaeological_warehouse.views import get_warehouse @task() def trigger_error(): return 1 / 0 @task() def launch_import(import_task_id): try: import_task = ImportTask.objects.get(pk=import_task_id) except ImportTask.DoesNotExist: return if import_task.state != "S": return import_task.launch_date = datetime.datetime.now() import_task.state = "P" import_task.save() restore_serialized( import_task.source.path, import_task.import_user, delete_existing=import_task.delete_before, release_locks=import_task.releasing_locks, ) import_task.finished_date = datetime.datetime.now() import_task.state = "F" import_task.save() @task() def launch_export(export_task_id): try: export_task = ExportTask.objects.get(pk=export_task_id) except ExportTask.DoesNotExist: return if export_task.state != "S": return export_task.launch_date = datetime.datetime.now() export_task.state = "P" export_task.save() kwargs = { "info": {}, "put_locks": export_task.put_locks, "lock_user": export_task.lock_user, } for fltr_key in ( "export_types", "export_conf", "export_importers", "export_geo", "export_dir", "export_docs", "export_items", ): kwargs["info"][fltr_key] = getattr(export_task, fltr_key) kwargs[fltr_key] = getattr(export_task, fltr_key) if export_task.filter_type and export_task.filter_text: query = {"search_vector": export_task.filter_text} kwargs["info"]["query"] = {"filter": export_task.filter_text} if export_task.filter_type == "O": kwargs["info"]["query"]["model"] = "Operation" ids = list( get_operation(None, query=query, return_query=True).values_list( "id", flat=True ) ) if not ids: export_task.state = "F" export_task.result_info = str(_("No data to export")) export_task.save() return kwargs["operation_queryset"] = Operation.objects.filter(pk__in=ids) elif export_task.filter_type == "S": kwargs["info"]["query"]["model"] = "ArchaeologicalSite" ids = list( get_site(None, query=query, return_query=True).values_list( "id", flat=True ) ) if not ids: export_task.state = "F" export_task.result_info = str(_("No data to export")) export_task.save() return kwargs["site_queryset"] = ArchaeologicalSite.objects.filter(pk__in=ids) elif export_task.filter_type == "CR": kwargs["info"]["query"]["model"] = "ArchaeologicalSite" ids = list( get_contextrecord(None, query=query, return_query=True).values_list( "id", flat=True ) ) if not ids: export_task.state = "F" export_task.result_info = str(_("No data to export")) export_task.save() return kwargs["cr_queryset"] = ContextRecord.objects.filter(pk__in=ids) elif export_task.filter_type == "F": kwargs["info"]["query"]["model"] = "Find" ids = list( get_find(None, query=query, return_query=True).values_list( "id", flat=True ) ) if not ids: export_task.state = "F" export_task.result_info = str(_("No data to export")) export_task.save() return kwargs["find_queryset"] = Find.objects.filter(pk__in=ids) elif export_task.filter_type == "W": kwargs["info"]["query"]["model"] = "Warehouse" ids = list( get_warehouse(None, query=query, return_query=True).values_list( "id", flat=True ) ) if not ids: export_task.state = "F" export_task.result_info = str(_("No data to export")) export_task.save() return kwargs["warehouse_queryset"] = Warehouse.objects.filter(pk__in=ids) kwargs["info"]["geo"] = export_task.geo if not export_task.geo: kwargs["no_geo"] = True archive_name = full_serialization(**kwargs) result = open(archive_name, "rb") export_task.result.save(archive_name.split(os.sep)[-1], File(result)) os.remove(archive_name) export_task.finished_date = datetime.datetime.now() export_task.state = "F" export_task.result_info = str(_("Export finished")) export_task.save() def update_towns(): # TODO: remove? nb, updated = 0, 0 dpts = dict([(dpt.number, dpt) for dpt in Department.objects.all()]) q = Town.objects.filter(numero_insee__isnull=False) total = q.count() for idx, town in enumerate(q.all()): sys.stdout.write( "\rProcessing... %s/%d" % (str(idx + 1).zfill(len(str(total))), total) ) if len(town.numero_insee) < 2: continue dpt_code = town.numero_insee[:2] if dpt_code.startswith("9") and int(dpt_code) > 95: dpt_code = town.numero_insee[:3] if dpt_code not in dpts: sys.stdout.write("Missing department with INSEE code: %s" % dpt_code) continue if town.departement == dpts[dpt_code]: continue if town.departement: updated += 1 else: nb += 1 town.departement = dpts[dpt_code] town.save() sys.stdout.write("\n") return nb, updated