summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2025-03-26 17:27:41 +0100
committerÉtienne Loks <etienne.loks@iggdrasil.net>2025-07-21 15:07:41 +0200
commit5499e558a541a0c8740d608dc4446f64ebe774c6 (patch)
tree28fbac26740b4f2ea19a0261422deb7e46f7af65 /ishtar_common
parent62b34d0afb55a5c5c7bc1da22f0c0d293ee3936d (diff)
downloadIshtar-5499e558a541a0c8740d608dc4446f64ebe774c6.tar.bz2
Ishtar-5499e558a541a0c8740d608dc4446f64ebe774c6.zip
🐛 fix unclosed file
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/data_importer.py18
-rw-r--r--ishtar_common/forms_common.py2
-rw-r--r--ishtar_common/models_imports.py10
-rw-r--r--ishtar_common/tasks.py14
-rw-r--r--ishtar_common/tests.py142
-rw-r--r--ishtar_common/utils.py3
-rw-r--r--ishtar_common/utils_migrations.py14
7 files changed, 105 insertions, 98 deletions
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py
index 4259e1b8a..7df66a672 100644
--- a/ishtar_common/data_importer.py
+++ b/ishtar_common/data_importer.py
@@ -647,7 +647,6 @@ class FileFormater(Formater):
return my_file
def _format_zip(self, value, archive):
- zp = zipfile.ZipFile(archive)
value = value.strip().replace("\\", "/")
items = value.replace("/", "_").split(".")
base_dir = settings.MEDIA_ROOT + "imported"
@@ -656,14 +655,15 @@ class FileFormater(Formater):
filename = base_dir + os.sep + ".".join(items[:-1]) + "." + items[-1]
try:
- with open(filename, "wb") as f:
- with zp.open(value) as z:
- f.write(z.read())
- f = open(filename, "rb")
- my_file = File(f)
- # manually set the file size because of an issue with TempFile
- my_file.size = os.stat(filename).st_size
- return my_file
+ with zipfile.ZipFile(archive) as zp:
+ with open(filename, "wb") as f:
+ with zp.open(value) as z:
+ f.write(z.read())
+ f = open(filename, "rb")
+ my_file = File(f)
+ # manually set the file size because of an issue with TempFile
+ my_file.size = os.stat(filename).st_size
+ return my_file
except KeyError:
raise ValueError(
_('"%(value)s" is not a valid path for the given archive')
diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py
index a5d02b11e..af1a24df0 100644
--- a/ishtar_common/forms_common.py
+++ b/ishtar_common/forms_common.py
@@ -443,6 +443,8 @@ class NewImportForm(BaseImportForm):
# media is downloaded - clean the link
item.imported_media_link = None
item.save()
+ if hasattr(temp_file, "close"):
+ temp_file.close()
return item
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
index 363b2a1c0..ec300019c 100644
--- a/ishtar_common/models_imports.py
+++ b/ishtar_common/models_imports.py
@@ -1907,11 +1907,13 @@ class ImportGroup(BaseImport):
imports = []
imported_file, imported_images = None, None
if self.imported_file:
- imported_file = ContentFile(self.imported_file.read())
- imported_file.name = self.imported_file.name.split(os.sep)[-1]
+ with open(self.imported_file.path, "rb") as imp:
+ imported_file = ContentFile(imp.read())
+ imported_file.name = self.imported_file.name.split(os.sep)[-1]
if self.imported_images:
- imported_images = ContentFile(self.imported_images.read())
- imported_images.name = self.imported_images.name.split(os.sep)[-1]
+ with open(self.imported_images.path, "rb") as imp:
+ imported_images = ContentFile(imp.read())
+ imported_images.name = self.imported_images.name.split(os.sep)[-1]
for import_type_relation in self.importer_type.importer_types.all():
importer_type = import_type_relation.importer_type
diff --git a/ishtar_common/tasks.py b/ishtar_common/tasks.py
index 55c704ab1..24ae02dcc 100644
--- a/ishtar_common/tasks.py
+++ b/ishtar_common/tasks.py
@@ -173,13 +173,13 @@ def launch_export(export_task_id):
if not export_task.geo:
kwargs["no_geo"] = True
archive_name = full_serialization(**kwargs)
- result = open(archive_name, "rb")
- export_task.result.save(archive_name.split(os.sep)[-1], File(result))
- os.remove(archive_name)
- export_task.finished_date = datetime.datetime.now()
- export_task.state = "F"
- export_task.result_info = str(_("Export finished"))
- export_task.save()
+ with open(archive_name, "rb") as result:
+ export_task.result.save(archive_name.split(os.sep)[-1], File(result))
+ os.remove(archive_name)
+ export_task.finished_date = datetime.datetime.now()
+ export_task.state = "F"
+ export_task.result_info = str(_("Export finished"))
+ export_task.save()
def update_towns():
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index bbe5db892..818967c64 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -732,17 +732,18 @@ class GenericSerializationTest:
)
self.documents = []
for idx in range(12):
- self.documents.append(
- models.Document.objects.create(
- title="Test{}".format(idx),
- associated_file=SimpleUploadedFile("test.txt", b"no real content"),
- image=SimpleUploadedFile(
- name="test.png",
- content=open(image_path, "rb").read(),
- content_type="image/png",
- ),
+ with open(image_path, "rb") as img:
+ self.documents.append(
+ models.Document.objects.create(
+ title="Test{}".format(idx),
+ associated_file=SimpleUploadedFile("test.txt", b"no real content"),
+ image=SimpleUploadedFile(
+ name="test.png",
+ content=img.read(),
+ content_type="image/png",
+ ),
+ )
)
- )
def generic_serialization_test(self, serialize, no_test=False, kwargs=None):
if not kwargs:
@@ -2718,8 +2719,10 @@ class BaseImportTest(TestCase):
importer_filename = os.path.join(root, "importer-group.zip")
restore_serialized(importer_filename)
imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots")
- imp_file = open(os.path.join(root, "importer-group.csv"), "rb")
- imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb")
+ with open(os.path.join(root, "importer-group.csv"), "rb") as imp:
+ imp_file = SimpleUploadedFile(imp.name, imp.read())
+ with open(os.path.join(root, "importer-group-media.zip"), "rb") as imp:
+ imp_media = SimpleUploadedFile(imp.name, imp.read())
return imp_group, imp_file, imp_media
def create_group_import(self, init=True):
@@ -2727,8 +2730,8 @@ class BaseImportTest(TestCase):
create_user()
ishtar_user = models.IshtarUser.objects.all()[0]
file_dict = {
- "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()),
- "imported_images": SimpleUploadedFile(imp_media.name, imp_media.read())
+ "imported_file": imp_file,
+ "imported_images": imp_media
}
post_dict = {
"importer_type": imp_group.pk,
@@ -2891,13 +2894,12 @@ class ImportTestInterface(BaseImportTest):
def _test_create_import_get_data(self):
csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv")
+ with open(csv_path, "rb") as f:
+ imported_file = SimpleUploadedFile(name="insee-test.csv", content=f.read())
return {
"name": "Test Name",
"importer_type": self.importer_type.pk,
- "imported_file": SimpleUploadedFile(
- name="insee-test.csv",
- content=open(csv_path, "rb").read(),
- ),
+ "imported_file": imported_file,
"encoding": "utf-8",
"csv_sep": '|',
"skip_lines": 1,
@@ -3042,17 +3044,16 @@ class ImportTestInterface(BaseImportTest):
def test_validation_zip_import_image(self):
# init
image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png")
+ with open(image_path, "rb") as f:
+ imported_images = SimpleUploadedFile(name="test.png", content=f.read(),
+ content_type="image/png")
data = {
"name": "Import Zip Not Valid Must Fail",
"importer_type": self.importer_type.pk,
"encoding": "utf-8",
"csv_sep": "|",
"skip_lines": 1,
- "imported_images": SimpleUploadedFile(
- name="test.png",
- content=open(image_path, "rb").read(),
- content_type="image/png",
- ),
+ "imported_images": imported_images,
}
# superuser
@@ -3113,9 +3114,15 @@ class ImportTestInterface(BaseImportTest):
path = os.path.join(
LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv"
)
- imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv")
+ with open(path, "rb") as f:
+ error_file = SimpleUploadedFile(name="error-file.csv", content=f.read(),
+ content_type="text/csv")
+ imprt.error_file = error_file
imprt.save()
- imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv")
+ with open(path, "rb") as f:
+ error_file = SimpleUploadedFile(name="error-file.csv", content=f.read(),
+ content_type="text/csv")
+ imprt2.error_file = error_file
imprt2.save()
q = models.ImportLineError.objects.filter(import_item=imprt.pk)
@@ -3261,27 +3268,27 @@ class ImportTest(BaseImportTest):
SUB_IMPORT_MATCH_IDX = [1, 2]
with tempfile.TemporaryDirectory() as tmpdir:
- current_zip = zipfile.ZipFile(group_import.archive_file.path, "r")
- name_list = current_zip.namelist()
- self.assertIn("content.json", name_list)
- current_zip.extract("content.json", tmpdir)
- content_name = os.path.join(tmpdir, "content.json")
- with open(content_name, "r") as content:
- files = json.loads(content.read())
- self.assertIn("imported_file", files.keys())
- self.assertIn(files["imported_file"], name_list)
- for idx in range(4):
- self.assertIn(f"sub-{idx}-result_file", files.keys())
- self.assertIn(files[f"sub-{idx}-result_file"], name_list)
- for idx in SUB_IMPORT_MATCH_IDX:
- self.assertIn(f"sub-{idx}-match_file", files.keys())
- self.assertIn(files[f"sub-{idx}-match_file"], name_list)
- for name in name_list:
- current_zip.extract(name, tmpdir)
- if name == files["imported_file"]:
- with open(os.path.join(tmpdir, name), "r") as f:
- result = f.read()
- self.assertEqual(result, csv_content)
+ with zipfile.ZipFile(group_import.archive_file.path, "r") as current_zip:
+ name_list = current_zip.namelist()
+ self.assertIn("content.json", name_list)
+ current_zip.extract("content.json", tmpdir)
+ content_name = os.path.join(tmpdir, "content.json")
+ with open(content_name, "r") as content:
+ files = json.loads(content.read())
+ self.assertIn("imported_file", files.keys())
+ self.assertIn(files["imported_file"], name_list)
+ for idx in range(4):
+ self.assertIn(f"sub-{idx}-result_file", files.keys())
+ self.assertIn(files[f"sub-{idx}-result_file"], name_list)
+ for idx in SUB_IMPORT_MATCH_IDX:
+ self.assertIn(f"sub-{idx}-match_file", files.keys())
+ self.assertIn(files[f"sub-{idx}-match_file"], name_list)
+ for name in name_list:
+ current_zip.extract(name, tmpdir)
+ if name == files["imported_file"]:
+ with open(os.path.join(tmpdir, name), "r") as f:
+ result = f.read()
+ self.assertEqual(result, csv_content)
group_import.unarchive("F")
group_import = models.ImportGroup.objects.get(pk=group_import.pk)
@@ -4639,14 +4646,13 @@ class StorageTest(TestCase):
image_path = os.path.join(
LIB_BASE_PATH, "ishtar_common", "tests", "test.png"
)
+ with open(image_path, "rb") as f:
+ image = SimpleUploadedFile(name="test.png", content=f.read(),
+ content_type="image/png")
doc = models.Document.objects.create(
source_type=self.st1,
title="Operation report",
- image=SimpleUploadedFile(
- name="test.png",
- content=open(image_path, "rb").read(),
- content_type="image/png",
- ),
+ image=image,
)
p = doc.image.path.split(os.sep)
# current save path
@@ -4656,25 +4662,17 @@ class StorageTest(TestCase):
if f.startswith("test"):
os.remove(os.path.join(base_path, f))
doc = models.Document.objects.get(pk=doc.pk)
- doc.image.save(
- "test.png",
- SimpleUploadedFile(
- name="test.png",
- content=open(image_path, "rb").read(),
- content_type="image/png",
- ),
- )
+ with open(image_path, "rb") as f:
+ image = SimpleUploadedFile(name="test.png", content=f.read(),
+ content_type="image/png")
+ doc.image.save("test.png", image)
doc = models.Document.objects.get(pk=doc.pk)
os.remove(doc.image.path)
os.symlink("/tmp/ZZZZZZZZZZZZZZZ", doc.image.path) # bad link
- doc.image.save(
- "test.png",
- SimpleUploadedFile(
- name="test.png",
- content=open(image_path, "rb").read(),
- content_type="image/png",
- ),
- )
+ with open(image_path, "rb") as f:
+ image = SimpleUploadedFile(name="test.png", content=f.read(),
+ content_type="image/png")
+ doc.image.save("test.png", image)
doc.save()
@@ -4863,13 +4861,15 @@ class DocumentTest(TestCase):
pdf_path = os.path.join(
LIB_BASE_PATH, "ishtar_common", "tests", "simple.pdf"
)
- doc = models.Document.objects.create(
- title="Document",
- associated_file=SimpleUploadedFile(
+ with open(pdf_path, "rb") as f:
+ associated_file = SimpleUploadedFile(
name="simple.pdf",
- content=open(pdf_path, "rb").read(),
+ content=f.read(),
content_type="application/pdf",
)
+ doc = models.Document.objects.create(
+ title="Document",
+ associated_file=associated_file
)
doc.operations.add(self.ope1)
doc = models.Document.objects.get(id=doc.pk)
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index ca78993c2..10dee40ca 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -2249,7 +2249,8 @@ def generate_relation_graph(
png_name = tempdir + os.path.sep + "relations.png"
with open(png_name, "wb") as png_file:
- svg2png(open(svg_tmp_name, "rb").read(), write_to=png_file)
+ with open(svg_tmp_name, "rb") as svg_tmp:
+ svg2png(svg_tmp.read(), write_to=png_file)
with open(png_name, "rb") as png_file:
django_file = File(png_file)
attr = "relation_bitmap_image" + suffix
diff --git a/ishtar_common/utils_migrations.py b/ishtar_common/utils_migrations.py
index f11428e3c..5654a8897 100644
--- a/ishtar_common/utils_migrations.py
+++ b/ishtar_common/utils_migrations.py
@@ -28,12 +28,14 @@ def migrate_simple_image_to_m2m(base_model, image_model, rel_model, verbose=Fals
image_instance = image_model.objects.create()
try:
- image_instance.image.save(
- os.path.basename(item.image.path), File(open(item.image.path))
- )
- image_instance.thumbnail.save(
- os.path.basename(item.thumbnail.path), File(open(item.thumbnail.path))
- )
+ with open(item.image.path) as fle:
+ image_instance.image.save(
+ os.path.basename(item.image.path), File(fle)
+ )
+ with open(item.thumbnail.path) as fle:
+ image_instance.thumbnail.save(
+ os.path.basename(item.thumbnail.path), File(fle)
+ )
except IOError:
# image not on hard-drive
item.image = None