import datetime import os import tempfile from django.conf import settings from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import ArrayField from django.core.files import File from django.utils import timezone from django.utils.text import slugify from django.apps import apps from django.template import loader from ishtar_common.models_common import BaseSheetFilter from ishtar_common.utils import get_current_profile, gettext_lazy as _ from rest_framework.authtoken.models import Token UnoCalc = None ITALIC = None if settings.USE_LIBREOFFICE: try: from ishtar_common.libreoffice import UnoCalc from com.sun.star.awt.FontSlant import ITALIC except ImportError: pass API_ACCESS_TYPES = ( ('R', _("GIS - read")), ('W', _("GIS - read/write")), ) API_ACCESS_TYPES_DICT = dict(API_ACCESS_TYPES) class UserRequestToken(models.Model): """ Temporary key to request an API key """ created = models.DateTimeField(_("Created"), auto_now_add=True) key = models.CharField(_("Key"), max_length=6, unique=True) user = models.OneToOneField( User, primary_key=True, related_name="user_request_token", on_delete=models.CASCADE, verbose_name=_("User") ) # only one request at a time access_type = models.CharField(_("Access type"), default="R", choices=API_ACCESS_TYPES, max_length=1) name = models.TextField(_("Name")) limit_date = models.DateField(_("Limit date"), blank=True, null=True) class Meta: verbose_name = _("API - GIS - Request token") verbose_name_plural = _("API - GIS - Request tokens") ADMIN_SECTION = _("API") @property def access_type_label(self): if self.access_type in API_ACCESS_TYPES_DICT: return API_ACCESS_TYPES_DICT[self.access_type] return "" @property def expiry(self): timeout = timezone.now() - self.created return settings.ISHTAR_REQUEST_TOKEN_TIMEOUT - timeout.seconds @classmethod def clean_keys(cls): profile = get_current_profile() if not profile.gis_connector: cls.objects.all().delete() return timeout = timezone.now() - datetime.timedelta( seconds=settings.ISHTAR_REQUEST_TOKEN_TIMEOUT) cls.objects.filter(created__lte=timeout).delete() def generate_token(self, app_key, from_ip="127.0.0.1"): """ Create API token if not timeouted. app_key is the QGIS ID given by the QGIS auth db """ profile = get_current_profile() timeout = timezone.now() - self.created if not profile.gis_connector or timeout.seconds > settings.ISHTAR_REQUEST_TOKEN_TIMEOUT or ( self.limit_date and self.limit_date < datetime.today()): self.delete() return app_key = f"{app_key[:7]:0<7}" # limit to 7 char and fill with 0 key = app_key + Token.generate_key()[7:] # integrate QGIS ID with api key # this is only intended to prevent bad practice such as # sharing different API key with several devices and users # or naive key compromission. A saisoned user can bypass with # a modified QGIS auth db or plugin. token = UserToken( user=self.user, access_type=self.access_type, limit_date=self.limit_date, last_ip=from_ip, key=key, name=self.name ) token.save() self.delete() return token def save(self, *args, **kwargs): while not self.key: key = Token.generate_key()[:6] # very unlikely but could occurs if UserRequestToken.objects.filter(key=key).count(): continue self.key = key return super().save(*args, **kwargs) class UserToken(models.Model): """ GIS token used for QGIS access. Token is an aggregation of a standard API key and QGIS auth DB key (specific for one QGIS installation) """ key = models.CharField(_("Key"), max_length=40, primary_key=True) user = models.ForeignKey( settings.AUTH_USER_MODEL, related_name='user_token', on_delete=models.CASCADE, verbose_name=_("User") ) name = models.TextField(_("Name")) created = models.DateTimeField(_("Created"), auto_now_add=True) access_type = models.CharField(_("Access type"), default="R", choices=API_ACCESS_TYPES, max_length=1) limit_date = models.DateField(_("Limit date"), blank=True, null=True) last_ip = models.GenericIPAddressField(verbose_name=_("Last IP"), blank=True, null=True) last_access = models.DateField(_("Last access date"), auto_now_add=True) class Meta: verbose_name = _("API - GIS - Token") verbose_name_plural = _("API - GIS - Tokens") ordering = ("user", "name", "limit_date") ADMIN_SECTION = _("API") @property def access_type_label(self): if self.access_type in API_ACCESS_TYPES_DICT: return API_ACCESS_TYPES_DICT[self.access_type] return "" def save(self, *args, **kwargs): if not self.key: self.key = Token.generate_key() return super().save(*args, **kwargs) def __str__(self): access = self.access_type if self.access_type in API_ACCESS_TYPES_DICT else 'R' return f"{self.user.username}-{API_ACCESS_TYPES_DICT[access]}" class ApiUser(models.Model): user_ptr = models.OneToOneField( User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE, verbose_name=_("User") ) ip = models.GenericIPAddressField(verbose_name=_("IP")) class Meta: verbose_name = _("API - Remote access - User") verbose_name_plural = _("API - Remote access - Users") ADMIN_SECTION = _("API") def __str__(self): return self.user_ptr.username class ApiSearchModel(models.Model): user = models.ForeignKey(ApiUser, on_delete=models.CASCADE, verbose_name=_("User")) content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) limit_query = models.TextField( verbose_name=_("Filter query"), blank=True, null=True, help_text=_("Search query add to each request"), ) table_format = models.ForeignKey( "ishtar_common.ImporterType", on_delete=models.PROTECT, null=True, verbose_name=_("Table formats"), related_name="search_model_table_format", help_text="Not used. Set it when table columns will be set by importer." ) export = models.ManyToManyField( "ishtar_common.ImporterType", blank=True, verbose_name=_("Export"), related_name="search_model_exports") class Meta: verbose_name = _("API - Remote access - Search model") verbose_name_plural = _("API - Remote access - Search models") ADMIN_SECTION = _("API") def __str__(self): return f"{self.user} - {self.content_type}" class ApiSheetFilter(BaseSheetFilter): api_search_model = models.ForeignKey(ApiSearchModel, on_delete=models.CASCADE, related_name="sheet_filters") class Meta: verbose_name = _("API - Remote access - Sheet filter") verbose_name_plural = _("API - Remote access - Sheet filters") ADMIN_SECTION = _("API") def get_template(self): ct = self.api_search_model.content_type model = apps.get_model(ct.app_label, ct.model) tpl = loader.get_template(f"ishtar/sheet_{model.SLUG}.html") return tpl.template.origin.name def __str__(self): return f"{self.api_search_model} - {self.key}" class ApiExternalSource(models.Model): url = models.URLField(verbose_name=_("URL")) name = models.CharField(verbose_name=_("Name"), max_length=200) key = models.CharField(_("Key"), max_length=40) search_columns = models.TextField(_("Search columns"), default="") search_columns_label = models.TextField(_("Search columns label"), default="") exports = models.TextField(_("Exports slug"), default="") exports_label = models.TextField(_("Exports label"), default="") users = models.ManyToManyField("IshtarUser", blank=True, verbose_name=_("Users")) match_document = models.FileField( _("Match document"), blank=True, null=True, help_text=_( 'First use the "Update types from source" action. Then use the action ' '"Generate match document" action to create a default match document. ' "Complete it and attach it back to the source to finally use the action " '"Update association from match document".' ), ) class Meta: verbose_name = _("API - Search - External source") verbose_name_plural = _("API - Search - External sources") ordering = ("name",) ADMIN_SECTION = _("API") def __str__(self): return self.name def get_columns(self, model_name): """ Column keys for table display :return: (key1:str, key2:str, ...) - list of column key """ if not self.search_columns: return [] model_name += "-" return [ k[len(model_name):] for k in self.search_columns.split("||") if k.startswith(model_name) ] def get_column_labels(self, model_name): """ Column label for table display :return: (label1:str, label2:str, ...) - list of column labels """ if not self.search_columns_label: return [] model_name += "-" return [ k[len(model_name):] for k in self.search_columns_label.split("||") if k.startswith(model_name) ] def get_exports(self, model_name): """ Get export list :return: [(slug:slug, label:str)] - list of export slug and labels """ if not self.exports or not self.exports_label: return [] model_name += "-" exports = [ k[len(model_name):] for k in self.exports.split("||") if k.startswith(model_name) ] exports_label = [ k[len(model_name):] for k in self.exports_label.split("||") if k.startswith(model_name) ] result = [] for idx in range(min([len(exports), len(exports_label)])): result.append((exports[idx], exports_label[idx])) return result def save(self, force_insert=False, force_update=False, using=None, update_fields=None): super().save(force_insert, force_update, using, update_fields) # remove external sources from profiles if not in user anymore UserProfile = apps.get_model("ishtar_common", "UserProfile") q = UserProfile.objects.filter(external_sources=self).exclude( person__ishtaruser__in=list(self.users.all()) ) for profile in q.all(): profile.external_sources.remove(self) def update_matches(self, content): result = { "created": 0, "updated": 0, "deleted": 0, "search_model do not exist": [], "type do not exist": [], } for search_model in content: if search_model == "config": continue app, model_name = search_model.split(".") try: ct = ContentType.objects.get(app_label=app, model=model_name) except ContentType.DoesNotExist: result["search_model do not exist"].append(search_model) continue for ct_type, keys, values in content[search_model]: tapp, tmodel_name = ct_type.split(".") try: ct_type = ContentType.objects.get(app_label=tapp, model=tmodel_name) except ContentType.DoesNotExist: result["type do not exist"].append(ct_type) continue t_model = ct_type.model_class() current_matches = [] for slug, label in values: current_matches.append(slug) m, created = ApiKeyMatch.objects.get_or_create( source=self, search_model=ct, associated_type=ct_type, search_keys=keys, distant_slug=slug, defaults={"distant_label": label}, ) updated = False if not created and m.distant_label != label: updated = True m.distant_label = label if not m.do_not_match and not m.local_slug: slug_key = "txt_idx" if hasattr(t_model, "slug"): slug_key = "slug" q = t_model.objects.filter(**{slug_key: m.distant_slug}) if q.count(): local_value = q.all()[0] setattr(m, "local_slug", getattr(local_value, slug_key)) m.local_label = str(local_value) updated = True if updated: m.save() if not created: result["updated"] += 1 if created: result["created"] += 1 # delete removed keys q = ApiKeyMatch.objects.filter( source=self, search_model=ct, associated_type=ct_type, ).exclude(distant_slug__in=current_matches) result["deleted"] += q.count() q.delete() return result def generate_match_document(self): if not UnoCalc: return uno = UnoCalc() calc = uno.create_calc() if not calc: return types = list( ApiKeyMatch.objects.filter( source=self, ) .order_by() .values_list("associated_type", flat=True) .distinct() ) lst_sheet = uno.get_or_create_sheet(calc, len(types), str(_("List types"))) for idx, tpe in enumerate(types): self._generate_match_page(idx, tpe, uno, calc, lst_sheet) tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-") base = "{}-{}.ods".format(datetime.date.today().isoformat(), slugify(self.name)) dest_filename = "{}{}{}".format(tmpdir, os.sep, base) uno.save_calc(calc, dest_filename) with open(dest_filename, "rb") as fle: self.match_document = File(fle, base) self.save() return dest_filename def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet): model = ContentType.objects.get(pk=tpe).model_class() ROW_NUMBER = 1000 sheet = uno.get_or_create_sheet(calc, page_number) q = ApiKeyMatch.objects.filter(source=self, associated_type=tpe) if not q.count(): return sm = q.all()[0].search_model sheet.Name = f"{sm.app_label}.{sm.model}-{model._meta.app_label}.{model.__name__.lower()}" for col_number, column in enumerate( (_("Distant key"), _("Distant label"), _("Local")) ): # header cell = sheet.getCellByPosition(col_number, 0) cell.CharWeight = 150 cell.setString(str(column)) current_list = [] for idx, match in enumerate(q.all()): cell = sheet.getCellByPosition(0, idx + 1) if match.distant_slug in current_list: continue current_list.append(match.distant_slug) cell.setString(match.distant_slug) cell = sheet.getCellByPosition(1, idx + 1) cell.setString(match.distant_label) if match.local_label: cell = sheet.getCellByPosition(2, idx + 1) cell.setString(match.local_label) lst = [] for typ in model.get_types(instances=True): lst.append(str(typ)) end_row = uno.create_list( lst_sheet, page_number, 0, str(model._meta.verbose_name), lst ) uno.set_cell_validation_list( sheet, 3, 1, ROW_NUMBER + 2, lst_sheet, page_number, [1, end_row], ) def update_from_match_document(self): if not self.match_document: return if not UnoCalc: return uno = UnoCalc() calc = uno.open_calc(self.match_document.path) if not calc: return errors = [] updated = 0 for idx in range(uno.get_sheet_number(calc) - 1): # do not read the last sheet sheet = uno.get_sheet(calc, idx) sheet_name = sheet.Name try: search_model_name, ctype = sheet.Name.split("-") app_label, model_name = search_model_name.split(".") search_model = ContentType.objects.get( app_label=app_label, model=model_name ) except (ValueError, ContentType.DoesNotExist): errors.append(str(_(f"{sheet_name} is not a correct sheet name."))) continue try: app_label, model_name = ctype.split(".") ct = ContentType.objects.get(app_label=app_label, model=model_name) except (ValueError, ContentType.DoesNotExist): errors.append(str(_(f"{sheet_name} is not a correct sheet name."))) continue data = uno.sheet_get_data(sheet) base_q = ApiKeyMatch.objects.filter(source=self, associated_type=ct) model = ct.model_class() for idx_line, line in enumerate(data): if not idx_line: # header continue distant_key, distant_label, local_name = line q = base_q.filter(distant_slug=distant_key, distant_label=distant_label) if not q.count(): if distant_key: errors.append( str( _( f"{ctype} - {distant_key}, {distant_label} not referenced in the database." ) ) ) continue if q.filter(local_label=local_name).count(): # no change continue for api_key in q.all(): key_name = "slug" if hasattr(model, "txt_idx"): key_name = "txt_idx" q = model.objects.filter(label=local_name) if not q.count(): if local_name: errors.append( str( _( f"{ctype} - {local_name} do not exists in local database." ) ) ) continue tpe = q.all()[0] api_key.local_slug = getattr(tpe, key_name) api_key.local_label = local_name api_key.save() updated += 1 return {"errors": errors, "updated": updated} class ApiKeyMatch(models.Model): source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE) search_model = models.ForeignKey( ContentType, on_delete=models.CASCADE, verbose_name=_("Search model"), related_name="key_match_search_models", ) associated_type = models.ForeignKey( ContentType, on_delete=models.CASCADE, verbose_name=_("Associated type"), related_name="key_match_types", ) search_keys = ArrayField( models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True ) distant_slug = models.SlugField( verbose_name=_("Distant key"), max_length=200, allow_unicode=True ) distant_label = models.TextField( verbose_name=_("Distant value"), blank=True, default="" ) local_slug = models.SlugField( verbose_name=_("Local key"), max_length=200, allow_unicode=True ) local_label = models.TextField( verbose_name=_("Local value"), blank=True, default="" ) do_not_match = models.BooleanField( verbose_name=_("Disable match for this search"), default=False ) class Meta: verbose_name = _("API - Search - Key match") verbose_name_plural = _("API - Search - Keys matches") ADMIN_SECTION = _("API")