import datetime
import os
import tempfile

from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import ArrayField
from django.utils.text import slugify

# LibreOffice export is optional: UnoCalc stays None when the feature is
# disabled in the settings or when the UNO bindings cannot be imported.
# An explicit ``if`` is used rather than ``assert`` because assertions are
# stripped when Python runs with -O, which would silently ignore the setting.
UnoCalc = None
if settings.USE_LIBREOFFICE:
    try:
        from ishtar_common.libreoffice import UnoCalc
        # Not used in the code of this module; a failure of this import also
        # disables the export (UnoCalc is reset to None below).
        from com.sun.star.awt.FontSlant import ITALIC  # noqa: F401
    except ImportError:
        UnoCalc = None

from ishtar_common.utils import ugettext_lazy as _

MAIN_CONTENT_TYPES = (("archaeological_operations", "operation"),)


def _get_content_type(dotted_name):
    """Return the ContentType named ``"app_label.model"``.

    Return None when *dotted_name* does not consist of exactly two
    dot-separated parts or when no matching ContentType exists.
    """
    try:
        app_label, model_name = dotted_name.split(".")
    except ValueError:
        return None
    try:
        return ContentType.objects.get(app_label=app_label, model=model_name)
    except ContentType.DoesNotExist:
        return None


class ApiUser(models.Model):
    """Remote API user: a Django ``User`` associated with an IP address."""

    user_ptr = models.OneToOneField(
        User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE
    )
    ip = models.GenericIPAddressField(verbose_name=_("IP"))

    class Meta:
        verbose_name = _("API - Remote access - User")
        verbose_name_plural = _("API - Remote access - Users")

    def __str__(self):
        return self.user_ptr.username


class ApiSearchModel(models.Model):
    """Content type attached to an API user, with an optional limit query."""

    user = models.ForeignKey(ApiUser, on_delete=models.CASCADE)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    limit_query = models.TextField(
        verbose_name=_("Limit query"),
        blank=True,
        null=True,
        help_text=_("Search query add to each request"),
    )

    class Meta:
        verbose_name = _("API - Remote access - Search model")
        verbose_name_plural = _("API - Remote access - Search models")


class ApiExternalSource(models.Model):
    """External source (URL, name and key) whose keys are matched locally."""

    url = models.URLField(verbose_name=_("URL"))
    name = models.CharField(verbose_name=_("Name"), max_length=200)
    key = models.CharField(_("Key"), max_length=40)

    class Meta:
        verbose_name = _("API - Search - External source")
        verbose_name_plural = _("API - Search - External sources")

    def __str__(self):
        return self.name

    @staticmethod
    def _link_local_value(match, t_model):
        """Fill the local slug and label of *match* from a local object.

        The local object is looked up on *t_model* by its ``slug`` attribute
        when the model has one, by ``txt_idx`` otherwise, using the distant
        slug of *match* as value. *match* is not saved here.

        Return True when a local object was found and *match* was modified.
        """
        slug_key = "slug" if hasattr(t_model, "slug") else "txt_idx"
        # a single LIMIT 1 query instead of a count() followed by a fetch
        found = list(t_model.objects.filter(**{slug_key: match.distant_slug})[:1])
        if not found:
            return False
        local_value = found[0]
        match.local_slug = getattr(local_value, slug_key)
        match.local_label = str(local_value)
        return True

    def update_matches(self, content):
        """Synchronise the ApiKeyMatch rows of this source with *content*.

        :param content: mapping of ``"app_label.model"`` search model names
            to an iterable of ``(type_name, keys, values)`` where
            ``type_name`` is an ``"app_label.model"`` string, ``keys`` is
            stored as the search keys of the match and ``values`` is an
            iterable of ``(slug, label)`` pairs.
        :return: dict with the number of ``"created"``, ``"updated"`` and
            ``"deleted"`` matches, and the names which could not be resolved
            to a ContentType under ``"search_model do not exist"`` and
            ``"type do not exist"`` (malformed names are reported there too).
        """
        result = {
            "created": 0,
            "updated": 0,
            "deleted": 0,
            "search_model do not exist": [],
            "type do not exist": [],
        }
        for search_model, type_definitions in content.items():
            ct = _get_content_type(search_model)
            if ct is None:
                result["search_model do not exist"].append(search_model)
                continue
            for type_name, keys, values in type_definitions:
                ct_type = _get_content_type(type_name)
                if ct_type is None:
                    result["type do not exist"].append(type_name)
                    continue
                # model_class() is None for a stale content type (model
                # removed from the code): no local lookup is possible then
                t_model = ct_type.model_class()
                current_matches = []
                for slug, label in values:
                    current_matches.append(slug)
                    match, created = ApiKeyMatch.objects.get_or_create(
                        source=self,
                        search_model=ct,
                        associated_type=ct_type,
                        search_keys=keys,
                        distant_slug=slug,
                        defaults={"distant_label": label},
                    )
                    updated = False
                    if not created and match.distant_label != label:
                        match.distant_label = label
                        updated = True
                    if (
                        t_model is not None
                        and not match.do_not_match
                        and not match.local_slug
                        and self._link_local_value(match, t_model)
                    ):
                        updated = True
                    if updated:
                        match.save()
                        # a match created and completed in the same pass is
                        # only counted as created
                        if not created:
                            result["updated"] += 1
                    if created:
                        result["created"] += 1
                # delete removed keys
                removed = ApiKeyMatch.objects.filter(
                    source=self,
                    search_model=ct,
                    associated_type=ct_type,
                ).exclude(distant_slug__in=current_matches)
                result["deleted"] += removed.count()
                removed.delete()
        return result

    def generate_match_document(self):
        """Export the matches of this source to an ODS spreadsheet.

        One page is generated for each associated type having matches for
        this source, plus a sheet named "List types".

        :return: path of the generated file, created in a new temporary
            directory, or None when LibreOffice is not available or when the
            spreadsheet cannot be created.
        """
        if not UnoCalc:
            return None
        uno = UnoCalc()
        calc = uno.create_calc()
        if not calc:
            return None
        # order_by() clears any ordering so that DISTINCT only applies to
        # the selected column
        types = list(
            ApiKeyMatch.objects.filter(source=self)
            .order_by()
            .values_list("associated_type", flat=True)
            .distinct()
        )
        # the list sheet is requested at the index following the type pages
        lst_sheet = uno.get_sheet(calc, len(types), str(_("List types")))
        for idx, tpe in enumerate(types):
            self._generate_match_page(idx, tpe, uno, calc, lst_sheet)
        tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-")
        dest_filename = os.path.join(
            tmpdir,
            "{}-{}.ods".format(
                datetime.date.today().isoformat(), slugify(self.name)
            ),
        )
        uno.save_calc(calc, dest_filename)
        return dest_filename

    def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet):
        """Fill the sheet *page_number* of *calc* with the matches of a type.

        :param page_number: index of the sheet to fill; also passed to the
            ``uno`` helpers together with *lst_sheet*
        :param tpe: primary key of the ContentType of the associated type
        :param uno: UnoCalc instance used to manipulate the document
        :param calc: spreadsheet document being generated
        :param lst_sheet: sheet receiving the list of the local types
        """
        model = ContentType.objects.get(pk=tpe).model_class()
        ROW_NUMBER = 1000
        sheet = uno.get_sheet(calc, page_number)
        sheet.Name = str(model._meta.verbose_name)

        # header
        headers = (_("Distant key"), _("Distant label"), _("Local"))
        for col_number, column in enumerate(headers):
            cell = sheet.getCellByPosition(col_number, 0)
            cell.CharWeight = 150  # com.sun.star.awt.FontWeight.BOLD
            cell.setString(str(column))

        # one row by match, starting below the header
        matches = ApiKeyMatch.objects.filter(source=self, associated_type=tpe)
        for row, match in enumerate(matches, start=1):
            sheet.getCellByPosition(0, row).setString(match.distant_slug)
            sheet.getCellByPosition(1, row).setString(match.distant_label)
            if match.local_label:
                sheet.getCellByPosition(2, row).setString(match.local_label)

        lst = [str(typ) for typ in model.get_types(instances=True)]
        end_row = uno.create_list(
            lst_sheet, page_number, 0, str(model._meta.verbose_name), lst
        )
        # NOTE(review): presumably restricts the cells of column 3, rows 1 to
        # ROW_NUMBER + 2, to the values listed in lst_sheet - confirm the
        # meaning of the arguments against ishtar_common.libreoffice
        uno.set_cell_validation_list(
            sheet,
            3,
            1,
            ROW_NUMBER + 2,
            lst_sheet,
            page_number,
            [1, end_row],
        )


class ApiKeyMatch(models.Model):
    """Match between a key of an external source and a local key."""

    source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE)
    search_model = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        verbose_name=_("Search model"),
        related_name="key_match_search_models",
    )
    associated_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        verbose_name=_("Associated type"),
        related_name="key_match_types",
    )
    search_keys = ArrayField(
        models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True
    )
    distant_slug = models.SlugField(
        verbose_name=_("Distant key"), max_length=200, allow_unicode=True
    )
    distant_label = models.TextField(
        verbose_name=_("Distant value"), blank=True, default=""
    )
    local_slug = models.SlugField(
        verbose_name=_("Local key"), max_length=200, allow_unicode=True
    )
    local_label = models.TextField(
        verbose_name=_("Local value"), blank=True, default=""
    )
    # when set, ApiExternalSource.update_matches does not look for a local
    # value for this match
    do_not_match = models.BooleanField(
        verbose_name=_("Disable match for this search"), default=False
    )

    class Meta:
        verbose_name = _("API - Search - Key match")
        verbose_name_plural = _("API - Search - Keys matches")