diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2025-05-01 17:33:17 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2025-06-13 18:16:05 +0200 |
commit | 2affbfc716765b697fa6bc676b5f5788f17f3063 (patch) | |
tree | 17b810f995acce655e24fde96bea2afdcfb0ca4e /ishtar_common/models_rest.py | |
parent | 606934214846af318b4074b3bb8b06f3bb1bec7c (diff) | |
download | Ishtar-2affbfc716765b697fa6bc676b5f5788f17f3063.tar.bz2 Ishtar-2affbfc716765b697fa6bc676b5f5788f17f3063.zip |
🗃️ GIS API: database migrations - some logic to manage token request
Diffstat (limited to 'ishtar_common/models_rest.py')
-rw-r--r-- | ishtar_common/models_rest.py | 125 |
1 files changed, 123 insertions, 2 deletions
diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py index 9ed9abd3e..e26968daf 100644 --- a/ishtar_common/models_rest.py +++ b/ishtar_common/models_rest.py @@ -8,13 +8,16 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import ArrayField from django.core.files import File +from django.utils import timezone from django.utils.text import slugify from django.apps import apps from django.template import loader from ishtar_common.models_common import BaseSheetFilter -from ishtar_common.utils import gettext_lazy as _ +from ishtar_common.utils import get_current_profile, gettext_lazy as _ + +from rest_framework.authtoken.models import Token UnoCalc = None ITALIC = None @@ -26,9 +29,127 @@ if settings.USE_LIBREOFFICE: pass +API_ACCESS_TYPES = ( + ('R', _("GIS - read")), + ('W', _("GIS - read/write")), +) + +API_ACCESS_TYPES_DICT = dict(API_ACCESS_TYPES) + + +class UserRequestToken(models.Model): + """ + Temporary key to request an API key + """ + created = models.DateTimeField(_("Created"), auto_now_add=True) + key = models.CharField(_("Key"), max_length=6, unique=True) + user = models.OneToOneField( + User, primary_key=True, related_name="user_request_token", + on_delete=models.CASCADE, + verbose_name=_("User") + ) # only one request at a time + access_type = models.CharField(_("Access type"), default="R", + choices=API_ACCESS_TYPES, max_length=1) + name = models.TextField(_("Name")) + limit_date = models.DateField(_("Limit date"), blank=True, null=True) + + class Meta: + verbose_name = _("API - GIS - Request token") + verbose_name_plural = _("API - GIS - Request tokens") + ADMIN_SECTION = _("API") + + @classmethod + def clean_keys(cls): + profile = get_current_profile() + if not profile.gis_connector: + cls.objects.all().delete() + return + timeout = timezone.now() - datetime.timedelta( + seconds=settings.ISHTAR_REQUEST_TOKEN_TIMEOUT) + cls.objects.filter(created__lte=timeout).delete() + + def generate_token(self, app_key, from_ip="127.0.0.1"): + """ + Create API token if not timeouted. + app_key is the QGIS ID given by the QGIS auth db + """ + profile = get_current_profile() + timeout = timezone.now() - self.created + if not profile.gis_connector or timeout.seconds > settings.ISHTAR_REQUEST_TOKEN_TIMEOUT or ( + self.limit_date and self.limit_date < datetime.today()): + self.delete() + return + app_key = f"{app_key[:7]:0<7}" # limit to 7 char and fill with 0 + key = app_key + Token.generate_key()[7:] # integrate QGIS ID with api key + # this is only intended to prevent bad practice such as + # sharing different API key with several devices and users + # or naive key compromission. A saisoned user can bypass with + # a modified QGIS auth db or plugin. + token = UserToken( + user=self.user, access_type=self.access_type, + limit_date=self.limit_date, last_ip=from_ip, + key=key, name=self.name + ) + token.save() + self.delete() + return token + + def save(self, *args, **kwargs): + while not self.key: + key = Token.generate_key()[:6] + # very unlikely but could occurs + if UserRequestToken.objects.filter(key=key).count(): + continue + self.key = key + return super().save(*args, **kwargs) + + +class UserToken(models.Model): + """ + GIS token used for QGIS access. + Token is an aggregation of a standard API key and QGIS auth DB key (specific for + one QGIS installation) + """ + key = models.CharField(_("Key"), max_length=40, primary_key=True) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, related_name='user_token', + on_delete=models.CASCADE, verbose_name=_("User") + ) + name = models.TextField(_("Name")) + created = models.DateTimeField(_("Created"), auto_now_add=True) + access_type = models.CharField(_("Access type"), default="R", + choices=API_ACCESS_TYPES, max_length=1) + limit_date = models.DateField(_("Limit date"), blank=True, null=True) + last_ip = models.GenericIPAddressField(verbose_name=_("Last IP"), blank=True, + null=True) + last_access = models.DateField(_("Last access date"), auto_now_add=True) + + class Meta: + verbose_name = _("API - GIS - Token") + verbose_name_plural = _("API - GIS - Tokens") + ordering = ("user", "name", "limit_date") + ADMIN_SECTION = _("API") + + @property + def access_type_label(self): + if self.access_type in API_ACCESS_TYPES_DICT: + return API_ACCESS_TYPES_DICT[self.access_type] + return "" + + def save(self, *args, **kwargs): + if not self.key: + self.key = Token.generate_key() + return super().save(*args, **kwargs) + + def __str__(self): + access = self.access_type if self.access_type in API_ACCESS_TYPES_DICT else 'R' + return f"{self.user.username}-{API_ACCESS_TYPES_DICT[access]}" + + class ApiUser(models.Model): user_ptr = models.OneToOneField( - User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, verbose_name=_("User") + User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, + verbose_name=_("User") ) ip = models.GenericIPAddressField(verbose_name=_("IP")) |