import datetime
import os
import tempfile

from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import ArrayField
from django.core.files import File
from django.utils.text import slugify
from django.apps import apps
from django.template import loader

from ishtar_common.models_common import SheetFilter
from ishtar_common.utils import ugettext_lazy as _

# LibreOffice bindings are optional: both names stay None when the feature is
# disabled in the settings or when the UNO modules cannot be imported.
UnoCalc = None
ITALIC = None
if settings.USE_LIBREOFFICE:
    try:
        from ishtar_common.libreoffice import UnoCalc
        from com.sun.star.awt.FontSlant import ITALIC
    except ImportError:
        pass

# Separator used to pack several values in the text fields of
# ApiExternalSource (search columns, exports and their labels).
_PACKED_SEPARATOR = "||"


def _unpack_for_model(packed, model_name):
    """
    Extract from a packed text field the values attached to a model.

    Items are separated by "||" and stored as "<model_name>-<value>". Only
    items starting with "<model_name>-" are kept and the prefix is removed.

    :param packed: content of the packed text field
    :param model_name: model name used as item prefix
    :return: [value1:str, value2:str, ...] - values in stored order
    """
    prefix = model_name + "-"
    return [
        item[len(prefix):]
        for item in packed.split(_PACKED_SEPARATOR)
        if item.startswith(prefix)
    ]


class ApiUser(models.Model):
    """User allowed to access the API, associated with an IP address."""

    user_ptr = models.OneToOneField(
        User,
        primary_key=True,
        related_name="apiuser",
        on_delete=models.CASCADE,
        verbose_name=_("User"),
    )
    ip = models.GenericIPAddressField(verbose_name=_("IP"))

    class Meta:
        verbose_name = _("API - Remote access - User")
        verbose_name_plural = _("API - Remote access - Users")

    ADMIN_SECTION = _("API")

    def __str__(self):
        return self.user_ptr.username


class ApiSearchModel(models.Model):
    """Content type exposed to an API user, with an optional filter query."""

    user = models.ForeignKey(ApiUser, on_delete=models.CASCADE, verbose_name=_("User"))
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    limit_query = models.TextField(
        verbose_name=_("Filter query"),
        blank=True,
        null=True,
        help_text=_("Search query add to each request"),
    )
    table_format = models.ForeignKey(
        "ishtar_common.ImporterType",
        on_delete=models.PROTECT,
        null=True,
        verbose_name=_("Table formats"),
        related_name="search_model_table_format",
        help_text="Not used. Set it when table columns will be set by importer."
    )
    export = models.ManyToManyField(
        "ishtar_common.ImporterType",
        blank=True,
        verbose_name=_("Export"),
        related_name="search_model_exports",
    )

    class Meta:
        verbose_name = _("API - Remote access - Search model")
        verbose_name_plural = _("API - Remote access - Search models")

    ADMIN_SECTION = _("API")

    def __str__(self):
        return f"{self.user} - {self.content_type}"


class ApiSheetFilter(SheetFilter):
    """Sheet filter attached to an API search model."""

    api_search_model = models.ForeignKey(
        ApiSearchModel, on_delete=models.CASCADE, related_name="sheet_filters"
    )

    class Meta:
        verbose_name = _("API - Remote access - Sheet filter")
        verbose_name_plural = _("API - Remote access - Sheet filters")

    ADMIN_SECTION = _("API")

    def get_template(self):
        """
        Get the sheet template of the model targeted by the search model

        :return: origin name of the "ishtar/sheet_<SLUG>.html" template
        """
        ct = self.api_search_model.content_type
        model = apps.get_model(ct.app_label, ct.model)
        tpl = loader.get_template(f"ishtar/sheet_{model.SLUG}.html")
        return tpl.template.origin.name

    def __str__(self):
        return f"{self.api_search_model} - {self.key}"


class ApiExternalSource(models.Model):
    """
    External source available for search.

    Columns and exports are stored packed in text fields: items are
    separated by "||" and each item is prefixed by "<model_name>-".
    """

    url = models.URLField(verbose_name=_("URL"))
    name = models.CharField(verbose_name=_("Name"), max_length=200)
    key = models.CharField(_("Key"), max_length=40)
    search_columns = models.TextField(_("Search columns"), default="")
    search_columns_label = models.TextField(_("Search columns label"), default="")
    exports = models.TextField(_("Exports slug"), default="")
    exports_label = models.TextField(_("Exports label"), default="")
    users = models.ManyToManyField("IshtarUser", blank=True, verbose_name=_("Users"))
    match_document = models.FileField(
        _("Match document"),
        blank=True,
        null=True,
        help_text=_(
            'First use the "Update types from source" action. Then use the action '
            '"Generate match document" action to create a default match document. '
            "Complete it and attach it back to the source to finally use the action "
            '"Update association from match document".'
        ),
    )

    class Meta:
        verbose_name = _("API - Search - External source")
        verbose_name_plural = _("API - Search - External sources")
        ordering = ("name",)

    ADMIN_SECTION = _("API")

    def __str__(self):
        return self.name

    def get_columns(self, model_name):
        """
        Column keys for table display
        :return: (key1:str, key2:str, ...) - list of column key
        """
        if not self.search_columns:
            return []
        return _unpack_for_model(self.search_columns, model_name)

    def get_column_labels(self, model_name):
        """
        Column label for table display
        :return: (label1:str, label2:str, ...) - list of column labels
        """
        if not self.search_columns_label:
            return []
        return _unpack_for_model(self.search_columns_label, model_name)

    def get_exports(self, model_name):
        """
        Get export list
        :return: [(slug:slug, label:str)] - list of export slug and labels
        """
        if not self.exports or not self.exports_label:
            return []
        slugs = _unpack_for_model(self.exports, model_name)
        labels = _unpack_for_model(self.exports_label, model_name)
        # zip stops at the shortest list: slugs without label (or the
        # opposite) are ignored
        return list(zip(slugs, labels))

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        """
        Save the source then detach it from the profiles of users who are
        not associated with this source anymore.
        """
        # keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )
        # remove external sources from profiles if not in user anymore
        UserProfile = apps.get_model("ishtar_common", "UserProfile")
        q = UserProfile.objects.filter(external_sources=self).exclude(
            person__ishtaruser__in=list(self.users.all())
        )
        for profile in q.all():
            profile.external_sources.remove(self)

    def update_matches(self, content):
        """
        Synchronize the key matches of this source with a distant content.

        :param content: dict - keys are "app_label.model" search model names
            (the "config" key is ignored), values are iterables of
            (type_name:"app_label.model", search_keys, [(slug, label), ...])
        :return: dict with "created", "updated" and "deleted" counters and
            the lists "search_model do not exist" and "type do not exist"
            of names not associated with a known content type
        """
        result = {
            "created": 0,
            "updated": 0,
            "deleted": 0,
            "search_model do not exist": [],
            "type do not exist": [],
        }
        for search_model in content:
            if search_model == "config":
                continue
            try:
                # a malformed name (not "app.model") is reported like an
                # unknown one instead of aborting the whole update
                app, model_name = search_model.split(".")
                ct = ContentType.objects.get(app_label=app, model=model_name)
            except (ValueError, ContentType.DoesNotExist):
                result["search_model do not exist"].append(search_model)
                continue
            for type_name, keys, values in content[search_model]:
                try:
                    tapp, tmodel_name = type_name.split(".")
                    ct_type = ContentType.objects.get(
                        app_label=tapp, model=tmodel_name
                    )
                except (ValueError, ContentType.DoesNotExist):
                    result["type do not exist"].append(type_name)
                    continue
                t_model = ct_type.model_class()
                # local types are matched on "slug" when available, otherwise
                # on "txt_idx"
                slug_key = "slug" if hasattr(t_model, "slug") else "txt_idx"
                current_matches = []
                for slug, label in values:
                    current_matches.append(slug)
                    m, created = ApiKeyMatch.objects.get_or_create(
                        source=self,
                        search_model=ct,
                        associated_type=ct_type,
                        search_keys=keys,
                        distant_slug=slug,
                        defaults={"distant_label": label},
                    )
                    updated = False
                    if not created and m.distant_label != label:
                        updated = True
                        m.distant_label = label
                    if not m.do_not_match and not m.local_slug:
                        # no local value yet: try a local type with the same
                        # key as the distant one
                        q = t_model.objects.filter(**{slug_key: m.distant_slug})
                        if q.exists():
                            local_value = q[0]
                            m.local_slug = getattr(local_value, slug_key)
                            m.local_label = str(local_value)
                            updated = True
                    if updated:
                        m.save()
                        if not created:
                            result["updated"] += 1
                    if created:
                        result["created"] += 1
                # delete removed keys
                q = ApiKeyMatch.objects.filter(
                    source=self,
                    search_model=ct,
                    associated_type=ct_type,
                ).exclude(distant_slug__in=current_matches)
                result["deleted"] += q.count()
                q.delete()
        return result

    def generate_match_document(self):
        """
        Generate an ODS match document and attach it to this source.

        One sheet is generated for each associated type of the key matches of
        this source, followed by a sheet listing the local types.

        :return: path of the generated file in a new temporary directory,
            None if LibreOffice is not available or the document cannot be
            created
        """
        if not UnoCalc:
            return
        uno = UnoCalc()
        calc = uno.create_calc()
        if not calc:
            return
        types = list(
            ApiKeyMatch.objects.filter(
                source=self,
            )
            .order_by()
            .values_list("associated_type", flat=True)
            .distinct()
        )
        lst_sheet = uno.get_or_create_sheet(calc, len(types), str(_("List types")))
        for idx, tpe in enumerate(types):
            self._generate_match_page(idx, tpe, uno, calc, lst_sheet)
        tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-")
        base = "{}-{}.ods".format(datetime.date.today().isoformat(), slugify(self.name))
        dest_filename = os.path.join(tmpdir, base)
        uno.save_calc(calc, dest_filename)
        with open(dest_filename, "rb") as fle:
            self.match_document = File(fle, base)
            # save while the file is open: its content is read on save
            self.save()
        return dest_filename

    def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet):
        """
        Fill the sheet of the match document dedicated to an associated type.

        :param page_number: index of the sheet, also passed as position to
            the helpers writing the list of local types in lst_sheet
        :param tpe: primary key of the ContentType of the associated type
        :param uno: UnoCalc instance
        :param calc: current calc document
        :param lst_sheet: sheet receiving the list of local types
        """
        model = ContentType.objects.get(pk=tpe).model_class()
        ROW_NUMBER = 1000
        sheet = uno.get_or_create_sheet(calc, page_number)
        q = ApiKeyMatch.objects.filter(source=self, associated_type=tpe)
        if not q.exists():
            return
        sm = q.all()[0].search_model
        # the sheet name is parsed back by update_from_match_document
        sheet.Name = f"{sm.app_label}.{sm.model}-{model._meta.app_label}.{model.__name__.lower()}"
        for col_number, column in enumerate(
            (_("Distant key"), _("Distant label"), _("Local"))
        ):
            # header
            cell = sheet.getCellByPosition(col_number, 0)
            cell.CharWeight = 150
            cell.setString(str(column))

        seen_slugs = set()
        for idx, match in enumerate(q.all()):
            if match.distant_slug in seen_slugs:
                # the row of an already written slug is left empty
                continue
            seen_slugs.add(match.distant_slug)
            cell = sheet.getCellByPosition(0, idx + 1)
            cell.setString(match.distant_slug)
            cell = sheet.getCellByPosition(1, idx + 1)
            cell.setString(match.distant_label)
            if match.local_label:
                cell = sheet.getCellByPosition(2, idx + 1)
                cell.setString(match.local_label)
        lst = [str(typ) for typ in model.get_types(instances=True)]
        end_row = uno.create_list(
            lst_sheet, page_number, 0, str(model._meta.verbose_name), lst
        )
        uno.set_cell_validation_list(
            sheet,
            3,
            1,
            ROW_NUMBER + 2,
            lst_sheet,
            page_number,
            [1, end_row],
        )

    def update_from_match_document(self):
        """
        Update the local values of key matches from the match document.

        Each sheet except the last one is read. Its name must be
        "<search model app>.<search model>-<type app>.<type model>". Each
        line after the header is read as (distant key, distant label, local
        label) and the local label is searched in the "label" field of the
        type model.

        :return: {"errors": [str, ...], "updated": int}, None if there is no
            match document, if LibreOffice is not available or if the
            document cannot be opened
        """
        if not self.match_document:
            return
        if not UnoCalc:
            return
        uno = UnoCalc()
        calc = uno.open_calc(self.match_document.path)
        if not calc:
            return
        errors = []
        updated = 0
        for idx in range(uno.get_sheet_number(calc) - 1):  # do not read the last sheet
            sheet = uno.get_sheet(calc, idx)
            sheet_name = sheet.Name
            try:
                search_model_name, ctype = sheet_name.split("-")
                app_label, model_name = search_model_name.split(".")
                # only check that the search model exists
                ContentType.objects.get(app_label=app_label, model=model_name)
                app_label, model_name = ctype.split(".")
                ct = ContentType.objects.get(app_label=app_label, model=model_name)
            except (ValueError, ContentType.DoesNotExist):
                # translate the template then format it: an already
                # interpolated string cannot be found in the catalog
                errors.append(
                    str(_("{sheet_name} is not a correct sheet name.")).format(
                        sheet_name=sheet_name
                    )
                )
                continue
            data = uno.sheet_get_data(sheet)
            base_q = ApiKeyMatch.objects.filter(source=self, associated_type=ct)
            model = ct.model_class()
            key_name = "txt_idx" if hasattr(model, "txt_idx") else "slug"
            for idx_line, line in enumerate(data):
                if not idx_line:  # header
                    continue
                distant_key, distant_label, local_name = line
                matches = base_q.filter(
                    distant_slug=distant_key, distant_label=distant_label
                )
                if not matches.exists():
                    if distant_key:
                        errors.append(
                            str(
                                _(
                                    "{ctype} - {distant_key}, {distant_label} not referenced in the database."
                                )
                            ).format(
                                ctype=ctype,
                                distant_key=distant_key,
                                distant_label=distant_label,
                            )
                        )
                    continue
                if matches.filter(local_label=local_name).exists():
                    # no change
                    continue
                # the local type is the same for every match of the line:
                # search it (and report a missing one) only once
                local_types = model.objects.filter(label=local_name)
                if not local_types.exists():
                    if local_name:
                        errors.append(
                            str(
                                _(
                                    "{ctype} - {local_name} do not exists in local database."
                                )
                            ).format(ctype=ctype, local_name=local_name)
                        )
                    continue
                tpe = local_types[0]
                for api_key in matches.all():
                    api_key.local_slug = getattr(tpe, key_name)
                    api_key.local_label = local_name
                    api_key.save()
                    updated += 1
        return {"errors": errors, "updated": updated}


class ApiKeyMatch(models.Model):
    """
    Association between a distant type key of an external source and a
    local type key.
    """

    source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE)
    search_model = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        verbose_name=_("Search model"),
        related_name="key_match_search_models",
    )
    associated_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        verbose_name=_("Associated type"),
        related_name="key_match_types",
    )
    search_keys = ArrayField(
        models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True
    )
    distant_slug = models.SlugField(
        verbose_name=_("Distant key"), max_length=200, allow_unicode=True
    )
    distant_label = models.TextField(
        verbose_name=_("Distant value"), blank=True, default=""
    )
    local_slug = models.SlugField(
        verbose_name=_("Local key"), max_length=200, allow_unicode=True
    )
    local_label = models.TextField(
        verbose_name=_("Local value"), blank=True, default=""
    )
    do_not_match = models.BooleanField(
        verbose_name=_("Disable match for this search"), default=False
    )

    class Meta:
        verbose_name = _("API - Search - Key match")
        verbose_name_plural = _("API - Search - Keys matches")

    ADMIN_SECTION = _("API")