diff options
-rw-r--r-- | archaeological_operations/tests.py | 66 | ||||
-rw-r--r-- | ishtar_common/admin.py | 54 | ||||
-rw-r--r-- | ishtar_common/models_rest.py | 115 |
3 files changed, 207 insertions, 28 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py index 74ae5819a..ee1f2645c 100644 --- a/archaeological_operations/tests.py +++ b/archaeological_operations/tests.py @@ -4579,6 +4579,17 @@ class ApiTest(OperationInitTest, APITestCase): ) return src + def _mock_request(self, mock_get, json_file="external_source_types_1.json"): + json_file = "../archaeological_operations/tests/" + json_file + mock_get.return_value.status_code = 200 + + def __json(): + return json.loads(mock_get.return_value.text) + + mock_get.return_value.json = __json + with open(json_file, "r") as tpes: + mock_get.return_value.text = tpes.read() + @patch("requests.get") def test_type_admin(self, mock_get): # POV: local @@ -4623,10 +4634,7 @@ class ApiTest(OperationInitTest, APITestCase): response.content.decode(), ) - with open( - "../archaeological_operations/tests/external_source_types_1.json", "r" - ) as tpes: - mock_get.return_value.text = tpes.read() + self._mock_request(mock_get) response = self.client.post(url, params, follow=True) result = {"created": "XXXXX"} @@ -4635,10 +4643,7 @@ class ApiTest(OperationInitTest, APITestCase): ) self.assertIn(msg_created, response.content.decode()) # test incoherent search_model and types - with open( - "../archaeological_operations/tests/external_source_types_2.json", "r" - ) as tpes: - mock_get.return_value.text = tpes.read() + self._mock_request(mock_get, "external_source_types_2.json") response = self.client.post(url, params, follow=True) content = response.content.decode() @@ -4662,7 +4667,50 @@ class ApiTest(OperationInitTest, APITestCase): msg = str(_(f"{result['deleted']} matches deleted")) self.assertIn(msg, response.content.decode()) - pass + @tag("libreoffice") + @patch("requests.get") + def test_match_doc(self, mock_get): + if not settings.USE_LIBREOFFICE: + return + self._mock_request(mock_get) + self.client.login(username=self.username, password=self.password) + source = self._get_source() + url = "/admin/{}/{}/".format( + "ishtar_common", models_rest.ApiExternalSource.__name__.lower() + ) + params = { + "action": "update_types_from_source", + "_selected_action": [source.pk], + } + self.client.post(url, params, follow=True) + + params["action"] = "generate_match_document" + response = self.client.post(url, params, follow=True) + #src_doc = source.generate_match_document() + zip_file = zipfile.ZipFile(BytesIO(response.content)) + + self.assertIsNone( + zip_file.testzip(), + "Libreoffice match doc generated is not a correct zip file.", + ) + + filename = None + for name in zip_file.namelist(): + if name == "content.xml": + filename = name + break + self.assertIsNotNone(filename) + + # only check that all operation types are listed + with tempfile.TemporaryDirectory(prefix="tmp-ishtar-") as tmpdir: + match_file = zip_file.extract(filename, tmpdir) + with open(match_file) as content_file: + content = content_file.read() + for ope_type in models.OperationType.objects.filter( + available=True + ).all(): + ope_type = str(ope_type).replace("'", "'") + self.assertIn(ope_type, content) def test_query_transformation(self): # POV: local diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py index 59dc99f15..8124bbeed 100644 --- a/ishtar_common/admin.py +++ b/ishtar_common/admin.py @@ -2218,10 +2218,58 @@ def update_types_from_source(modeladmin, request, queryset): return response -class ApiExternalSource(admin.ModelAdmin): +def generate_match_document(modeladmin, request, queryset): + return_url = ( + reverse( + "admin:%s_%s_changelist" + % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) + ) + + "?" + + urllib.parse.urlencode(request.GET) + ) + if queryset.count() != 1: + return send_error_message( + request, + str(_("Select only one source.")), + return_url, + message_type=messages.WARNING, + ) + src_doc = queryset.all()[0].generate_match_document() + in_memory = BytesIO() + with open(src_doc, "rb") as fle: + in_memory.write(fle.read()) + filename = src_doc.split(os.sep)[-1] + + response = HttpResponse( + content_type="application/vnd.oasis.opendocument.spreadsheet" + ) + response["Content-Disposition"] = "attachment; filename=%s" % filename.replace( + " ", "_" + ) + in_memory.seek(0) + response.write(in_memory.read()) + return response + + +class ApiExternalSourceAdmin(admin.ModelAdmin): model = models_rest.ApiExternalSource - actions = [update_types_from_source] + actions = [update_types_from_source, generate_match_document] list_display = ("name", "url", "key") -admin_site.register(models_rest.ApiExternalSource, ApiExternalSource) +admin_site.register(models_rest.ApiExternalSource, ApiExternalSourceAdmin) + + +class ApiKeyMatchAdmin(admin.ModelAdmin): + model = models_rest.ApiKeyMatch + list_display = ["source", "search_model", "associated_type", + "distant_slug", "distant_label", "local_slug", + "local_label", "do_not_match"] + list_filter = ["source", "do_not_match"] + search_fields = ["associated_type__model", "distant_slug", "distant_label"] + actions = [ + change_value("do_not_match", False, _("Enable match")), + change_value("do_not_match", True, _("Disable match")), + ] + +admin_site.register(models_rest.ApiKeyMatch, ApiKeyMatchAdmin) diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py index 566a73127..bb260c30d 100644 --- a/ishtar_common/models_rest.py +++ b/ishtar_common/models_rest.py @@ -1,7 +1,20 @@ +import datetime +import os +import tempfile + +from django.conf import settings from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from django.contrib.gis.db import models from django.contrib.postgres.fields import ArrayField +from django.utils.text import slugify + +try: + assert settings.USE_LIBREOFFICE + from ishtar_common.libreoffice import UnoCalc + from com.sun.star.awt.FontSlant import ITALIC +except (AssertionError, ImportError): + UnoCalc = None from ishtar_common.utils import ugettext_lazy as _ @@ -16,8 +29,8 @@ class ApiUser(models.Model): ip = models.GenericIPAddressField(verbose_name=_("IP")) class Meta: - verbose_name = _("API - User") - verbose_name_plural = _("API - Users") + verbose_name = _("API - Remote access - User") + verbose_name_plural = _("API - Remote access - Users") def __str__(self): return self.user_ptr.username @@ -34,8 +47,8 @@ class ApiSearchModel(models.Model): ) class Meta: - verbose_name = _("API - Search model") - verbose_name_plural = _("API - Search models") + verbose_name = _("API - Remote access - Search model") + verbose_name_plural = _("API - Remote access - Search models") class ApiExternalSource(models.Model): @@ -44,8 +57,11 @@ class ApiExternalSource(models.Model): key = models.CharField(_("Key"), max_length=40) class Meta: - verbose_name = _("API - External source") - verbose_name_plural = _("API - External sources") + verbose_name = _("API - Search - External source") + verbose_name_plural = _("API - Search - External sources") + + def __str__(self): + return self.name def update_matches(self, content): result = { @@ -93,7 +109,7 @@ class ApiExternalSource(models.Model): if q.count(): local_value = q.all()[0] setattr(m, "local_slug", getattr(local_value, slug_key)) - m.local_value = str(local_value) + m.local_label = str(local_value) updated = True if updated: m.save() @@ -103,25 +119,92 @@ class ApiExternalSource(models.Model): result["created"] += 1 # delete removed keys q = ApiKeyMatch.objects.filter( - source=self, search_model=ct, associated_type=ct_type, + source=self, + search_model=ct, + associated_type=ct_type, ).exclude(distant_slug__in=current_matches) result["deleted"] += q.count() q.delete() return result - def generate_match_csv(self): - pass + def generate_match_document(self): + if not UnoCalc: + return + uno = UnoCalc() + calc = uno.create_calc() + if not calc: + return + types = list( + ApiKeyMatch.objects.filter( + source=self, + ) + .order_by() + .values_list("associated_type", flat=True) + .distinct() + ) + lst_sheet = uno.get_sheet(calc, len(types), str(_("List types"))) + for idx, tpe in enumerate(types): + self._generate_match_page(idx, tpe, uno, calc, lst_sheet) + tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-") + dest_filename = "{}{}{}-{}.ods".format(tmpdir, os.sep, + datetime.date.today().isoformat(), + slugify(self.name)) + uno.save_calc(calc, dest_filename) + return dest_filename + + def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet): + model = ContentType.objects.get(pk=tpe).model_class() + ROW_NUMBER = 1000 + sheet = uno.get_sheet(calc, page_number) + sheet.Name = str(model._meta.verbose_name) + for col_number, column in enumerate( + (_("Distant key"), _("Distant label"), _("Local")) + ): + # header + cell = sheet.getCellByPosition(col_number, 0) + cell.CharWeight = 150 + cell.setString(str(column)) + + for idx, match in enumerate( + ApiKeyMatch.objects.filter(source=self, associated_type=tpe).all() + ): + cell = sheet.getCellByPosition(0, idx + 1) + cell.setString(match.distant_slug) + cell = sheet.getCellByPosition(1, idx + 1) + cell.setString(match.distant_label) + if match.local_label: + cell = sheet.getCellByPosition(2, idx + 1) + cell.setString(match.local_label) + lst = [] + for typ in model.get_types(instances=True): + lst.append(str(typ)) + end_row = uno.create_list( + lst_sheet, page_number, 0, str(model._meta.verbose_name), lst + ) + uno.set_cell_validation_list( + sheet, + 3, + 1, + ROW_NUMBER + 2, + lst_sheet, + page_number, + [1, end_row], + ) class ApiKeyMatch(models.Model): source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE) search_model = models.ForeignKey( - ContentType, on_delete=models.CASCADE, verbose_name=_("Search model"), - related_name="key_match_search_models" + ContentType, + on_delete=models.CASCADE, + verbose_name=_("Search model"), + related_name="key_match_search_models", ) associated_type = models.ForeignKey( - ContentType, on_delete=models.CASCADE, verbose_name=_("Associated type"), - related_name="key_match_types" + ContentType, + on_delete=models.CASCADE, + verbose_name=_("Associated type"), + related_name="key_match_types", ) search_keys = ArrayField( models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True @@ -143,5 +226,5 @@ class ApiKeyMatch(models.Model): ) class Meta: - verbose_name = _("API - Key match") - verbose_name_plural = _("API - Keys matches") + verbose_name = _("API - Search - Key match") + verbose_name_plural = _("API - Search - Keys matches") |