From 8527647f7bc4cf91afa0db6b71f7bea70bf646b9 Mon Sep 17 00:00:00 2001 From: Étienne Loks Date: Mon, 22 Feb 2021 12:13:51 +0100 Subject: Zip/unzip files on archive/unarchive imports --- ishtar_common/tests.py | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) (limited to 'ishtar_common/tests.py') diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index d6802ef77..71e599d59 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -2274,6 +2274,84 @@ class ImportTest(TestCase): imported_file=mcc_operation_file) return imprt + def test_archive_import(self): + imprt = self.create_import() + with open(imprt.imported_file.path, "r") as f: + csv_content = f.read() + with tempfile.TemporaryDirectory() as tmpdir: + for k in ("error_file", "result_file", "match_file", + "imported_images"): + sample_file = os.path.join(tmpdir, "media_{}.zip".format(k)) + with open(sample_file, "w") as m: + m.write("test" + k) + with open(sample_file, "rb") as raw_file: + getattr(imprt, k).save("media.txt", DjangoFile(raw_file)) + profile = models.get_current_profile() + profile.delete_image_zip_on_archive = False + profile.save() + imprt.archive() + imprt = models.Import.objects.get(pk=imprt.pk) + self.assertEqual(imprt.state, "AC") + self.assertFalse(imprt.error_file) + self.assertFalse(imprt.result_file) + self.assertFalse(imprt.match_file) + self.assertTrue(imprt.imported_images) + self.assertTrue(imprt.archive_file) + self.assertTrue(zipfile.is_zipfile(imprt.archive_file)) + with tempfile.TemporaryDirectory() as tmpdir: + current_zip = zipfile.ZipFile(imprt.archive_file.path, 'r') + name_list = current_zip.namelist() + self.assertIn("content.json", name_list) + current_zip.extract("content.json", tmpdir) + content_name = os.path.join(tmpdir, "content.json") + with open(content_name, "r") as content: + files = json.loads(content.read()) + self.assertIn("imported_file", files.keys()) + self.assertIn(files["imported_file"], name_list) + self.assertIn("error_file", files.keys()) + self.assertIn(files["error_file"], name_list) + self.assertIn("result_file", files.keys()) + self.assertIn(files["result_file"], name_list) + self.assertIn("match_file", files.keys()) + self.assertIn(files["match_file"], name_list) + rev_dict = {v: k for k, v in files.items()} + for name in name_list: + current_zip.extract(name, tmpdir) + if name.endswith(".txt"): + with open(os.path.join(tmpdir, name), "r") as f: + self.assertEqual(f.read(), "test" + rev_dict[name]) + elif name.endswith(".csv"): # imported file + with open(os.path.join(tmpdir, name), "r") as f: + self.assertEqual(f.read(), csv_content) + + imprt.unarchive('FE') + imprt = models.Import.objects.get(pk=imprt.pk) + self.assertEqual(imprt.state, "FE") + for k in ("error_file", "result_file", "match_file", "imported_images"): + field = getattr(imprt, k) + self.assertTrue(field, "{} is missing in unarchive".format(k)) + with open(field.path, "r") as f: + self.assertEqual(f.read(), "test" + k) + field = getattr(imprt, "imported_file") + self.assertTrue(field, "{} is missing in unarchive".format(k)) + with open(field.path, "r") as f: + self.assertEqual(f.read(), csv_content) + + profile = models.get_current_profile() + profile.delete_image_zip_on_archive = True + profile.save() + imprt = models.Import.objects.get(pk=imprt.pk) + image_filename = imprt.imported_images.path + self.assertTrue(os.path.isfile(image_filename)) + imprt.archive() + imprt = models.Import.objects.get(pk=imprt.pk) + self.assertFalse(imprt.imported_images) + self.assertFalse(os.path.isfile(image_filename)) + imprt.unarchive("F") + imprt = models.Import.objects.get(pk=imprt.pk) + self.assertEqual(imprt.state, "FE") # as an error file so state fixed + self.assertFalse(imprt.imported_images) + def test_delete_related(self): town = models.Town.objects.create(name='my-test') self.assertEqual(models.Town.objects.filter(name='my-test').count(), 1) -- cgit v1.2.3