"""Models for remote API access and for key matching with external sources."""
import datetime
import os
import tempfile

from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import ArrayField
from django.core.files import File
from django.utils.text import slugify

# LibreOffice support is optional. An explicit check is used instead of
# ``assert`` because asserts are stripped when Python runs with ``-O``,
# which would silently enable the import whatever the setting says.
UnoCalc = None
if getattr(settings, "USE_LIBREOFFICE", False):
    try:
        from ishtar_common.libreoffice import UnoCalc
        from com.sun.star.awt.FontSlant import ITALIC
    except ImportError:
        UnoCalc = None

from ishtar_common.utils import ugettext_lazy as _


# (app_label, model) pairs.
APP_CONTENT_TYPES = [
    ("archaeological_operations", "operation"),
    ("archaeological_context_records", "contextrecord"),
    ("archaeological_finds", "find"),
    ("archaeological_warehouse", "warehouse"),
    ("archaeological_files", "file"),
]

# APP_CONTENT_TYPES extended with two more (app_label, model) pairs.
MAIN_CONTENT_TYPES = APP_CONTENT_TYPES + [
    ("archaeological_operations", "archaeologicalsite"),
    ("archaeological_warehouse", "container"),
]


class ApiUser(models.Model):
    """Remote access user: a Django ``User`` associated with an IP address."""

    user_ptr = models.OneToOneField(
        User, primary_key=True, related_name="apiuser", on_delete=models.CASCADE
    )
    ip = models.GenericIPAddressField(verbose_name=_("IP"))

    class Meta:
        verbose_name = _("API - Remote access - User")
        verbose_name_plural = _("API - Remote access - Users")

    def __str__(self):
        return self.user_ptr.username


class ApiSearchModel(models.Model):
    """Associate an ``ApiUser`` with a content type and an optional limit query."""

    user = models.ForeignKey(ApiUser, on_delete=models.CASCADE)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    limit_query = models.TextField(
        verbose_name=_("Limit query"),
        blank=True,
        null=True,
        help_text=_("Search query add to each request"),
    )

    class Meta:
        verbose_name = _("API - Remote access - Search model")
        verbose_name_plural = _("API - Remote access - Search models")


class ApiExternalSource(models.Model):
    """External source (URL, name, key) with an optional match document.

    The match document is a spreadsheet generated by
    ``generate_match_document`` and read back by
    ``update_from_match_document`` to associate distant keys with local
    types (stored as ``ApiKeyMatch`` rows).
    """

    url = models.URLField(verbose_name=_("URL"))
    name = models.CharField(verbose_name=_("Name"), max_length=200)
    key = models.CharField(_("Key"), max_length=40)
    match_document = models.FileField(
        _("Match document"),
        blank=True,
        null=True,
        help_text=_(
            'First use the "Update types from source" action. Then use the action '
            '"Generate match document" action to create a default match document. '
            "Complete it and attach it back to the source to finally use the action "
            '"Update association from match document".'
        ),
    )

    class Meta:
        verbose_name = _("API - Search - External source")
        verbose_name_plural = _("API - Search - External sources")

    def __str__(self):
        return self.name

    def update_matches(self, content):
        """Synchronise the ``ApiKeyMatch`` rows of this source with ``content``.

        :param content: mapping of ``"app_label.model"`` (search model) to an
            iterable of ``(type_name, keys, values)`` where ``type_name`` is
            an ``"app_label.model"`` string, ``keys`` the search keys and
            ``values`` an iterable of ``(slug, label)`` pairs.
        :return: dict with the ``created``, ``updated`` and ``deleted``
            counters and the lists of search model / type names that could
            not be resolved to a content type.
        """
        result = {
            "created": 0,
            "updated": 0,
            "deleted": 0,
            "search_model do not exist": [],
            "type do not exist": [],
        }
        for search_model in content:
            try:
                # a name not shaped like "app.model" is reported the same
                # way as an unknown content type instead of raising
                app, model_name = search_model.split(".")
                ct = ContentType.objects.get(app_label=app, model=model_name)
            except (ValueError, ContentType.DoesNotExist):
                result["search_model do not exist"].append(search_model)
                continue
            for type_name, keys, values in content[search_model]:
                try:
                    tapp, tmodel_name = type_name.split(".")
                    ct_type = ContentType.objects.get(
                        app_label=tapp, model=tmodel_name
                    )
                except (ValueError, ContentType.DoesNotExist):
                    result["type do not exist"].append(type_name)
                    continue
                t_model = ct_type.model_class()
                # attribute holding the local key: "slug" when the model
                # has one, "txt_idx" otherwise
                slug_key = "slug" if hasattr(t_model, "slug") else "txt_idx"
                current_matches = []
                for slug, label in values:
                    current_matches.append(slug)
                    m, created = ApiKeyMatch.objects.get_or_create(
                        source=self,
                        search_model=ct,
                        associated_type=ct_type,
                        search_keys=keys,
                        distant_slug=slug,
                        defaults={"distant_label": label},
                    )
                    updated = False
                    if not created and m.distant_label != label:
                        updated = True
                        m.distant_label = label
                    if not m.do_not_match and not m.local_slug:
                        # no local association yet: try a local type whose
                        # key equals the distant slug
                        local_value = t_model.objects.filter(
                            **{slug_key: m.distant_slug}
                        ).first()
                        if local_value is not None:
                            m.local_slug = getattr(local_value, slug_key)
                            m.local_label = str(local_value)
                            updated = True
                    if updated:
                        m.save()
                        if not created:
                            result["updated"] += 1
                    if created:
                        result["created"] += 1
                # delete removed keys
                stale = ApiKeyMatch.objects.filter(
                    source=self,
                    search_model=ct,
                    associated_type=ct_type,
                ).exclude(distant_slug__in=current_matches)
                result["deleted"] += stale.count()
                stale.delete()
        return result

    def generate_match_document(self):
        """Generate the match spreadsheet and attach it to ``match_document``.

        One sheet is generated per associated type having key matches for
        this source, plus a last sheet holding the lists of local types.

        :return: path of the generated ``.ods`` file, or ``None`` when
            LibreOffice is not available or the document cannot be created.
        """
        if not UnoCalc:
            return
        uno = UnoCalc()
        calc = uno.create_calc()
        if not calc:
            return
        types = list(
            ApiKeyMatch.objects.filter(
                source=self,
            )
            .order_by()
            .values_list("associated_type", flat=True)
            .distinct()
        )
        # the list sheet is placed after the per-type sheets
        lst_sheet = uno.get_or_create_sheet(calc, len(types), str(_("List types")))
        for idx, tpe in enumerate(types):
            self._generate_match_page(idx, tpe, uno, calc, lst_sheet)
        # NOTE(review): the temporary directory is never removed; the
        # returned path points into it, so cleanup is left to the caller.
        tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-")
        base = "{}-{}.ods".format(
            datetime.date.today().isoformat(), slugify(self.name)
        )
        dest_filename = os.path.join(tmpdir, base)
        uno.save_calc(calc, dest_filename)
        with open(dest_filename, "rb") as fle:
            self.match_document = File(fle, base)
            self.save()
        return dest_filename

    def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet):
        """Fill the sheet ``page_number`` with the key matches of one type.

        :param page_number: index of the sheet; also passed to
            ``uno.create_list`` to place this type's list in ``lst_sheet``.
        :param tpe: primary key of the associated type ``ContentType``.
        :param uno: ``UnoCalc`` instance.
        :param calc: document being generated.
        :param lst_sheet: sheet receiving the list of local types.
        """
        model = ContentType.objects.get(pk=tpe).model_class()
        # presumably the last row covered by the validation list - TODO confirm
        ROW_NUMBER = 1000
        sheet = uno.get_or_create_sheet(calc, page_number)
        # evaluate the queryset once: it is tested, indexed and iterated
        matches = list(ApiKeyMatch.objects.filter(source=self, associated_type=tpe))
        if not matches:
            return
        sm = matches[0].search_model
        # this name is parsed back by update_from_match_document
        sheet.Name = (
            f"{sm.app_label}.{sm.model}-{model._meta.app_label}.{model.__name__.lower()}"
        )
        for col_number, column in enumerate(
            (_("Distant key"), _("Distant label"), _("Local"))
        ):
            # header
            cell = sheet.getCellByPosition(col_number, 0)
            cell.CharWeight = 150
            cell.setString(str(column))

        for row, match in enumerate(matches, start=1):
            sheet.getCellByPosition(0, row).setString(match.distant_slug)
            sheet.getCellByPosition(1, row).setString(match.distant_label)
            if match.local_label:
                sheet.getCellByPosition(2, row).setString(match.local_label)
        lst = [str(typ) for typ in model.get_types(instances=True)]
        end_row = uno.create_list(
            lst_sheet, page_number, 0, str(model._meta.verbose_name), lst
        )
        uno.set_cell_validation_list(
            sheet,
            3,
            1,
            ROW_NUMBER + 2,
            lst_sheet,
            page_number,
            [1, end_row],
        )

    def update_from_match_document(self):
        """Update local associations from the attached match document.

        Every sheet except the last one is read. Each sheet name is
        expected to be ``"<app>.<model>-<app>.<model>"`` (search model, then
        associated type) and each line after the header to hold a distant
        key, a distant label and a local label.

        :return: ``{"errors": [...], "updated": <int>}``, or ``None`` when
            there is no match document, LibreOffice is not available or the
            document cannot be opened.
        """
        if not self.match_document:
            return
        if not UnoCalc:
            return
        uno = UnoCalc()
        calc = uno.open_calc(self.match_document.path)
        if not calc:
            return
        # Constant msgids formatted after translation: an f-string inside
        # ``_()`` is interpolated before the catalog lookup and therefore
        # can never match a translation.
        bad_sheet_msg = _("{sheet_name} is not a correct sheet name.")
        not_referenced_msg = _(
            "{ctype} - {distant_key}, {distant_label} not referenced in the database."
        )
        no_local_msg = _("{ctype} - {local_name} do not exists in local database.")
        errors = []
        updated = 0
        for idx in range(uno.get_sheet_number(calc) - 1):
            # do not read the last sheet
            sheet = uno.get_sheet(calc, idx)
            sheet_name = sheet.Name
            try:
                search_model_name, ctype = sheet_name.split("-")
                app_label, model_name = search_model_name.split(".")
                search_model = ContentType.objects.get(
                    app_label=app_label, model=model_name
                )
                app_label, model_name = ctype.split(".")
                ct = ContentType.objects.get(app_label=app_label, model=model_name)
            except (ValueError, ContentType.DoesNotExist):
                errors.append(str(bad_sheet_msg).format(sheet_name=sheet_name))
                continue
            data = uno.sheet_get_data(sheet)
            base_q = ApiKeyMatch.objects.filter(
                source=self, search_model=search_model, associated_type=ct
            )
            model = ct.model_class()
            key_name = "txt_idx" if hasattr(model, "txt_idx") else "slug"
            for idx_line, line in enumerate(data):
                if not idx_line:
                    # header
                    continue
                distant_key, distant_label, local_name = line
                q = base_q.filter(
                    distant_slug=distant_key, distant_label=distant_label
                )
                api_key = q.first()
                if api_key is None:
                    errors.append(
                        str(not_referenced_msg).format(
                            ctype=ctype,
                            distant_key=distant_key,
                            distant_label=distant_label,
                        )
                    )
                    continue
                if q.filter(local_label=local_name).exists():
                    # no change
                    continue
                tpe = model.objects.filter(label=local_name).first()
                if tpe is None:
                    errors.append(
                        str(no_local_msg).format(ctype=ctype, local_name=local_name)
                    )
                    continue
                api_key.local_slug = getattr(tpe, key_name)
                api_key.local_label = local_name
                api_key.save()
                updated += 1
        return {"errors": errors, "updated": updated}


class ApiKeyMatch(models.Model):
    """Match between a distant key of an external source and a local key.

    A match is attached to a source, a search model and an associated type.
    """

    source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE)
    search_model = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        verbose_name=_("Search model"),
        related_name="key_match_search_models",
    )
    associated_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        verbose_name=_("Associated type"),
        related_name="key_match_types",
    )
    search_keys = ArrayField(
        models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True
    )
    distant_slug = models.SlugField(
        verbose_name=_("Distant key"), max_length=200, allow_unicode=True
    )
    distant_label = models.TextField(
        verbose_name=_("Distant value"), blank=True, default=""
    )
    local_slug = models.SlugField(
        verbose_name=_("Local key"), max_length=200, allow_unicode=True
    )
    local_label = models.TextField(
        verbose_name=_("Local value"), blank=True, default=""
    )
    # when set, update_matches does not look for a local association
    do_not_match = models.BooleanField(
        verbose_name=_("Disable match for this search"), default=False
    )

    class Meta:
        verbose_name = _("API - Search - Key match")
        verbose_name_plural = _("API - Search - Keys matches")