diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2025-03-26 17:27:41 +0100 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2025-07-21 15:07:41 +0200 |
commit | 5499e558a541a0c8740d608dc4446f64ebe774c6 (patch) | |
tree | 28fbac26740b4f2ea19a0261422deb7e46f7af65 /ishtar_common | |
parent | 62b34d0afb55a5c5c7bc1da22f0c0d293ee3936d (diff) | |
download | Ishtar-5499e558a541a0c8740d608dc4446f64ebe774c6.tar.bz2 Ishtar-5499e558a541a0c8740d608dc4446f64ebe774c6.zip |
🐛 fix unclosed file
Diffstat (limited to 'ishtar_common')
-rw-r--r-- | ishtar_common/data_importer.py | 18 | ||||
-rw-r--r-- | ishtar_common/forms_common.py | 2 | ||||
-rw-r--r-- | ishtar_common/models_imports.py | 10 | ||||
-rw-r--r-- | ishtar_common/tasks.py | 14 | ||||
-rw-r--r-- | ishtar_common/tests.py | 142 | ||||
-rw-r--r-- | ishtar_common/utils.py | 3 | ||||
-rw-r--r-- | ishtar_common/utils_migrations.py | 14 |
7 files changed, 105 insertions, 98 deletions
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py index 4259e1b8a..7df66a672 100644 --- a/ishtar_common/data_importer.py +++ b/ishtar_common/data_importer.py @@ -647,7 +647,6 @@ class FileFormater(Formater): return my_file def _format_zip(self, value, archive): - zp = zipfile.ZipFile(archive) value = value.strip().replace("\\", "/") items = value.replace("/", "_").split(".") base_dir = settings.MEDIA_ROOT + "imported" @@ -656,14 +655,15 @@ class FileFormater(Formater): filename = base_dir + os.sep + ".".join(items[:-1]) + "." + items[-1] try: - with open(filename, "wb") as f: - with zp.open(value) as z: - f.write(z.read()) - f = open(filename, "rb") - my_file = File(f) - # manually set the file size because of an issue with TempFile - my_file.size = os.stat(filename).st_size - return my_file + with zipfile.ZipFile(archive) as zp: + with open(filename, "wb") as f: + with zp.open(value) as z: + f.write(z.read()) + f = open(filename, "rb") + my_file = File(f) + # manually set the file size because of an issue with TempFile + my_file.size = os.stat(filename).st_size + return my_file except KeyError: raise ValueError( _('"%(value)s" is not a valid path for the given archive') diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py index a5d02b11e..af1a24df0 100644 --- a/ishtar_common/forms_common.py +++ b/ishtar_common/forms_common.py @@ -443,6 +443,8 @@ class NewImportForm(BaseImportForm): # media is downloaded - clean the link item.imported_media_link = None item.save() + if hasattr(temp_file, "close"): + temp_file.close() return item diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 363b2a1c0..ec300019c 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -1907,11 +1907,13 @@ class ImportGroup(BaseImport): imports = [] imported_file, imported_images = None, None if self.imported_file: - imported_file = ContentFile(self.imported_file.read()) - imported_file.name = self.imported_file.name.split(os.sep)[-1] + with open(self.imported_file.path, "rb") as imp: + imported_file = ContentFile(imp.read()) + imported_file.name = self.imported_file.name.split(os.sep)[-1] if self.imported_images: - imported_images = ContentFile(self.imported_images.read()) - imported_images.name = self.imported_images.name.split(os.sep)[-1] + with open(self.imported_images.path, "rb") as imp: + imported_images = ContentFile(imp.read()) + imported_images.name = self.imported_images.name.split(os.sep)[-1] for import_type_relation in self.importer_type.importer_types.all(): importer_type = import_type_relation.importer_type diff --git a/ishtar_common/tasks.py b/ishtar_common/tasks.py index 55c704ab1..24ae02dcc 100644 --- a/ishtar_common/tasks.py +++ b/ishtar_common/tasks.py @@ -173,13 +173,13 @@ def launch_export(export_task_id): if not export_task.geo: kwargs["no_geo"] = True archive_name = full_serialization(**kwargs) - result = open(archive_name, "rb") - export_task.result.save(archive_name.split(os.sep)[-1], File(result)) - os.remove(archive_name) - export_task.finished_date = datetime.datetime.now() - export_task.state = "F" - export_task.result_info = str(_("Export finished")) - export_task.save() + with open(archive_name, "rb") as result: + export_task.result.save(archive_name.split(os.sep)[-1], File(result)) + os.remove(archive_name) + export_task.finished_date = datetime.datetime.now() + export_task.state = "F" + export_task.result_info = str(_("Export finished")) + export_task.save() def update_towns(): diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index bbe5db892..818967c64 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -732,17 +732,18 @@ class GenericSerializationTest: ) self.documents = [] for idx in range(12): - self.documents.append( - models.Document.objects.create( - title="Test{}".format(idx), - associated_file=SimpleUploadedFile("test.txt", b"no real content"), - image=SimpleUploadedFile( - name="test.png", - content=open(image_path, "rb").read(), - content_type="image/png", - ), + with open(image_path, "rb") as img: + self.documents.append( + models.Document.objects.create( + title="Test{}".format(idx), + associated_file=SimpleUploadedFile("test.txt", b"no real content"), + image=SimpleUploadedFile( + name="test.png", + content=img.read(), + content_type="image/png", + ), + ) ) - ) def generic_serialization_test(self, serialize, no_test=False, kwargs=None): if not kwargs: @@ -2718,8 +2719,10 @@ class BaseImportTest(TestCase): importer_filename = os.path.join(root, "importer-group.zip") restore_serialized(importer_filename) imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots") - imp_file = open(os.path.join(root, "importer-group.csv"), "rb") - imp_media = open(os.path.join(root, "importer-group-media.zip"), "rb") + with open(os.path.join(root, "importer-group.csv"), "rb") as imp: + imp_file = SimpleUploadedFile(imp.name, imp.read()) + with open(os.path.join(root, "importer-group-media.zip"), "rb") as imp: + imp_media = SimpleUploadedFile(imp.name, imp.read()) return imp_group, imp_file, imp_media def create_group_import(self, init=True): @@ -2727,8 +2730,8 @@ class BaseImportTest(TestCase): create_user() ishtar_user = models.IshtarUser.objects.all()[0] file_dict = { - "imported_file": SimpleUploadedFile(imp_file.name, imp_file.read()), - "imported_images": SimpleUploadedFile(imp_media.name, imp_media.read()) + "imported_file": imp_file, + "imported_images": imp_media } post_dict = { "importer_type": imp_group.pk, @@ -2891,13 +2894,12 @@ class ImportTestInterface(BaseImportTest): def _test_create_import_get_data(self): csv_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "insee-test.csv") + with open(csv_path, "rb") as f: + imported_file = SimpleUploadedFile(name="insee-test.csv", content=f.read()) return { "name": "Test Name", "importer_type": self.importer_type.pk, - "imported_file": SimpleUploadedFile( - name="insee-test.csv", - content=open(csv_path, "rb").read(), - ), + "imported_file": imported_file, "encoding": "utf-8", "csv_sep": '|', "skip_lines": 1, @@ -3042,17 +3044,16 @@ class ImportTestInterface(BaseImportTest): def test_validation_zip_import_image(self): # init image_path = os.path.join(LIB_BASE_PATH, "ishtar_common", "tests", "test.png") + with open(image_path, "rb") as f: + imported_images = SimpleUploadedFile(name="test.png", content=f.read(), + content_type="image/png") data = { "name": "Import Zip Not Valid Must Fail", "importer_type": self.importer_type.pk, "encoding": "utf-8", "csv_sep": "|", "skip_lines": 1, - "imported_images": SimpleUploadedFile( - name="test.png", - content=open(image_path, "rb").read(), - content_type="image/png", - ), + "imported_images": imported_images, } # superuser @@ -3113,9 +3114,15 @@ class ImportTestInterface(BaseImportTest): path = os.path.join( LIB_BASE_PATH, "ishtar_common", "tests", "error-file.csv" ) - imprt.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") + with open(path, "rb") as f: + error_file = SimpleUploadedFile(name="error-file.csv", content=f.read(), + content_type="text/csv") + imprt.error_file = error_file imprt.save() - imprt2.error_file = SimpleUploadedFile(name="error-file.csv", content=open(path, "rb").read(), content_type="text/csv") + with open(path, "rb") as f: + error_file = SimpleUploadedFile(name="error-file.csv", content=f.read(), + content_type="text/csv") + imprt2.error_file = error_file imprt2.save() q = models.ImportLineError.objects.filter(import_item=imprt.pk) @@ -3261,27 +3268,27 @@ class ImportTest(BaseImportTest): SUB_IMPORT_MATCH_IDX = [1, 2] with tempfile.TemporaryDirectory() as tmpdir: - current_zip = zipfile.ZipFile(group_import.archive_file.path, "r") - name_list = current_zip.namelist() - self.assertIn("content.json", name_list) - current_zip.extract("content.json", tmpdir) - content_name = os.path.join(tmpdir, "content.json") - with open(content_name, "r") as content: - files = json.loads(content.read()) - self.assertIn("imported_file", files.keys()) - self.assertIn(files["imported_file"], name_list) - for idx in range(4): - self.assertIn(f"sub-{idx}-result_file", files.keys()) - self.assertIn(files[f"sub-{idx}-result_file"], name_list) - for idx in SUB_IMPORT_MATCH_IDX: - self.assertIn(f"sub-{idx}-match_file", files.keys()) - self.assertIn(files[f"sub-{idx}-match_file"], name_list) - for name in name_list: - current_zip.extract(name, tmpdir) - if name == files["imported_file"]: - with open(os.path.join(tmpdir, name), "r") as f: - result = f.read() - self.assertEqual(result, csv_content) + with zipfile.ZipFile(group_import.archive_file.path, "r") as current_zip: + name_list = current_zip.namelist() + self.assertIn("content.json", name_list) + current_zip.extract("content.json", tmpdir) + content_name = os.path.join(tmpdir, "content.json") + with open(content_name, "r") as content: + files = json.loads(content.read()) + self.assertIn("imported_file", files.keys()) + self.assertIn(files["imported_file"], name_list) + for idx in range(4): + self.assertIn(f"sub-{idx}-result_file", files.keys()) + self.assertIn(files[f"sub-{idx}-result_file"], name_list) + for idx in SUB_IMPORT_MATCH_IDX: + self.assertIn(f"sub-{idx}-match_file", files.keys()) + self.assertIn(files[f"sub-{idx}-match_file"], name_list) + for name in name_list: + current_zip.extract(name, tmpdir) + if name == files["imported_file"]: + with open(os.path.join(tmpdir, name), "r") as f: + result = f.read() + self.assertEqual(result, csv_content) group_import.unarchive("F") group_import = models.ImportGroup.objects.get(pk=group_import.pk) @@ -4639,14 +4646,13 @@ class StorageTest(TestCase): image_path = os.path.join( LIB_BASE_PATH, "ishtar_common", "tests", "test.png" ) + with open(image_path, "rb") as f: + image = SimpleUploadedFile(name="test.png", content=f.read(), + content_type="image/png") doc = models.Document.objects.create( source_type=self.st1, title="Operation report", - image=SimpleUploadedFile( - name="test.png", - content=open(image_path, "rb").read(), - content_type="image/png", - ), + image=image, ) p = doc.image.path.split(os.sep) # current save path @@ -4656,25 +4662,17 @@ class StorageTest(TestCase): if f.startswith("test"): os.remove(os.path.join(base_path, f)) doc = models.Document.objects.get(pk=doc.pk) - doc.image.save( - "test.png", - SimpleUploadedFile( - name="test.png", - content=open(image_path, "rb").read(), - content_type="image/png", - ), - ) + with open(image_path, "rb") as f: + image = SimpleUploadedFile(name="test.png", content=f.read(), + content_type="image/png") + doc.image.save("test.png", image) doc = models.Document.objects.get(pk=doc.pk) os.remove(doc.image.path) os.symlink("/tmp/ZZZZZZZZZZZZZZZ", doc.image.path) # bad link - doc.image.save( - "test.png", - SimpleUploadedFile( - name="test.png", - content=open(image_path, "rb").read(), - content_type="image/png", - ), - ) + with open(image_path, "rb") as f: + image = SimpleUploadedFile(name="test.png", content=f.read(), + content_type="image/png") + doc.image.save("test.png", image) doc.save() @@ -4863,13 +4861,15 @@ class DocumentTest(TestCase): pdf_path = os.path.join( LIB_BASE_PATH, "ishtar_common", "tests", "simple.pdf" ) - doc = models.Document.objects.create( - title="Document", - associated_file=SimpleUploadedFile( + with open(pdf_path, "rb") as f: + associated_file = SimpleUploadedFile( name="simple.pdf", - content=open(pdf_path, "rb").read(), + content=f.read(), content_type="application/pdf", ) + doc = models.Document.objects.create( + title="Document", + associated_file=associated_file ) doc.operations.add(self.ope1) doc = models.Document.objects.get(id=doc.pk) diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index ca78993c2..10dee40ca 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -2249,7 +2249,8 @@ def generate_relation_graph( png_name = tempdir + os.path.sep + "relations.png" with open(png_name, "wb") as png_file: - svg2png(open(svg_tmp_name, "rb").read(), write_to=png_file) + with open(svg_tmp_name, "rb") as svg_tmp: + svg2png(svg_tmp.read(), write_to=png_file) with open(png_name, "rb") as png_file: django_file = File(png_file) attr = "relation_bitmap_image" + suffix diff --git a/ishtar_common/utils_migrations.py b/ishtar_common/utils_migrations.py index f11428e3c..5654a8897 100644 --- a/ishtar_common/utils_migrations.py +++ b/ishtar_common/utils_migrations.py @@ -28,12 +28,14 @@ def migrate_simple_image_to_m2m(base_model, image_model, rel_model, verbose=Fals image_instance = image_model.objects.create() try: - image_instance.image.save( - os.path.basename(item.image.path), File(open(item.image.path)) - ) - image_instance.thumbnail.save( - os.path.basename(item.thumbnail.path), File(open(item.thumbnail.path)) - ) + with open(item.image.path) as fle: + image_instance.image.save( + os.path.basename(item.image.path), File(fle) + ) + with open(item.thumbnail.path) as fle: + image_instance.thumbnail.save( + os.path.basename(item.thumbnail.path), File(fle) + ) except IOError: # image not on hard-drive item.image = None |