summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2021-10-19 22:19:45 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2022-12-12 12:20:59 +0100
commita0db19bfca9c0f27ceb2a9fc1b85b4eed2e9e5b7 (patch)
tree19988c86ec4f03874017d54b3ad5a9ec6276a0a4
parent1b769cc99c16e42dd125914905b393782997acba (diff)
downloadIshtar-a0db19bfca9c0f27ceb2a9fc1b85b4eed2e9e5b7.tar.bz2
Ishtar-a0db19bfca9c0f27ceb2a9fc1b85b4eed2e9e5b7.zip
Syndication - generate libreoffice match document
-rw-r--r--archaeological_operations/tests.py66
-rw-r--r--ishtar_common/admin.py54
-rw-r--r--ishtar_common/models_rest.py115
3 files changed, 207 insertions, 28 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py
index 74ae5819a..ee1f2645c 100644
--- a/archaeological_operations/tests.py
+++ b/archaeological_operations/tests.py
@@ -4579,6 +4579,17 @@ class ApiTest(OperationInitTest, APITestCase):
)
return src
+ def _mock_request(self, mock_get, json_file="external_source_types_1.json"):
+ json_file = "../archaeological_operations/tests/" + json_file
+ mock_get.return_value.status_code = 200
+
+ def __json():
+ return json.loads(mock_get.return_value.text)
+
+ mock_get.return_value.json = __json
+ with open(json_file, "r") as tpes:
+ mock_get.return_value.text = tpes.read()
+
@patch("requests.get")
def test_type_admin(self, mock_get):
# POV: local
@@ -4623,10 +4634,7 @@ class ApiTest(OperationInitTest, APITestCase):
response.content.decode(),
)
- with open(
- "../archaeological_operations/tests/external_source_types_1.json", "r"
- ) as tpes:
- mock_get.return_value.text = tpes.read()
+ self._mock_request(mock_get)
response = self.client.post(url, params, follow=True)
result = {"created": "XXXXX"}
@@ -4635,10 +4643,7 @@ class ApiTest(OperationInitTest, APITestCase):
)
self.assertIn(msg_created, response.content.decode())
# test incoherent search_model and types
- with open(
- "../archaeological_operations/tests/external_source_types_2.json", "r"
- ) as tpes:
- mock_get.return_value.text = tpes.read()
+ self._mock_request(mock_get, "external_source_types_2.json")
response = self.client.post(url, params, follow=True)
content = response.content.decode()
@@ -4662,7 +4667,50 @@ class ApiTest(OperationInitTest, APITestCase):
msg = str(_(f"{result['deleted']} matches deleted"))
self.assertIn(msg, response.content.decode())
- pass
+ @tag("libreoffice")
+ @patch("requests.get")
+ def test_match_doc(self, mock_get):
+ if not settings.USE_LIBREOFFICE:
+ return
+ self._mock_request(mock_get)
+ self.client.login(username=self.username, password=self.password)
+ source = self._get_source()
+ url = "/admin/{}/{}/".format(
+ "ishtar_common", models_rest.ApiExternalSource.__name__.lower()
+ )
+ params = {
+ "action": "update_types_from_source",
+ "_selected_action": [source.pk],
+ }
+ self.client.post(url, params, follow=True)
+
+ params["action"] = "generate_match_document"
+ response = self.client.post(url, params, follow=True)
+ #src_doc = source.generate_match_document()
+ zip_file = zipfile.ZipFile(BytesIO(response.content))
+
+ self.assertIsNone(
+ zip_file.testzip(),
+ "Libreoffice match doc generated is not a correct zip file.",
+ )
+
+ filename = None
+ for name in zip_file.namelist():
+ if name == "content.xml":
+ filename = name
+ break
+ self.assertIsNotNone(filename)
+
+ # only check that all operation types are listed
+ with tempfile.TemporaryDirectory(prefix="tmp-ishtar-") as tmpdir:
+ match_file = zip_file.extract(filename, tmpdir)
+ with open(match_file) as content_file:
+ content = content_file.read()
+ for ope_type in models.OperationType.objects.filter(
+ available=True
+ ).all():
+ ope_type = str(ope_type).replace("'", "&apos;")
+ self.assertIn(ope_type, content)
def test_query_transformation(self):
# POV: local
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py
index 59dc99f15..8124bbeed 100644
--- a/ishtar_common/admin.py
+++ b/ishtar_common/admin.py
@@ -2218,10 +2218,58 @@ def update_types_from_source(modeladmin, request, queryset):
return response
-class ApiExternalSource(admin.ModelAdmin):
+def generate_match_document(modeladmin, request, queryset):
+ return_url = (
+ reverse(
+ "admin:%s_%s_changelist"
+ % (modeladmin.model._meta.app_label, modeladmin.model._meta.model_name)
+ )
+ + "?"
+ + urllib.parse.urlencode(request.GET)
+ )
+ if queryset.count() != 1:
+ return send_error_message(
+ request,
+ str(_("Select only one source.")),
+ return_url,
+ message_type=messages.WARNING,
+ )
+ src_doc = queryset.all()[0].generate_match_document()
+ in_memory = BytesIO()
+ with open(src_doc, "rb") as fle:
+ in_memory.write(fle.read())
+ filename = src_doc.split(os.sep)[-1]
+
+ response = HttpResponse(
+ content_type="application/vnd.oasis.opendocument.spreadsheet"
+ )
+ response["Content-Disposition"] = "attachment; filename=%s" % filename.replace(
+ " ", "_"
+ )
+ in_memory.seek(0)
+ response.write(in_memory.read())
+ return response
+
+
+class ApiExternalSourceAdmin(admin.ModelAdmin):
model = models_rest.ApiExternalSource
- actions = [update_types_from_source]
+ actions = [update_types_from_source, generate_match_document]
list_display = ("name", "url", "key")
-admin_site.register(models_rest.ApiExternalSource, ApiExternalSource)
+admin_site.register(models_rest.ApiExternalSource, ApiExternalSourceAdmin)
+
+
+class ApiKeyMatchAdmin(admin.ModelAdmin):
+ model = models_rest.ApiKeyMatch
+ list_display = ["source", "search_model", "associated_type",
+ "distant_slug", "distant_label", "local_slug",
+ "local_label", "do_not_match"]
+ list_filter = ["source", "do_not_match"]
+ search_fields = ["associated_type__model", "distant_slug", "distant_label"]
+ actions = [
+ change_value("do_not_match", False, _("Enable match")),
+ change_value("do_not_match", True, _("Disable match")),
+ ]
+
+admin_site.register(models_rest.ApiKeyMatch, ApiKeyMatchAdmin)
diff --git a/ishtar_common/models_rest.py b/ishtar_common/models_rest.py
index 566a73127..bb260c30d 100644
--- a/ishtar_common/models_rest.py
+++ b/ishtar_common/models_rest.py
@@ -1,7 +1,20 @@
+import datetime
+import os
+import tempfile
+
+from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.db import models
from django.contrib.postgres.fields import ArrayField
+from django.utils.text import slugify
+
+try:
+ assert settings.USE_LIBREOFFICE
+ from ishtar_common.libreoffice import UnoCalc
+ from com.sun.star.awt.FontSlant import ITALIC
+except (AssertionError, ImportError):
+ UnoCalc = None
from ishtar_common.utils import ugettext_lazy as _
@@ -16,8 +29,8 @@ class ApiUser(models.Model):
ip = models.GenericIPAddressField(verbose_name=_("IP"))
class Meta:
- verbose_name = _("API - User")
- verbose_name_plural = _("API - Users")
+ verbose_name = _("API - Remote access - User")
+ verbose_name_plural = _("API - Remote access - Users")
def __str__(self):
return self.user_ptr.username
@@ -34,8 +47,8 @@ class ApiSearchModel(models.Model):
)
class Meta:
- verbose_name = _("API - Search model")
- verbose_name_plural = _("API - Search models")
+ verbose_name = _("API - Remote access - Search model")
+ verbose_name_plural = _("API - Remote access - Search models")
class ApiExternalSource(models.Model):
@@ -44,8 +57,11 @@ class ApiExternalSource(models.Model):
key = models.CharField(_("Key"), max_length=40)
class Meta:
- verbose_name = _("API - External source")
- verbose_name_plural = _("API - External sources")
+ verbose_name = _("API - Search - External source")
+ verbose_name_plural = _("API - Search - External sources")
+
+ def __str__(self):
+ return self.name
def update_matches(self, content):
result = {
@@ -93,7 +109,7 @@ class ApiExternalSource(models.Model):
if q.count():
local_value = q.all()[0]
setattr(m, "local_slug", getattr(local_value, slug_key))
- m.local_value = str(local_value)
+ m.local_label = str(local_value)
updated = True
if updated:
m.save()
@@ -103,25 +119,92 @@ class ApiExternalSource(models.Model):
result["created"] += 1
# delete removed keys
q = ApiKeyMatch.objects.filter(
- source=self, search_model=ct, associated_type=ct_type,
+ source=self,
+ search_model=ct,
+ associated_type=ct_type,
).exclude(distant_slug__in=current_matches)
result["deleted"] += q.count()
q.delete()
return result
- def generate_match_csv(self):
- pass
+ def generate_match_document(self):
+ if not UnoCalc:
+ return
+ uno = UnoCalc()
+ calc = uno.create_calc()
+ if not calc:
+ return
+ types = list(
+ ApiKeyMatch.objects.filter(
+ source=self,
+ )
+ .order_by()
+ .values_list("associated_type", flat=True)
+ .distinct()
+ )
+ lst_sheet = uno.get_sheet(calc, len(types), str(_("List types")))
+ for idx, tpe in enumerate(types):
+ self._generate_match_page(idx, tpe, uno, calc, lst_sheet)
+ tmpdir = tempfile.mkdtemp(prefix="ishtar-matches-")
+ dest_filename = "{}{}{}-{}.ods".format(tmpdir, os.sep,
+ datetime.date.today().isoformat(),
+ slugify(self.name))
+ uno.save_calc(calc, dest_filename)
+ return dest_filename
+
+ def _generate_match_page(self, page_number, tpe, uno, calc, lst_sheet):
+ model = ContentType.objects.get(pk=tpe).model_class()
+ ROW_NUMBER = 1000
+ sheet = uno.get_sheet(calc, page_number)
+ sheet.Name = str(model._meta.verbose_name)
+ for col_number, column in enumerate(
+ (_("Distant key"), _("Distant label"), _("Local"))
+ ):
+ # header
+ cell = sheet.getCellByPosition(col_number, 0)
+ cell.CharWeight = 150
+ cell.setString(str(column))
+
+ for idx, match in enumerate(
+ ApiKeyMatch.objects.filter(source=self, associated_type=tpe).all()
+ ):
+ cell = sheet.getCellByPosition(0, idx + 1)
+ cell.setString(match.distant_slug)
+ cell = sheet.getCellByPosition(1, idx + 1)
+ cell.setString(match.distant_label)
+ if match.local_label:
+ cell = sheet.getCellByPosition(2, idx + 1)
+ cell.setString(match.local_label)
+ lst = []
+ for typ in model.get_types(instances=True):
+ lst.append(str(typ))
+ end_row = uno.create_list(
+ lst_sheet, page_number, 0, str(model._meta.verbose_name), lst
+ )
+ uno.set_cell_validation_list(
+ sheet,
+ 3,
+ 1,
+ ROW_NUMBER + 2,
+ lst_sheet,
+ page_number,
+ [1, end_row],
+ )
class ApiKeyMatch(models.Model):
source = models.ForeignKey(ApiExternalSource, on_delete=models.CASCADE)
search_model = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, verbose_name=_("Search model"),
- related_name="key_match_search_models"
+ ContentType,
+ on_delete=models.CASCADE,
+ verbose_name=_("Search model"),
+ related_name="key_match_search_models",
)
associated_type = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, verbose_name=_("Associated type"),
- related_name="key_match_types"
+ ContentType,
+ on_delete=models.CASCADE,
+ verbose_name=_("Associated type"),
+ related_name="key_match_types",
)
search_keys = ArrayField(
models.CharField(max_length=200), verbose_name=_("Search keys"), blank=True
@@ -143,5 +226,5 @@ class ApiKeyMatch(models.Model):
)
class Meta:
- verbose_name = _("API - Key match")
- verbose_name_plural = _("API - Keys matches")
+ verbose_name = _("API - Search - Key match")
+ verbose_name_plural = _("API - Search - Keys matches")