1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2025 Étienne Loks <etienne.loks at iggdrasil dot net>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# See the file COPYING for details.
from django.apps import apps
from django.http import Http404
from urllib.parse import unquote_plus
from rest_framework import serializers, status
from rest_framework.response import Response
from ishtar_common.models_common import GeneralType
from ishtar_common.models_imports import ImporterType, ImportChunk
from ishtar_common.rest import GISAPIView
from ishtar_common.views_item import get_item
class ImporterTypeSerializer(serializers.ModelSerializer):
model = serializers.SerializerMethodField()
class Meta:
model = ImporterType
fields = ['slug', 'name', 'description', 'is_import', 'is_template', 'model']
def get_model(self, obj):
if not obj.associated_models:
return ""
return str(obj.associated_models)
class GISSourceAPI(GISAPIView):
model = ImporterType
def get_queryset(self):
user = self.request.user
if not user.ishtaruser:
return self.model.filter(pk__isnull=True)
return self.model.q_qgis_importers(user.ishtaruser)
def get(self, request, format=None):
serializer = ImporterTypeSerializer(self.get_queryset(), many=True)
return Response(serializer.data)
class GenericTypeSerializer(serializers.Serializer):
label = serializers.CharField(max_length=500)
txt_idx = serializers.CharField(max_length=200)
class GISTypeAPI(GISAPIView):
def get(self, request, app, model, format=None):
try:
model = apps.get_model(app, model)
except LookupError:
raise Http404()
if not issubclass(model, GeneralType):
raise Http404()
serializer = GenericTypeSerializer(
model.objects.filter(available=True), many=True
)
return Response(serializer.data)
class GISBaseImportView:
def get_importer(self, request, importer_slug):
user = request.user
q = ImporterType.q_qgis_importers(user.ishtaruser).filter(slug=importer_slug)
if not user.ishtaruser or not q.count():
return
return q.all()[0]
class GISExportAPI(GISBaseImportView, GISAPIView):
PAGE_LEN = 50
def get(self, request, importer_slug, page=1, format=None):
importer = self.get_importer(request, importer_slug)
if not importer:
return Response([])
query = None
if request.GET and request.GET["query"]:
query = unquote_plus(request.GET["query"])
dct = {"query": query, "length": self.PAGE_LEN}
if page > 1:
dct["start"] = (page - 1) * self.PAGE_LEN + 1
importer_class = importer.get_importer_class()
cols, col_names = importer.get_columns(importer_class=importer_class)
obj_name = importer_class.OBJECT_CLS.__name__.lower()
return get_item(importer_class.OBJECT_CLS, "get_" + obj_name, obj_name,
own_table_cols=cols)(
request, data_type="json", full=False, force_own=False,
no_link=True, col_names=col_names,
col_types=importer.get_columns_types(),
**dct
)
class ImportChunkSerializer(serializers.ModelSerializer):
class Meta:
model = ImportChunk
exclude = []
class GISImportAPI(GISBaseImportView, GISAPIView):
def post(self, request, importer_slug, format=None):
importer = self.get_importer(request, importer_slug)
if not importer:
return Response({"error": "Importer not found"},
status=status.HTTP_400_BAD_REQUEST)
data = request.data
data.importer = importer
data.user = request.user.ishtaruser
serializer = ImportChunkSerializer(data=data)
if serializer.is_valid():
serializer.save()
chunk_slug = f"{importer_slug}-{serializer.send_datetime}-{serializer.number}"
return Response({"chunk": chunk_slug}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|