diff options
| -rw-r--r-- | archaeological_operations/tests.py | 66 | ||||
| -rw-r--r-- | ishtar_common/admin.py | 54 | ||||
| -rw-r--r-- | ishtar_common/models_rest.py | 115 | 
3 files changed, 207 insertions, 28 deletions
| diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index 74ae5819a..ee1f2645c 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -4579,6 +4579,17 @@ class ApiTest(OperationInitTest, APITestCase):          )          return src +    def _mock_request(self, mock_get, json_file="external_source_types_1.json"): +        json_file = "../archaeological_operations/tests/" + json_file +        mock_get.return_value.status_code = 200 + +        def __json(): +            return json.loads(mock_get.return_value.text) + +        mock_get.return_value.json = __json +        with open(json_file, "r") as tpes: +            mock_get.return_value.text = tpes.read() +      @patch("requests.get")      def test_type_admin(self, mock_get):          # POV: local @@ -4623,10 +4634,7 @@ class ApiTest(OperationInitTest, APITestCase):              response.content.decode(),          ) -        with open( -            "../archaeological_operations/tests/external_source_types_1.json", "r" -        ) as tpes: -            mock_get.return_value.text = tpes.read() +        self._mock_request(mock_get)          response = self.client.post(url, params, follow=True)          result = {"created": "XXXXX"} @@ -4635,10 +4643,7 @@ class ApiTest(OperationInitTest, APITestCase):          )          self.assertIn(msg_created, response.content.decode())          # test incoherent search_model and types -        with open( -            "../archaeological_operations/tests/external_source_types_2.json", "r" -        ) as tpes: -            mock_get.return_value.text = tpes.read() +        self._mock_request(mock_get, "external_source_types_2.json")          response = self.client.post(url, params, follow=True)          content = response.content.decode() @@ -4662,7 +4667,50 @@ class ApiTest(OperationInitTest, APITestCase):          msg = str(_(f"{result['deleted']} matches deleted"))          self.assertIn(msg, response.content.decode()) -        pass +    @tag("libreoffice") +    @patch("requests.get") +    def test_match_doc(self, mock_get): +        if not settings.USE_LIBREOFFICE: +            return +        self._mock_request(mock_get) +        self.client.login(username=self.username, password=self.password) +        source = self._get_source() +        url = "/admin/{}/{}/".format( +            "ishtar_common", models_rest.ApiExternalSource.__name__.lower() +        ) +        params = { +            "action": "update_types_from_source", +            "_selected_action": [source.pk], +        } +        self.client.post(url, params, follow=True) + +        params["action"] = "generate_match_document" +        response = self.client.post(url, params, follow=True) +        #src_doc = source.generate_match_document() +        zip_file = zipfile.ZipFile(BytesIO(response.content)) + +        self.assertIsNone( +            zip_file.testzip(), +            "Libreoffice match doc generated is not a correct zip file.", +        ) + +        filename = None +        for name in zip_file.namelist(): +            if name == "content.xml": +                filename = name +                break +        self.assertIsNotNone(filename) + +        # only check that all operation types are listed +        with tempfile.TemporaryDirectory(prefix="tmp-ishtar-") as tmpdir: +            match_file = zip_file.extract(filename, tmpdir) +            with open(match_file) as content_file: +                content = content_file.read() +                for ope_type in models.OperationType.objects.filter( +                    available=True +                ).all(): +                    ope_type = str(ope_type).replace("'", "'") +                    self.assertIn(ope_type, content)      def test_query_transformation(self):          # POV: local diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py index 59dc99f15..8124bbeed 100644 --- a/ishtar_common/admin.py +++ b/ishtar_common/admin.py @@ -2218,10 +2218,58 @@ def update_types_from_source(modeladmin, request, queryset):      return response -class ApiExternalSource(admin.ModelAdmin): +def generate_match_document(modeladmin, request, queryset): +    return_url = ( +            reverse( +                "admin:%s_%s_changelist" +                % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) +            ) +            + "?" +            + urllib.parse.urlencode(request.GET) +    ) +    if queryset.count() != 1: +        return send_error_message( +            request, +            str(_("Select only one source.")), +            return_url, +            message_type=messages.WARNING, +        ) +    src_doc = queryset.all()[0].generate_match_document() +    in_memory = BytesIO() +    with open(src_doc, "rb") as fle: +        in_memory.write(fle.read()) +        filename = src_doc.split(os.sep)[-1] + +        response = HttpResponse( +            content_type="application/vnd.oasis.opendocument.spreadsheet" +        ) +        response["Content-Disposition"] = "attachment; filename=%s" % filename.replace( +            " ", "_" +        ) +        in_memory.seek(0) +        response.write(in_memory.read()) +        return response + + +class ApiExternalSourceAdmin(admin.ModelAdmin):      model = models_rest.ApiExternalSource -    actions = [update_types_from_source] +    actions = [update_types_from_source, generate_match_document]      list_display = ("name", "url", "key") -admin_site.register(models_rest.ApiExternalSource, ApiExternalSource) +admin_site.register(models_rest.ApiExternalSource, ApiExternalSourceAdmin) + + +class ApiKeyMatchAdmin(admin.ModelAdmin): +    model = models_rest.ApiKeyMatch +    list_display = ["source", "search_model", "associated_type", +                    "distant_slug", "distant_label", "local_slug", +                    "local_label", "do_not_match"] +    list_filter = ["source", "do_not_match"] +    search_fields = ["associated_type__model", "distant_slug", "distant_label"] +    actions = [ +        change_value("do_not_match", False, _("Enable match")), +        change_value("do_not_match", True, _("Disable match")), +    ] + +admin_site.register(models_rest.ApiKeyMatch, ApiKeyMatchAdmin) diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py index 566a73127..bb260c30d 100644 --- a/ishtar_common/models_rest.py +++ b/ishtar_common/models_rest.py @@ -1,7 +1,20 @@ +import datetime +import os +import tempfile + +from django.conf import settings  from django.contrib.auth.models import User  from django.contrib.contenttypes.models import ContentType  from django.contrib.gis.db import models  from django.contrib.postgres.fields import ArrayField +from django.utils.text import slugify + +try: +    assert settings.USE_LIBREOFFICE +    from ishtar_common.libreoffice import UnoCalc +    from com.sun.star.awt.FontSlant import ITALIC +except (AssertionError, ImportError): +    UnoCalc = None  from ishtar_common.utils import ugettext_lazy as _ @@ -16,8 +29,8 @@ class ApiUser(models.Model):      ip = models.GenericIPAddressField(verbose_name=_("IP"))      class Meta: -        verbose_name = _("API - User") -        verbose_name_plural = _("API - Users") +        verbose_name = _("API - Remote access - User") +        verbose_name_plural = _("API - Remote access - Users")      def __str__(self):          return self.user_ptr.username @@ -34,8 +47,8 @@ class ApiSearchModel(models.Model):      )      class Meta: -        verbose_name = _("API - Search model") -        verbose_name_plural = _("API - Search models") +        verbose_name = _("API - Remote access - Search model") +        verbose_name_plural = _("API - Remote access - Search models")  class ApiExternalSource(models.Model): @@ -44,8 +57,11 @@ class ApiExternalSource(models.Model):      key = models.CharField(_("Key"), max_length=40)      class Meta: -        verbose_name = _("API - External source") -        verbose_name_plural = _("API - External sources") +        verbose_name = _("API - Search - External source") +        verbose_name_plural = _("API - Search - External sources") + +    def __str__(self): +        return self.name      def update_matches(self, content):          result = { @@ -93,7 +109,7 @@ class ApiExternalSource(models.Model):                          if q.count():                              local_value = q.all()[0]                              setattr(m, "local_slug", getattr(local_value, slug_key)) -                            m.local_value = str(local_value) +                            m.local_label = str(local_value)                              updated = True                      if updated:                          m.save() @@ -103,25 +119,92 @@ class ApiExternalSource(models.Model):                          result["created"] += 1                  # delete removed keys                  q = ApiKeyMatch.objects.filter( -                    source=self, search_model=ct, associated_type=ct_type, +                    source=self, +                    search_model=ct, +                    associated_type=ct_type,                  ).exclude(distant_slug__in=current_matches)                  result["deleted"] += q.count()                  q.delete()          return result -    def generate_match_csv(self): -        pass +    def generate_match_document(self): +        if not UnoCalc: +            return +        uno = UnoCalc() +        calc = uno.create_calc() +        if not calc: +            return +        types = list( +            ApiKeyMatch.objects.filter( +                source=self, +            ) +            .order_by() +            .values_list("associated_type", flat=True) +            .distinct() +        ) +        lst_sheet = uno.get_sheet(calc, len(types), str(_("List types"))) +        for idx, tpe in enumerate(types): +            self._generate_match_page(idx, tpe, uno, calc, lst_sheet) +        tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-") +        dest_filename = "{}{}{}-{}.ods".format(tmpdir, os.sep, +                                               datetime.date.today().isoformat(), +                                               slugify(self.name)) +        uno.save_calc(calc, dest_filename) +        return dest_filename + +    def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet): +        model = ContentType.objects.get(pk=tpe).model_class() +        ROW_NUMBER = 1000 +        sheet = uno.get_sheet(calc, page_number) +        sheet.Name = str(model._meta.verbose_name) +        for col_number, column in enumerate( +            (_("Distant key"), _("Distant label"), _("Local")) +        ): +            # header +            cell = sheet.getCellByPosition(col_number, 0) +            cell.CharWeight = 150 +            cell.setString(str(column)) + +        for idx, match in enumerate( +            ApiKeyMatch.objects.filter(source=self, associated_type=tpe).all() +        ): +            cell = sheet.getCellByPosition(0, idx + 1) +            cell.setString(match.distant_slug) +            cell = sheet.getCellByPosition(1, idx + 1) +            cell.setString(match.distant_label) +            if match.local_label: +                cell = sheet.getCellByPosition(2, idx + 1) +                cell.setString(match.local_label) +        lst = [] +        for typ in model.get_types(instances=True): +            lst.append(str(typ)) +        end_row = uno.create_list( +            lst_sheet, page_number, 0, str(model._meta.verbose_name), lst +        ) +        uno.set_cell_validation_list( +            sheet, +            3, +            1, +            ROW_NUMBER + 2, +            lst_sheet, +            page_number, +            [1, end_row], +        )  class ApiKeyMatch(models.Model):      source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE)      search_model = models.ForeignKey( -        ContentType, on_delete=models.CASCADE, verbose_name=_("Search model"), -        related_name="key_match_search_models" +        ContentType, +        on_delete=models.CASCADE, +        verbose_name=_("Search model"), +        related_name="key_match_search_models",      )      associated_type = models.ForeignKey( -        ContentType, on_delete=models.CASCADE, verbose_name=_("Associated type"), -        related_name="key_match_types" +        ContentType, +        on_delete=models.CASCADE, +        verbose_name=_("Associated type"), +        related_name="key_match_types",      )      search_keys = ArrayField(          models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True @@ -143,5 +226,5 @@ class ApiKeyMatch(models.Model):      )      class Meta: -        verbose_name = _("API - Key match") -        verbose_name_plural = _("API - Keys matches") +        verbose_name = _("API - Search - Key match") +        verbose_name_plural = _("API - Search - Keys matches") | 
