summaryrefslogtreecommitdiff
path: root/ishtar_common/models_rest.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/models_rest.py')
-rw-r--r--ishtar_common/models_rest.py125
1 files changed, 123 insertions, 2 deletions
diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py
index 9ed9abd3e..e26968daf 100644
--- a/ishtar_common/models_rest.py
+++ b/ishtar_common/models_rest.py
@@ -8,13 +8,16 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import ArrayField
from django.core.files import File
+from django.utils import timezone
from django.utils.text import slugify
from django.apps import apps
from django.template import loader
from ishtar_common.models_common import BaseSheetFilter
-from ishtar_common.utils import gettext_lazy as _
+from ishtar_common.utils import get_current_profile, gettext_lazy as _
+
+from rest_framework.authtoken.models import Token
UnoCalc = None
ITALIC = None
@@ -26,9 +29,127 @@ if settings.USE_LIBREOFFICE:
pass
+API_ACCESS_TYPES = (
+ ('R', _("GIS - read")),
+ ('W', _("GIS - read/write")),
+)
+
+API_ACCESS_TYPES_DICT = dict(API_ACCESS_TYPES)
+
+
+class UserRequestToken(models.Model):
+ """
+ Temporary key to request an API key
+ """
+ created = models.DateTimeField(_("Created"), auto_now_add=True)
+ key = models.CharField(_("Key"), max_length=6, unique=True)
+ user = models.OneToOneField(
+ User, primary_key=True, related_name="user_request_token",
+ on_delete=models.CASCADE,
+ verbose_name=_("User")
+ ) # only one request at a time
+ access_type = models.CharField(_("Access type"), default="R",
+ choices=API_ACCESS_TYPES, max_length=1)
+ name = models.TextField(_("Name"))
+ limit_date = models.DateField(_("Limit date"), blank=True, null=True)
+
+ class Meta:
+ verbose_name = _("API - GIS - Request token")
+ verbose_name_plural = _("API - GIS - Request tokens")
+ ADMIN_SECTION = _("API")
+
+ @classmethod
+ def clean_keys(cls):
+ profile = get_current_profile()
+ if not profile.gis_connector:
+ cls.objects.all().delete()
+ return
+ timeout = timezone.now() - datetime.timedelta(
+ seconds=settings.ISHTAR_REQUEST_TOKEN_TIMEOUT)
+ cls.objects.filter(created__lte=timeout).delete()
+
+ def generate_token(self, app_key, from_ip="127.0.0.1"):
+ """
+ Create API token if not timeouted.
+ app_key is the QGIS ID given by the QGIS auth db
+ """
+ profile = get_current_profile()
+ timeout = timezone.now() - self.created
+ if not profile.gis_connector or timeout.seconds > settings.ISHTAR_REQUEST_TOKEN_TIMEOUT or (
+ self.limit_date and self.limit_date < datetime.today()):
+ self.delete()
+ return
+ app_key = f"{app_key[:7]:0<7}" # limit to 7 char and fill with 0
+ key = app_key + Token.generate_key()[7:] # integrate QGIS ID with api key
+ # this is only intended to prevent bad practice such as
+ # sharing different API key with several devices and users
+ # or naive key compromission. A saisoned user can bypass with
+ # a modified QGIS auth db or plugin.
+ token = UserToken(
+ user=self.user, access_type=self.access_type,
+ limit_date=self.limit_date, last_ip=from_ip,
+ key=key, name=self.name
+ )
+ token.save()
+ self.delete()
+ return token
+
+ def save(self, *args, **kwargs):
+ while not self.key:
+ key = Token.generate_key()[:6]
+ # very unlikely but could occurs
+ if UserRequestToken.objects.filter(key=key).count():
+ continue
+ self.key = key
+ return super().save(*args, **kwargs)
+
+
+class UserToken(models.Model):
+ """
+ GIS token used for QGIS access.
+ Token is an aggregation of a standard API key and QGIS auth DB key (specific for
+ one QGIS installation)
+ """
+ key = models.CharField(_("Key"), max_length=40, primary_key=True)
+ user = models.ForeignKey(
+ settings.AUTH_USER_MODEL, related_name='user_token',
+ on_delete=models.CASCADE, verbose_name=_("User")
+ )
+ name = models.TextField(_("Name"))
+ created = models.DateTimeField(_("Created"), auto_now_add=True)
+ access_type = models.CharField(_("Access type"), default="R",
+ choices=API_ACCESS_TYPES, max_length=1)
+ limit_date = models.DateField(_("Limit date"), blank=True, null=True)
+ last_ip = models.GenericIPAddressField(verbose_name=_("Last IP"), blank=True,
+ null=True)
+ last_access = models.DateField(_("Last access date"), auto_now_add=True)
+
+ class Meta:
+ verbose_name = _("API - GIS - Token")
+ verbose_name_plural = _("API - GIS - Tokens")
+ ordering = ("user", "name", "limit_date")
+ ADMIN_SECTION = _("API")
+
+ @property
+ def access_type_label(self):
+ if self.access_type in API_ACCESS_TYPES_DICT:
+ return API_ACCESS_TYPES_DICT[self.access_type]
+ return ""
+
+ def save(self, *args, **kwargs):
+ if not self.key:
+ self.key = Token.generate_key()
+ return super().save(*args, **kwargs)
+
+ def __str__(self):
+ access = self.access_type if self.access_type in API_ACCESS_TYPES_DICT else 'R'
+ return f"{self.user.username}-{API_ACCESS_TYPES_DICT[access]}"
+
+
class ApiUser(models.Model):
user_ptr = models.OneToOneField(
- User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, verbose_name=_("User")
+ User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE,
+ verbose_name=_("User")
)
ip = models.GenericIPAddressField(verbose_name=_("IP"))