diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2025-05-01 17:33:17 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2025-10-15 19:32:59 +0200 |
commit | 99ca4cbcf573cdd80ab525d573b1f0e3ab738057 (patch) | |
tree | ef54ff649355b574b6a0af5789d5aabc73bcace2 | |
parent | 22e1f1f2a167b362ccaa3629c66c64d55aca8dae (diff) | |
download | Ishtar-99ca4cbcf573cdd80ab525d573b1f0e3ab738057.tar.bz2 Ishtar-99ca4cbcf573cdd80ab525d573b1f0e3ab738057.zip |
🗃️ GIS API: database migrations - some logic to manage token request
-rw-r--r-- | example_project/settings.py | 3 | ||||
-rw-r--r-- | ishtar_common/admin.py | 13 | ||||
-rw-r--r-- | ishtar_common/models.py | 1 | ||||
-rw-r--r-- | ishtar_common/models_rest.py | 125 | ||||
-rw-r--r-- | ishtar_common/tests.py | 87 |
5 files changed, 225 insertions, 4 deletions
diff --git a/example_project/settings.py b/example_project/settings.py index ba88e842a..feb994af8 100644 --- a/example_project/settings.py +++ b/example_project/settings.py @@ -285,7 +285,8 @@ ISHTAR_DEFAULT_YEAR = 1900 ISHTAR_MUSEUM_GAM = False # France - AlimGAM export of exhibitions # exclude business days from deadline calculation ISHTAR_FILE_EXCLUDE_BUSSINESS_DAYS = True - +# timeout for request token (GIS connector) +ISHTAR_REQUEST_TOKEN_TIMEOUT = 60*10 # 10 minutes ISHTAR_SLUGS = { "document-publisher": ["publisher"], } diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py index 9b0bf8fac..f7cdcc867 100644 --- a/ishtar_common/admin.py +++ b/ishtar_common/admin.py @@ -636,6 +636,7 @@ class IshtarSiteProfileAdmin(admin.ModelAdmin): "classes": ("collapse",), "fields": ( "experimental_feature", + "gis_connector", "calculate_weight_on_full", "locate_warehouses", "use_town_for_geo", @@ -2897,6 +2898,18 @@ class DocumentTemplateAdmin(admin.ModelAdmin): admin_site.register(models.DocumentTemplate, DocumentTemplateAdmin) +@admin.register(models_rest.UserRequestToken, site=admin_site) +class UserRequestTokenAdmin(admin.ModelAdmin): + list_display = ("user", "access_type", "created") + readonly_fields = ("key",) + + +@admin.register(models_rest.UserToken, site=admin_site) +class UserTokenAdmin(admin.ModelAdmin): + list_display = ("user", "access_type", "name", "last_ip", "last_access") + readonly_fields = ("key", "last_ip", "last_access") + + class ApiUserAdmin(admin.ModelAdmin): list_display = ("user_ptr", "ip") diff --git a/ishtar_common/models.py b/ishtar_common/models.py index 9cc61742a..66c29e08d 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -1195,6 +1195,7 @@ class IshtarSiteProfile(models.Model, Cached): preventive_operator = models.BooleanField( _("Preventive operator module"), default=False ) + gis_connector = models.BooleanField(_("GIS connector"), default=False) underwater = models.BooleanField(_("Underwater module"), default=False) no_context_button = models.ForeignKey( "archaeological_context_records.ContextRecord", diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py index 9ed9abd3e..e26968daf 100644 --- a/ishtar_common/models_rest.py +++ b/ishtar_common/models_rest.py @@ -8,13 +8,16 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import ArrayField from django.core.files import File +from django.utils import timezone from django.utils.text import slugify from django.apps import apps from django.template import loader from ishtar_common.models_common import BaseSheetFilter -from ishtar_common.utils import gettext_lazy as _ +from ishtar_common.utils import get_current_profile, gettext_lazy as _ + +from rest_framework.authtoken.models import Token UnoCalc = None ITALIC = None @@ -26,9 +29,127 @@ if settings.USE_LIBREOFFICE: pass +API_ACCESS_TYPES = ( + ('R', _("GIS - read")), + ('W', _("GIS - read/write")), +) + +API_ACCESS_TYPES_DICT = dict(API_ACCESS_TYPES) + + +class UserRequestToken(models.Model): + """ + Temporary key to request an API key + """ + created = models.DateTimeField(_("Created"), auto_now_add=True) + key = models.CharField(_("Key"), max_length=6, unique=True) + user = models.OneToOneField( + User, primary_key=True, related_name="user_request_token", + on_delete=models.CASCADE, + verbose_name=_("User") + ) # only one request at a time + access_type = models.CharField(_("Access type"), default="R", + choices=API_ACCESS_TYPES, max_length=1) + name = models.TextField(_("Name")) + limit_date = models.DateField(_("Limit date"), blank=True, null=True) + + class Meta: + verbose_name = _("API - GIS - Request token") + verbose_name_plural = _("API - GIS - Request tokens") + ADMIN_SECTION = _("API") + + @classmethod + def clean_keys(cls): + profile = get_current_profile() + if not profile.gis_connector: + cls.objects.all().delete() + return + timeout = timezone.now() - datetime.timedelta( + seconds=settings.ISHTAR_REQUEST_TOKEN_TIMEOUT) + cls.objects.filter(created__lte=timeout).delete() + + def generate_token(self, app_key, from_ip="127.0.0.1"): + """ + Create API token if not timeouted. + app_key is the QGIS ID given by the QGIS auth db + """ + profile = get_current_profile() + timeout = timezone.now() - self.created + if not profile.gis_connector or timeout.seconds > settings.ISHTAR_REQUEST_TOKEN_TIMEOUT or ( + self.limit_date and self.limit_date < datetime.today()): + self.delete() + return + app_key = f"{app_key[:7]:0<7}" # limit to 7 char and fill with 0 + key = app_key + Token.generate_key()[7:] # integrate QGIS ID with api key + # this is only intended to prevent bad practice such as + # sharing different API key with several devices and users + # or naive key compromission. A saisoned user can bypass with + # a modified QGIS auth db or plugin. + token = UserToken( + user=self.user, access_type=self.access_type, + limit_date=self.limit_date, last_ip=from_ip, + key=key, name=self.name + ) + token.save() + self.delete() + return token + + def save(self, *args, **kwargs): + while not self.key: + key = Token.generate_key()[:6] + # very unlikely but could occurs + if UserRequestToken.objects.filter(key=key).count(): + continue + self.key = key + return super().save(*args, **kwargs) + + +class UserToken(models.Model): + """ + GIS token used for QGIS access. + Token is an aggregation of a standard API key and QGIS auth DB key (specific for + one QGIS installation) + """ + key = models.CharField(_("Key"), max_length=40, primary_key=True) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, related_name='user_token', + on_delete=models.CASCADE, verbose_name=_("User") + ) + name = models.TextField(_("Name")) + created = models.DateTimeField(_("Created"), auto_now_add=True) + access_type = models.CharField(_("Access type"), default="R", + choices=API_ACCESS_TYPES, max_length=1) + limit_date = models.DateField(_("Limit date"), blank=True, null=True) + last_ip = models.GenericIPAddressField(verbose_name=_("Last IP"), blank=True, + null=True) + last_access = models.DateField(_("Last access date"), auto_now_add=True) + + class Meta: + verbose_name = _("API - GIS - Token") + verbose_name_plural = _("API - GIS - Tokens") + ordering = ("user", "name", "limit_date") + ADMIN_SECTION = _("API") + + @property + def access_type_label(self): + if self.access_type in API_ACCESS_TYPES_DICT: + return API_ACCESS_TYPES_DICT[self.access_type] + return "" + + def save(self, *args, **kwargs): + if not self.key: + self.key = Token.generate_key() + return super().save(*args, **kwargs) + + def __str__(self): + access = self.access_type if self.access_type in API_ACCESS_TYPES_DICT else 'R' + return f"{self.user.username}-{API_ACCESS_TYPES_DICT[access]}" + + class ApiUser(models.Model): user_ptr = models.OneToOneField( - User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, verbose_name=_("User") + User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, + verbose_name=_("User") ) ip = models.GenericIPAddressField(verbose_name=_("IP")) diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 053e19e7c..7582bf163 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -50,16 +50,19 @@ from django.core.exceptions import ValidationError from django.core.files import File as DjangoFile from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management import call_command +from django.db import transaction from django.db.models.fields import BooleanField from django.db.models.fields.related import ForeignKey +from django.db.utils import IntegrityError from django.template.defaultfilters import slugify from django.test import TestCase as BaseTestCase from django.test.client import Client from django.test.runner import DiscoverRunner +from django.utils import timezone from django.utils.translation import gettext_lazy as _ from django.urls import reverse -from ishtar_common import models, models_common, forms_common +from ishtar_common import models, models_common, models_rest, forms_common from ishtar_common import views from ishtar_common.apps import admin_site from ishtar_common.serializers import ( @@ -84,6 +87,7 @@ from archaeological_finds.serializers import FIND_MODEL_LIST from archaeological_warehouse.serializers import WAREHOUSE_MODEL_LIST from ishtar_common.serializers_utils import serialization_info from ishtar_common.utils import ( + get_current_profile, update_data, move_dict_data, rename_and_simplify_media_name, @@ -3601,6 +3605,87 @@ class SheetFilterTest(TestCase): self.assertIn('Bill', content) +class QgisAPITest(TestCase): + def setUp(self): + self.password = "mypassword" + self.user = User.objects.create_user( + "myuser", "myemail@test.com", self.password + ) + + def test_profile_not_set(self): + profile = get_current_profile() + profile.gis_connector = False + profile.save() + # no key generation + req = models_rest.UserRequestToken.objects.create( + user=self.user + ) + token = req.generate_token("AAAAAA") + self.assertFalse( + models_rest.UserRequestToken.objects.filter(pk=req.pk).count() + ) + self.assertFalse(token) # token is not generated + + def test_normal_key_generation(self): + profile = get_current_profile() + profile.gis_connector = True + profile.save() + + req = models_rest.UserRequestToken.objects.create( + user=self.user + ) + with transaction.atomic(): + # only one request is possible by user + try: + with self.assertRaises(IntegrityError): + models_rest.UserRequestToken.objects.create( + user=self.user + ) + except IntegrityError: + pass + self.assertTrue(req.key) + token = req.generate_token("AAAAAA") + # request is deleted + self.assertFalse( + models_rest.UserRequestToken.objects.filter(pk=req.pk).count() + ) + self.assertTrue(token) # token is generated + self.assertTrue(token.key.startswith("AAAAAA0")) + + def test_clean(self): + profile = get_current_profile() + profile.gis_connector = True + profile.save() + req = models_rest.UserRequestToken.objects.create( + user=self.user + ) + models_rest.UserRequestToken.clean_keys() + # request is still valid keep it + self.assertTrue( + models_rest.UserRequestToken.objects.filter(pk=req.pk).count() + ) + + req.created = timezone.now() - datetime.timedelta(seconds=(10*60+1)) + req.save() + models_rest.UserRequestToken.clean_keys() + # request is no more valid + self.assertFalse( + models_rest.UserRequestToken.objects.filter(pk=req.pk).count() + ) + + req = models_rest.UserRequestToken.objects.create( + user=self.user + ) + req.created = timezone.now() - datetime.timedelta(seconds=(10*60+1)) + req.save() + # cannot generate token + self.assertIsNone(req.generate_token("AAAAAA")) + # request is also deleted + self.assertFalse( + models_rest.UserRequestToken.objects.filter(pk=req.pk).count() + ) + + class GeoVectorFormTest(TestCase): fixtures = FILE_FIXTURES |