#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. from django.apps import apps from django.http import Http404 import html from urllib.parse import unquote_plus from rest_framework import serializers, status from rest_framework.response import Response from ishtar_common.data_importer import ImporterError from ishtar_common.models_common import GeneralType from ishtar_common.models_imports import ImporterType, ImportChunk from ishtar_common.rest import GISAPIView from ishtar_common.views_item import get_item class ImporterTypeSerializer(serializers.ModelSerializer): model = serializers.SerializerMethodField() unicity_keys = serializers.SerializerMethodField() class Meta: model = ImporterType fields = ['slug', 'name', 'description', 'is_import', 'is_template', 'model', 'unicity_keys'] def get_model(self, obj): if not obj.associated_models: return "" return str(obj.associated_models) def get_unicity_keys(self, obj): if not obj.unicity_keys: return [] return obj.unicity_keys.split(";") class GISSourceAPI(GISAPIView): model = ImporterType def get_queryset(self): user = self.request.user if not user.ishtaruser: return self.model.filter(pk__isnull=True) return self.model.q_qgis_importers(user.ishtaruser) def get(self, request, format=None): serializer = ImporterTypeSerializer(self.get_queryset(), many=True) return Response(serializer.data) class GenericTypeSerializer(serializers.Serializer): label = serializers.CharField(max_length=500) txt_idx = serializers.CharField(max_length=200) class GISTypeAPI(GISAPIView): def get(self, request, app, model, format=None): try: model = apps.get_model(app, model) except LookupError: raise Http404() if not issubclass(model, GeneralType): raise Http404() serializer = GenericTypeSerializer( model.objects.filter(available=True), many=True ) return Response(serializer.data) class GISBaseImportView: def get_importer(self, request, importer_slug, can_import=None): user = request.user q = ImporterType.q_qgis_importers(user.ishtaruser).filter(slug=importer_slug) if can_import: q = q.filter(is_import=True) else: q = q.filter(is_template=True) if not user.ishtaruser or not q.count(): return return q.all()[0] class GISExportAPI(GISBaseImportView, GISAPIView): PAGE_LEN = 50 def get(self, request, importer_slug, page=1, format=None): importer = self.get_importer(request, importer_slug) if not importer: return Response([]) query = None if request.GET and request.GET["query"]: query = unquote_plus(request.GET["query"]) dct = {"query": query, "length": self.PAGE_LEN} if page > 1: dct["start"] = (page - 1) * self.PAGE_LEN + 1 try: importer_class = importer.get_importer_class() import_key = importer.get_gis_import_key() cols, col_names = importer.get_columns(importer_class=importer_class) obj_name = importer_class.OBJECT_CLS.__name__.lower() return get_item(importer_class.OBJECT_CLS, "get_" + obj_name, obj_name, own_table_cols=cols)( request, data_type="json", full=False, force_own=False, no_link=True, col_names=col_names, col_types=importer.get_columns_types(), geo_import_key=import_key, **dct ) except ImporterError as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ImportChunkSerializer(serializers.ModelSerializer): class Meta: model = ImportChunk exclude = [] class GISImportAPI(GISBaseImportView, GISAPIView): def post(self, request, importer_slug, format=None): importer = self.get_importer(request, importer_slug, can_import=True) if not importer: return Response({"error": "Importer not found"}, status=status.HTTP_400_BAD_REQUEST) data = {} for k in request.data: data[k] = request.data.get(k) data["importer"] = importer.pk data["user"] = request.user.ishtaruser.pk data["chunk"] = html.unescape(data["chunk"]) serializer = ImportChunkSerializer(data=data) if serializer.is_valid(): chunk = serializer.save() new_import = chunk.associate_chunks() chunk_slug = f"{importer_slug}-{data['send_datetime']}-{data['number']}" return Response( {"chunk": chunk_slug, "import": new_import.id if new_import else None}, status=status.HTTP_201_CREATED ) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)