summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2025-05-01 17:33:17 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2025-10-15 19:32:59 +0200
commit99ca4cbcf573cdd80ab525d573b1f0e3ab738057 (patch)
treeef54ff649355b574b6a0af5789d5aabc73bcace2
parent22e1f1f2a167b362ccaa3629c66c64d55aca8dae (diff)
downloadIshtar-99ca4cbcf573cdd80ab525d573b1f0e3ab738057.tar.bz2
Ishtar-99ca4cbcf573cdd80ab525d573b1f0e3ab738057.zip
🗃️ GIS API: database migrations - some logic to manage token request
-rw-r--r--example_project/settings.py3
-rw-r--r--ishtar_common/admin.py13
-rw-r--r--ishtar_common/models.py1
-rw-r--r--ishtar_common/models_rest.py125
-rw-r--r--ishtar_common/tests.py87
5 files changed, 225 insertions, 4 deletions
diff --git a/example_project/settings.py b/example_project/settings.py
index ba88e842a..feb994af8 100644
--- a/example_project/settings.py
+++ b/example_project/settings.py
@@ -285,7 +285,8 @@ ISHTAR_DEFAULT_YEAR = 1900
ISHTAR_MUSEUM_GAM = False # France - AlimGAM export of exhibitions
# exclude business days from deadline calculation
ISHTAR_FILE_EXCLUDE_BUSSINESS_DAYS = True
-
+# timeout for request token (GIS connector)
+ISHTAR_REQUEST_TOKEN_TIMEOUT = 60*10 # 10 minutes
ISHTAR_SLUGS = {
"document-publisher": ["publisher"],
}
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py
index 9b0bf8fac..f7cdcc867 100644
--- a/ishtar_common/admin.py
+++ b/ishtar_common/admin.py
@@ -636,6 +636,7 @@ class IshtarSiteProfileAdmin(admin.ModelAdmin):
"classes": ("collapse",),
"fields": (
"experimental_feature",
+ "gis_connector",
"calculate_weight_on_full",
"locate_warehouses",
"use_town_for_geo",
@@ -2897,6 +2898,18 @@ class DocumentTemplateAdmin(admin.ModelAdmin):
admin_site.register(models.DocumentTemplate, DocumentTemplateAdmin)
+@admin.register(models_rest.UserRequestToken, site=admin_site)
+class UserRequestTokenAdmin(admin.ModelAdmin):
+ list_display = ("user", "access_type", "created")
+ readonly_fields = ("key",)
+
+
+@admin.register(models_rest.UserToken, site=admin_site)
+class UserTokenAdmin(admin.ModelAdmin):
+ list_display = ("user", "access_type", "name", "last_ip", "last_access")
+ readonly_fields = ("key", "last_ip", "last_access")
+
+
class ApiUserAdmin(admin.ModelAdmin):
list_display = ("user_ptr", "ip")
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index 9cc61742a..66c29e08d 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -1195,6 +1195,7 @@ class IshtarSiteProfile(models.Model, Cached):
preventive_operator = models.BooleanField(
_("Preventive operator module"), default=False
)
+ gis_connector = models.BooleanField(_("GIS connector"), default=False)
underwater = models.BooleanField(_("Underwater module"), default=False)
no_context_button = models.ForeignKey(
"archaeological_context_records.ContextRecord",
diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py
index 9ed9abd3e..e26968daf 100644
--- a/ishtar_common/models_rest.py
+++ b/ishtar_common/models_rest.py
@@ -8,13 +8,16 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import ArrayField
from django.core.files import File
+from django.utils import timezone
from django.utils.text import slugify
from django.apps import apps
from django.template import loader
from ishtar_common.models_common import BaseSheetFilter
-from ishtar_common.utils import gettext_lazy as _
+from ishtar_common.utils import get_current_profile, gettext_lazy as _
+
+from rest_framework.authtoken.models import Token
UnoCalc = None
ITALIC = None
@@ -26,9 +29,127 @@ if settings.USE_LIBREOFFICE:
pass
+API_ACCESS_TYPES = (
+ ('R', _("GIS - read")),
+ ('W', _("GIS - read/write")),
+)
+
+API_ACCESS_TYPES_DICT = dict(API_ACCESS_TYPES)
+
+
+class UserRequestToken(models.Model):
+ """
+ Temporary key to request an API key
+ """
+ created = models.DateTimeField(_("Created"), auto_now_add=True)
+ key = models.CharField(_("Key"), max_length=6, unique=True)
+ user = models.OneToOneField(
+ User, primary_key=True, related_name="user_request_token",
+ on_delete=models.CASCADE,
+ verbose_name=_("User")
+ ) # only one request at a time
+ access_type = models.CharField(_("Access type"), default="R",
+ choices=API_ACCESS_TYPES, max_length=1)
+ name = models.TextField(_("Name"))
+ limit_date = models.DateField(_("Limit date"), blank=True, null=True)
+
+ class Meta:
+ verbose_name = _("API - GIS - Request token")
+ verbose_name_plural = _("API - GIS - Request tokens")
+ ADMIN_SECTION = _("API")
+
+ @classmethod
+ def clean_keys(cls):
+ profile = get_current_profile()
+ if not profile.gis_connector:
+ cls.objects.all().delete()
+ return
+ timeout = timezone.now() - datetime.timedelta(
+ seconds=settings.ISHTAR_REQUEST_TOKEN_TIMEOUT)
+ cls.objects.filter(created__lte=timeout).delete()
+
+ def generate_token(self, app_key, from_ip="127.0.0.1"):
+ """
+ Create API token if not timeouted.
+ app_key is the QGIS ID given by the QGIS auth db
+ """
+ profile = get_current_profile()
+ timeout = timezone.now() - self.created
+ if not profile.gis_connector or timeout.seconds > settings.ISHTAR_REQUEST_TOKEN_TIMEOUT or (
+ self.limit_date and self.limit_date < datetime.today()):
+ self.delete()
+ return
+ app_key = f"{app_key[:7]:0<7}" # limit to 7 char and fill with 0
+ key = app_key + Token.generate_key()[7:] # integrate QGIS ID with api key
+ # this is only intended to prevent bad practice such as
+ # sharing different API key with several devices and users
+ # or naive key compromission. A saisoned user can bypass with
+ # a modified QGIS auth db or plugin.
+ token = UserToken(
+ user=self.user, access_type=self.access_type,
+ limit_date=self.limit_date, last_ip=from_ip,
+ key=key, name=self.name
+ )
+ token.save()
+ self.delete()
+ return token
+
+ def save(self, *args, **kwargs):
+ while not self.key:
+ key = Token.generate_key()[:6]
+ # very unlikely but could occurs
+ if UserRequestToken.objects.filter(key=key).count():
+ continue
+ self.key = key
+ return super().save(*args, **kwargs)
+
+
+class UserToken(models.Model):
+ """
+ GIS token used for QGIS access.
+ Token is an aggregation of a standard API key and QGIS auth DB key (specific for
+ one QGIS installation)
+ """
+ key = models.CharField(_("Key"), max_length=40, primary_key=True)
+ user = models.ForeignKey(
+ settings.AUTH_USER_MODEL, related_name='user_token',
+ on_delete=models.CASCADE, verbose_name=_("User")
+ )
+ name = models.TextField(_("Name"))
+ created = models.DateTimeField(_("Created"), auto_now_add=True)
+ access_type = models.CharField(_("Access type"), default="R",
+ choices=API_ACCESS_TYPES, max_length=1)
+ limit_date = models.DateField(_("Limit date"), blank=True, null=True)
+ last_ip = models.GenericIPAddressField(verbose_name=_("Last IP"), blank=True,
+ null=True)
+ last_access = models.DateField(_("Last access date"), auto_now_add=True)
+
+ class Meta:
+ verbose_name = _("API - GIS - Token")
+ verbose_name_plural = _("API - GIS - Tokens")
+ ordering = ("user", "name", "limit_date")
+ ADMIN_SECTION = _("API")
+
+ @property
+ def access_type_label(self):
+ if self.access_type in API_ACCESS_TYPES_DICT:
+ return API_ACCESS_TYPES_DICT[self.access_type]
+ return ""
+
+ def save(self, *args, **kwargs):
+ if not self.key:
+ self.key = Token.generate_key()
+ return super().save(*args, **kwargs)
+
+ def __str__(self):
+ access = self.access_type if self.access_type in API_ACCESS_TYPES_DICT else 'R'
+ return f"{self.user.username}-{API_ACCESS_TYPES_DICT[access]}"
+
+
class ApiUser(models.Model):
user_ptr = models.OneToOneField(
- User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, verbose_name=_("User")
+ User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE,
+ verbose_name=_("User")
)
ip = models.GenericIPAddressField(verbose_name=_("IP"))
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 053e19e7c..7582bf163 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -50,16 +50,19 @@ from django.core.exceptions import ValidationError
from django.core.files import File as DjangoFile
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
+from django.db import transaction
from django.db.models.fields import BooleanField
from django.db.models.fields.related import ForeignKey
+from django.db.utils import IntegrityError
from django.template.defaultfilters import slugify
from django.test import TestCase as BaseTestCase
from django.test.client import Client
from django.test.runner import DiscoverRunner
+from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.urls import reverse
-from ishtar_common import models, models_common, forms_common
+from ishtar_common import models, models_common, models_rest, forms_common
from ishtar_common import views
from ishtar_common.apps import admin_site
from ishtar_common.serializers import (
@@ -84,6 +87,7 @@ from archaeological_finds.serializers import FIND_MODEL_LIST
from archaeological_warehouse.serializers import WAREHOUSE_MODEL_LIST
from ishtar_common.serializers_utils import serialization_info
from ishtar_common.utils import (
+ get_current_profile,
update_data,
move_dict_data,
rename_and_simplify_media_name,
@@ -3601,6 +3605,87 @@ class SheetFilterTest(TestCase):
self.assertIn('Bill', content)
+class QgisAPITest(TestCase):
+ def setUp(self):
+ self.password = "mypassword"
+ self.user = User.objects.create_user(
+ "myuser", "myemail@test.com", self.password
+ )
+
+ def test_profile_not_set(self):
+ profile = get_current_profile()
+ profile.gis_connector = False
+ profile.save()
+ # no key generation
+ req = models_rest.UserRequestToken.objects.create(
+ user=self.user
+ )
+ token = req.generate_token("AAAAAA")
+ self.assertFalse(
+ models_rest.UserRequestToken.objects.filter(pk=req.pk).count()
+ )
+ self.assertFalse(token) # token is not generated
+
+ def test_normal_key_generation(self):
+ profile = get_current_profile()
+ profile.gis_connector = True
+ profile.save()
+
+ req = models_rest.UserRequestToken.objects.create(
+ user=self.user
+ )
+ with transaction.atomic():
+ # only one request is possible by user
+ try:
+ with self.assertRaises(IntegrityError):
+ models_rest.UserRequestToken.objects.create(
+ user=self.user
+ )
+ except IntegrityError:
+ pass
+ self.assertTrue(req.key)
+ token = req.generate_token("AAAAAA")
+ # request is deleted
+ self.assertFalse(
+ models_rest.UserRequestToken.objects.filter(pk=req.pk).count()
+ )
+ self.assertTrue(token) # token is generated
+ self.assertTrue(token.key.startswith("AAAAAA0"))
+
+ def test_clean(self):
+ profile = get_current_profile()
+ profile.gis_connector = True
+ profile.save()
+ req = models_rest.UserRequestToken.objects.create(
+ user=self.user
+ )
+ models_rest.UserRequestToken.clean_keys()
+ # request is still valid keep it
+ self.assertTrue(
+ models_rest.UserRequestToken.objects.filter(pk=req.pk).count()
+ )
+
+ req.created = timezone.now() - datetime.timedelta(seconds=(10*60+1))
+ req.save()
+ models_rest.UserRequestToken.clean_keys()
+ # request is no more valid
+ self.assertFalse(
+ models_rest.UserRequestToken.objects.filter(pk=req.pk).count()
+ )
+
+ req = models_rest.UserRequestToken.objects.create(
+ user=self.user
+ )
+ req.created = timezone.now() - datetime.timedelta(seconds=(10*60+1))
+ req.save()
+ # cannot generate token
+ self.assertIsNone(req.generate_token("AAAAAA"))
+ # request is also deleted
+ self.assertFalse(
+ models_rest.UserRequestToken.objects.filter(pk=req.pk).count()
+ )
+
+
class GeoVectorFormTest(TestCase):
fixtures = FILE_FIXTURES