summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--archaeological_operations/tests.py65
-rw-r--r--ishtar_common/models_imports.py71
-rw-r--r--ishtar_common/views.py2
3 files changed, 98 insertions, 40 deletions
diff --git a/archaeological_operations/tests.py b/archaeological_operations/tests.py
index b8db7e2c7..385337213 100644
--- a/archaeological_operations/tests.py
+++ b/archaeological_operations/tests.py
@@ -91,7 +91,8 @@ from ishtar_common.models import (
Document,
ValueFormater,
Regexp,
- MediaExporter
+ MediaExporter,
+ SourceType
)
from ishtar_common.models_rest import ApiUser, ApiSearchModel, ApiSheetFilter
from archaeological_files.models import File, FileType
@@ -4101,7 +4102,8 @@ class OperationExportMediaTest(TestCase, TestPermissionQuery, OperationInitTest)
response = c.get(self.export_url)
self.assertEqual(response.status_code, 200)
- def _test_files(self, response, filenames):
+ def _test_files(self, response, filenames, exclude=None):
+ exclude = exclude or []
with tempfile.TemporaryDirectory() as tmpdir:
f, z = None, None
try:
@@ -4117,6 +4119,8 @@ class OperationExportMediaTest(TestCase, TestPermissionQuery, OperationInitTest)
image = f"{tmpdir}/{filename}"
with Image.open(image) as im:
self.assertIsNone(im.verify())
+ for excluded in exclude:
+ self.assertNotIn(exclude, name_list)
finally:
if z:
z.close()
@@ -4124,27 +4128,37 @@ class OperationExportMediaTest(TestCase, TestPermissionQuery, OperationInitTest)
f.close()
def test_content(self):
+ document = self.documents[0]
+ document.pk = None
+ document.save()
+ self.operation.documents.add(document)
+
c = Client()
c.login(username=self.username, password=self.password)
response = c.get(self.export_url)
- self._test_files(response, [('operation_image_00001.png', True)])
+ self._test_files(response, [
+ ('operation_image_00001.png', True),
+ ('operation_image_00002.png', True),
+ ])
self.exporter.files_to_export = 'A'
self.exporter.save()
response = c.get(self.export_url)
- self._test_files(
- response,
- [('operation_image_00001.png', True), ('operation_file_00001.txt', False), ]
- )
+ self._test_files(response, [
+ ('operation_image_00001.png', True),
+ ('operation_image_00002.png', True),
+ ('operation_file_00001.txt', False),
+ ])
self.exporter.thumbnail_for_images = True
self.exporter.save()
response = c.get(self.export_url)
- self._test_files(
- response,
- [('operation_image_00001.jpg', True), ('operation_file_00001.txt', False), ]
- )
+ self._test_files(response, [
+ ('operation_image_00001.jpg', True),
+ ('operation_image_00002.jpg', True),
+ ('operation_file_00001.txt', False),
+ ])
def _add_find(self):
cr_data = {
@@ -4189,6 +4203,35 @@ class OperationExportMediaTest(TestCase, TestPermissionQuery, OperationInitTest)
('find_image_00001.png', True),]
)
+ def test_query(self):
+ document_type = SourceType.objects.all()[0]
+ document = self.documents[0]
+ document.pk = None
+ document.source_type = document_type
+ document.save()
+ self.operation.documents.add(document)
+
+ c = Client()
+ c.login(username=self.username, password=self.password)
+
+ # classic export
+ response = c.get(self.export_url)
+ self._test_files(
+ response,
+ [('operation_image_00001.png', True),
+ ('operation_image_00002.png', True)]
+ )
+
+ # filter: only one document is relevant
+ self.exporter.query = f"type={document_type.label}"
+ self.exporter.save()
+ response = c.get(self.export_url)
+ self._test_files(
+ response,
+ [('operation_image_00001.png', True)],
+ exclude=[('operation_image_00002.png', True)]
+ )
+
class LabelTest(TestCase, OperationInitTest):
fixtures = FILE_FIXTURES
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
index 0157b33ee..23fb4f38e 100644
--- a/ishtar_common/models_imports.py
+++ b/ishtar_common/models_imports.py
@@ -100,6 +100,7 @@ from ishtar_common.data_importer import (
LowerCaseFormater,
)
from ishtar_common.utils import task
+from ishtar_common.views_item import get_item
from ishtar_common.ignf_utils import IGNF
@@ -3052,8 +3053,8 @@ class MediaExporter(models.Model):
query = models.TextField(
_("Filter query"), blank=True, null=False, default="",
help_text=_(
- "Use 'text' query used in Ishtar search input. Can be left empty "
- "to export all."
+ "Use 'text' query used in Ishtar search input. The query is from the "
+ "document point of view. Can be left empty to export all attached media."
)
)
naming = models.TextField(
@@ -3128,13 +3129,7 @@ class MediaExporter(models.Model):
down += list(getattr(o, attr).all())
return down
- def export(self, obj, tmpdir=None):
- if not tmpdir:
- tmpdir = tempfile.mkdtemp()
- if not hasattr(obj, "documents"):
- # database inconstency - should not occur
- return
- q = None
+ def _get_query(self, ishtaruser):
media_attrs = []
if self.files_to_export in ("A", "I"):
q = Q(image__isnull=False)
@@ -3149,24 +3144,18 @@ class MediaExporter(models.Model):
else:
q = q2
media_attrs.append("associated_file")
- archive_path = os.path.join(tmpdir, "archive")
- os.mkdir(archive_path)
- items = [obj]
- if self.cascade:
- down = self._get_down_objects([obj])
- items += down
- while down:
- down = self._get_down_objects(down)
- items += down
- counters = {}
- for item in items:
- if not hasattr(item, "documents"):
- continue
- self._copy_media(item, q, media_attrs, counters, archive_path)
- now = datetime.datetime.now()
- archive_name = os.path.join(tmpdir, f"media-{now.strftime('%Y-%m-%d-%H%M%S')}")
- shutil.make_archive(archive_name, "zip", archive_path)
- return archive_name + ".zip"
+ # if query is empty do also the query in order to filter with permissions
+ Document = apps.get_model("ishtar_common", "Document")
+ _get_item = get_item(
+ Document,
+ "", "", no_permission_check=True,
+ )
+ query = {"search_vector": self.query or ""}
+ ids = _get_item(
+ None, return_query=True, ishtaruser=ishtaruser, query=query
+ ).values_list("id", flat=True)
+ q = q & Q(pk__in=ids)
+ return q, media_attrs
def _copy_media(self, item, q, media_attrs, counters, archive_path):
item_type = item.SLUG
@@ -3189,5 +3178,31 @@ class MediaExporter(models.Model):
if key not in counters:
counters[key] = 0
counters[key] += 1
- name += f"_{idx + counters[key]:05d}.{ext}"
+ name += f"_{counters[key]:05d}.{ext}"
shutil.copy(media.path, os.path.join(archive_path, name))
+
+ def export(self, obj, ishtaruser, tmpdir=None):
+ if not tmpdir:
+ tmpdir = tempfile.mkdtemp()
+ if not hasattr(obj, "documents"):
+ # database inconstency - should not occur
+ return
+ q, media_attrs = self._get_query(ishtaruser)
+ archive_path = os.path.join(tmpdir, "archive")
+ os.mkdir(archive_path)
+ items = [obj]
+ if self.cascade:
+ down = self._get_down_objects([obj])
+ items += down
+ while down:
+ down = self._get_down_objects(down)
+ items += down
+ counters = {}
+ for item in items:
+ if not hasattr(item, "documents"):
+ continue
+ self._copy_media(item, q, media_attrs, counters, archive_path)
+ now = datetime.datetime.now()
+ archive_name = os.path.join(tmpdir, f"media-{now.strftime('%Y-%m-%d-%H%M%S')}")
+ shutil.make_archive(archive_name, "zip", archive_path)
+ return archive_name + ".zip"
diff --git a/ishtar_common/views.py b/ishtar_common/views.py
index 95cc37990..aa47040aa 100644
--- a/ishtar_common/views.py
+++ b/ishtar_common/views.py
@@ -1674,7 +1674,7 @@ class ExportMediaView(IshtarMixin, LoginRequiredMixin, View):
if not obj:
return HttpResponse(content_type="text/plain")
with tempfile.TemporaryDirectory() as tmpdir:
- export = exporter.export(obj, tmpdir=tmpdir)
+ export = exporter.export(obj, request.user.ishtaruser, tmpdir=tmpdir)
if not export:
return HttpResponse(content_type="text/plain")
with open(export, "rb") as f: