-rw-r--r--  ishtar_common/migrations/0230_auto_20230920_1417.py (renamed from ishtar_common/migrations/0230_auto_20230918_1655.py) |   3
-rw-r--r--  ishtar_common/migrations/0231_default_mandatory_keys.py |   2
-rw-r--r--  ishtar_common/models_imports.py | 167
-rw-r--r--  ishtar_common/templates/ishtar/import_table.html |   2
-rw-r--r--  ishtar_common/tests.py |  39
-rw-r--r--  ishtar_common/views.py |  15
6 files changed, 170 insertions(+), 58 deletions(-)
diff --git a/ishtar_common/migrations/0230_auto_20230918_1655.py b/ishtar_common/migrations/0230_auto_20230920_1417.py
index 76fba744c..e7a22d119 100644
--- a/ishtar_common/migrations/0230_auto_20230918_1655.py
+++ b/ishtar_common/migrations/0230_auto_20230920_1417.py
@@ -1,4 +1,4 @@
-# Generated by Django 2.2.24 on 2023-09-18 16:55
+# Generated by Django 2.2.24 on 2023-09-20 14:17
import django.core.validators
from django.db import migrations, models
@@ -92,6 +92,7 @@ class Migration(migrations.Migration):
('name', models.CharField(max_length=500, null=True, verbose_name='Name')),
('imported_file', models.FileField(blank=True, help_text='La taille maximale supportée pour le fichier est de 100 Mo.', max_length=220, null=True, upload_to='upload/imports/%Y/%m/', verbose_name='Imported file')),
('imported_images', models.FileField(blank=True, help_text='La taille maximale supportée pour le fichier est de 100 Mo.', max_length=220, null=True, upload_to='upload/imports/%Y/%m/', verbose_name='Associated documents (zip file)')),
+ ('archive_file', models.FileField(blank=True, help_text='La taille maximale supportée pour le fichier est de 100 Mo.', max_length=255, null=True, upload_to='upload/imports/%Y/%m/', verbose_name='Archive file')),
('encoding', models.CharField(choices=[('windows-1252', 'windows-1252'), ('ISO-8859-15', 'ISO-8859-15'), ('utf-8', 'utf-8')], default='utf-8', help_text='Only required for CSV file', max_length=15, verbose_name='Encoding')),
('csv_sep', models.CharField(choices=[(',', ','), (';', ';'), ('|', '|')], default=',', help_text='Separator for CSV file. Standard is comma but Microsoft Excel does not follow this standard and uses a semicolon.', max_length=1, verbose_name='CSV separator')),
('skip_lines', models.IntegerField(default=1, help_text='Number of header lines in your file (can be 0 and should be 0 for geopackage or Shapefile).', verbose_name='Skip lines')),
diff --git a/ishtar_common/migrations/0231_default_mandatory_keys.py b/ishtar_common/migrations/0231_default_mandatory_keys.py
index 4c5e2ea35..f93891e56 100644
--- a/ishtar_common/migrations/0231_default_mandatory_keys.py
+++ b/ishtar_common/migrations/0231_default_mandatory_keys.py
@@ -30,7 +30,7 @@ def migrate(apps, __):
class Migration(migrations.Migration):
dependencies = [
- ('ishtar_common', '0230_auto_20230918_1655'),
+ ('ishtar_common', '0230_auto_20230920_1417'),
]
operations = [
diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py
index 9e505392d..b03b42e1a 100644
--- a/ishtar_common/models_imports.py
+++ b/ishtar_common/models_imports.py
@@ -1359,6 +1359,14 @@ class BaseImport(models.Model):
max_length=220,
help_text=max_size_help(),
)
+ archive_file = models.FileField(
+ _("Archive file"),
+ upload_to="upload/imports/%Y/%m/",
+ blank=True,
+ null=True,
+ max_length=255,
+ help_text=max_size_help(),
+ )
encoding = models.CharField(
_("Encoding"), choices=ENCODINGS, default="utf-8", max_length=15,
help_text=_("Only required for CSV file"),
@@ -1384,6 +1392,7 @@ class BaseImport(models.Model):
end_date = models.DateTimeField(
_("End date"), auto_now_add=True, blank=True, null=True, editable=False
)
+    state = None  # placeholder; concrete subclasses define the real state field
class Meta:
abstract = True
@@ -1396,6 +1405,31 @@ class BaseImport(models.Model):
def pre_import_form_is_valid(self) -> bool:
        raise NotImplementedError()
+    def _archive(self):
+        # concrete subclasses implement the actual archiving
+        raise NotImplementedError()
+
+    def _unarchive(self):
+        # concrete subclasses implement restoration; returns True if it saved
+        raise NotImplementedError()
+
+ def archive(self):
+ self.state = "AC"
+ self.end_date = datetime.datetime.now()
+ self._archive()
+
+ def unarchive(self, state):
+ if not self._unarchive():
+ self.state = state
+        self.save()  # only save here if _unarchive() did not already save
+
+ def save(self, *args, **kwargs):
+ super().save(*args, **kwargs)
+ if (
+ self.state == "AC"
+ and not getattr(self, "_archive_pending", False)
+ and not self.archive_file
+ ):
+ self._archive()
+
class ImportGroup(BaseImport):
importer_type = models.ForeignKey(ImporterGroup, on_delete=models.CASCADE,
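
The save() override added to BaseImport is re-entrant by design: _archive() ends by saving the instance again. Recursion is broken by the archive_file check and the _archive_pending flag (the hunks only show the flag being checked and cleared; where it is raised is not visible here). A minimal standalone sketch of the guard pattern, assuming the flag is raised at the top of _archive(), with plain attributes standing in for the Django model machinery:

    # Sketch only, not repository code.
    class ArchivableSketch:
        state = None          # the concrete models define a real state field
        archive_file = None

        def save(self):
            # stand-in for models.Model.save(); persistence omitted
            if (
                self.state == "AC"
                and not getattr(self, "_archive_pending", False)
                and not self.archive_file
            ):
                self._archive()

        def _archive(self):
            self._archive_pending = True   # block re-entry during the save below
            self.archive_file = "archive.zip"
            self.save()                    # no recursion: archive_file is now set
            self._archive_pending = False

    obj = ArchivableSketch()
    obj.state = "AC"
    obj.save()  # triggers exactly one _archive() run
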
@@ -1508,6 +1542,112 @@ class ImportGroup(BaseImport):
self.end_date = datetime.datetime.now()
self.save()
+ def _unarchive(self):
+ if not self.archive_file:
+ return
+ with tempfile.TemporaryDirectory() as tmp_dir_name:
+            # extract the current archive; the context manager closes it
+            # even when we bail out early
+            with zipfile.ZipFile(self.archive_file.path, "r") as current_zip:
+                name_list = current_zip.namelist()
+                if "content.json" not in name_list:
+                    return
+                for name in name_list:
+                    current_zip.extract(name, tmp_dir_name)
+ content_name = os.path.join(tmp_dir_name, "content.json")
+ try:
+ with open(content_name, "r") as content:
+ files = json.loads(content.read())
+ except (IOError, json.JSONDecodeError):
+ return
+ today = datetime.date.today()
+ for attr in files:
+ filename = files[attr]
+ full_filename = os.path.join(tmp_dir_name, filename)
+ with open(full_filename, "rb") as raw_file:
+ getattr(self, attr).save(
+ "upload/imports/{}/{:02d}/{}".format(
+ today.year, today.month, filename
+ ),
+ File(raw_file),
+ )
+
+ os.remove(self.archive_file.path)
+            self.archive_file = None
+ self.state = "FE" if self.error_file else "F"
+ self.save()
+ return True
+
+ def _archive(self):
+ file_attr = ["imported_file"]
+ sub_import_file_attr = ["error_file", "result_file", "match_file"]
+ files = [
+            (k, getattr(self, k).path, os.path.basename(getattr(self, k).name))
+ for k in file_attr
+ if getattr(self, k)
+ ]
+ import_list = self.import_list()
+ for idx, sub_import in enumerate(import_list):
+ files += [
+ (f"sub-{idx}-{k}", getattr(sub_import, k).path,
+ getattr(sub_import, k).name.split(os.sep)[-1])
+ for k in sub_import_file_attr
+ if getattr(sub_import, k)
+ ]
+ with tempfile.TemporaryDirectory("-ishtar") as tmpdir:
+ base_name = "{}.zip".format(slugify(self.name))
+ archive_name = os.path.join(tmpdir, base_name)
+ with zipfile.ZipFile(archive_name, "w") as current_zip:
+ zip_content = {}
+ for k, path, name in files:
+ try:
+ current_zip.write(path, arcname=name)
+ zip_content[k] = name
+ except OSError:
+ pass
+ content_name = os.path.join(tmpdir, "content.json")
+ with open(content_name, "w") as content:
+ content.write(json.dumps(zip_content))
+ current_zip.write(content_name, arcname="content.json")
+
+ today = datetime.date.today()
+ with open(
+ archive_name,
+ "rb",
+ ) as raw_file:
+ self.archive_file.save(
+ "upload/imports/{}/{:02d}/{}".format(
+ today.year, today.month, base_name
+ ),
+ File(raw_file),
+ )
+ IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")
+ profile = IshtarSiteProfile.get_current_profile()
+ if profile.delete_image_zip_on_archive:
+ sub_import_file_attr.append("imported_images")
+ for attr in file_attr:
+ file_field = getattr(self, attr)
+ if file_field:
+ try:
+ os.remove(file_field.path)
+ except FileNotFoundError:
+ pass
+ setattr(self, attr, None)
+ sub_import_file_attr.append("imported_file")
+ for sub_import in import_list:
+ for attr in sub_import_file_attr:
+ file_field = getattr(sub_import, attr)
+ if file_field:
+ try:
+ os.remove(file_field.path)
+ except FileNotFoundError:
+ pass
+ setattr(sub_import, attr, None)
+ self.save()
+ self._archive_pending = False
+
def get_all_imported(self):
imported = []
for imp in self.imports.all():
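
Seen together, _archive() and _unarchive() define a simple round-trip format: a zip whose content.json manifest maps field names — prefixed sub-{idx}- for files belonging to the idx-th sub-import — to archived filenames. A standalone sketch of that convention using only the standard library (all filenames illustrative):

    import json
    import os
    import tempfile
    import zipfile

    with tempfile.TemporaryDirectory("-ishtar") as tmpdir:
        # a stand-in for the group's imported file
        src = os.path.join(tmpdir, "data.csv")
        with open(src, "w") as f:
            f.write("id,name\n1,find\n")

        # archive: write the files, then the manifest describing them
        archive_name = os.path.join(tmpdir, "my-import.zip")
        manifest = {"imported_file": "data.csv"}  # or "sub-0-result_file", ...
        with zipfile.ZipFile(archive_name, "w") as current_zip:
            current_zip.write(src, arcname="data.csv")
            current_zip.writestr("content.json", json.dumps(manifest))

        # unarchive: read the manifest back and restore each listed file
        with zipfile.ZipFile(archive_name) as current_zip:
            files = json.loads(current_zip.read("content.json"))
            for attr, filename in files.items():
                current_zip.extract(filename, tmpdir)
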
@@ -1594,14 +1734,6 @@ class Import(BaseImport):
max_length=255,
help_text=max_size_help(),
)
- archive_file = models.FileField(
- _("Archive file"),
- upload_to="upload/imports/%Y/%m/",
- blank=True,
- null=True,
- max_length=255,
- help_text=max_size_help(),
- )
state = models.CharField(
_("State"), max_length=2, choices=IMPORT_STATE, default="C"
)
@@ -2313,16 +2445,6 @@ class Import(BaseImport):
self.save()
self._archive_pending = False
- def archive(self):
- self.state = "AC"
- self.end_date = datetime.datetime.now()
- self._archive()
-
- def unarchive(self, state):
- if not self._unarchive():
- self.state = state
- self.save() # only save if no save previously
-
def get_all_imported(self):
imported = []
for related, zorg in get_all_related_m2m_objects_with_model(self):
@@ -2330,15 +2452,6 @@ class Import(BaseImport):
imported += [(accessor, obj) for obj in getattr(self, accessor).all()]
return imported
- def save(self, *args, **kwargs):
- super(Import, self).save(*args, **kwargs)
- if (
- self.state == "AC"
- and not getattr(self, "_archive_pending", False)
- and not self.archive_file
- ):
- self._archive()
-
def pre_delete_import(sender, **kwargs):
    # delete imported items when an import is deleted
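
With archive(), unarchive() and the save() hook hoisted from Import onto BaseImport, Import and ImportGroup now share one archive lifecycle. A hedged usage sketch (instance lookup hypothetical; "AC", "F" and "FE" are the archived/finished states used by the tests below):

    # Hypothetical usage; assumes a finished ImportGroup already exists.
    imp = ImportGroup.objects.get(pk=1)
    imp.archive()        # state -> "AC", end_date stamped, files zipped
    imp.unarchive("F")   # files restored from the zip, state -> "F"
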
diff --git a/ishtar_common/templates/ishtar/import_table.html b/ishtar_common/templates/ishtar/import_table.html
index ba52aa8b9..0b2aa46d7 100644
--- a/ishtar_common/templates/ishtar/import_table.html
+++ b/ishtar_common/templates/ishtar/import_table.html
@@ -122,7 +122,7 @@
</td>
</tr>
{% endif %}
- {% if not import.importer_type.type_label %} {# group #}
+ {% if not import.importer_type.type_label and not ARCHIVE_PAGE %} {# group #}
{% for sub in import.import_list %}
<tr id="import-{{sub.import_id}}">
<td></td>
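
The ARCHIVE_PAGE guard keeps sub-import rows out of the table on the archive listing. The place where the flag enters the template context is outside this diff; presumably the archive view injects it through get_context_data, roughly:

    # Hypothetical; the actual assignment is not visible in this commit.
    class ImportOldListView(ImportListView):
        def get_context_data(self, **kwargs):
            data = super().get_context_data(**kwargs)
            data["ARCHIVE_PAGE"] = True  # import_table.html hides sub-import rows
            return data
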
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 952002e89..19d09a205 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -2524,6 +2524,7 @@ class BaseImportTest(TestCase):
def get_group_import(self):
root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests")
+ self.root = root
importer_filename = os.path.join(root, "importer-group.zip")
restore_serialized(importer_filename)
imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots")
@@ -2721,7 +2722,7 @@ class ImportTest(BaseImportTest):
group_import.unarchive("FE")
group_import = models.ImportGroup.objects.get(pk=imprt.pk)
- for imprt in group_import.imports.all():
+ for imprt in group_import.import_list():
self.assertEqual(imprt.state, "FE")
for k in ("error_file", "result_file", "match_file", "imported_images"):
field = getattr(imprt, k)
@@ -2740,13 +2741,14 @@ class ImportTest(BaseImportTest):
profile.delete_image_zip_on_archive = False
profile.save()
- csv_content = "...."
+ imp_file = os.path.join(self.root, "importer-group.csv")
+ with open(imp_file, "r") as f:
+ csv_content = f.read()
+
group_import.archive()
- group_import = models.Import.objects.get(pk=group_import.pk)
+ group_import = models.ImportGroup.objects.get(pk=group_import.pk)
self.assertEqual(group_import.state, "AC")
- self.assertFalse(group_import.error_file)
- self.assertFalse(group_import.result_file)
- self.assertFalse(group_import.match_file)
+ self.assertFalse(group_import.imported_file)
self.assertTrue(group_import.imported_images)
self.assertTrue(group_import.archive_file)
self.assertTrue(zipfile.is_zipfile(group_import.archive_file))
@@ -2760,25 +2762,22 @@ class ImportTest(BaseImportTest):
files = json.loads(content.read())
self.assertIn("imported_file", files.keys())
self.assertIn(files["imported_file"], name_list)
- self.assertIn("error_file", files.keys())
- self.assertIn(files["error_file"], name_list)
- self.assertIn("result_file", files.keys())
- self.assertIn(files["result_file"], name_list)
- self.assertIn("match_file", files.keys())
- self.assertIn(files["match_file"], name_list)
- rev_dict = {v: k for k, v in files.items()}
+ for idx in range(4):
+ self.assertIn(f"sub-{idx}-result_file", files.keys())
+ self.assertIn(files[f"sub-{idx}-result_file"], name_list)
+ for idx in range(1, 3):
+ self.assertIn(f"sub-{idx}-match_file", files.keys())
+ self.assertIn(files[f"sub-{idx}-match_file"], name_list)
for name in name_list:
current_zip.extract(name, tmpdir)
- if name.endswith(".txt"):
+ if name == files["imported_file"]:
with open(os.path.join(tmpdir, name), "r") as f:
- self.assertEqual(f.read(), "test" + rev_dict[name])
- elif name.endswith(".csv"): # imported file
- with open(os.path.join(tmpdir, name), "r") as f:
- self.assertEqual(f.read(), csv_content)
+ result = f.read()
+ self.assertEqual(result, csv_content)
- group_import.unarchive("FE")
+ group_import.unarchive("F")
group_import = models.Import.objects.get(pk=group_import.pk)
- self.assertEqual(group_import.state, "FE")
+ self.assertEqual(group_import.state, "F")
for k in ("error_file", "result_file", "match_file", "imported_images"):
field = getattr(group_import, k)
self.assertTrue(field, "{} is missing in unarchive".format(k))
diff --git a/ishtar_common/views.py b/ishtar_common/views.py
index 901ebc9b8..4a76207f6 100644
--- a/ishtar_common/views.py
+++ b/ishtar_common/views.py
@@ -1549,9 +1549,12 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):
page_name = _("Current imports")
current_url = "current_imports"
+ def _queryset_filter(self, query):
+ return query.exclude(state="AC")
+
def get_queryset(self):
- q1 = self.model.objects.exclude(state="AC")
- q2 = models.ImportGroup.objects.exclude(state="AC")
+ q1 = self._queryset_filter(self.model.objects)
+ q2 = self._queryset_filter(models.ImportGroup.objects)
if not self.request.user.is_superuser:
user = models.IshtarUser.objects.get(pk=self.request.user.pk)
q1 = q1.filter(user=user)
@@ -2044,12 +2047,8 @@ class ImportOldListView(ImportListView):
page_name = _("Old imports")
current_url = "old_imports"
- def get_queryset(self):
- q = self.model.objects.filter(state="AC")
- if self.request.user.is_superuser:
- return q.order_by("-creation_date")
- user = models.IshtarUser.objects.get(pk=self.request.user.pk)
- return q.filter(user=user).order_by("-creation_date")
+ def _queryset_filter(self, query):
+ return query.filter(state="AC")
def get_context_data(self, **kwargs):
data = super().get_context_data(**kwargs)
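
Factoring the state filter into a _queryset_filter() hook turns get_queryset() into a small template method: the base view keeps the user scoping and the Import/ImportGroup pairing, while each listing only declares the states it shows. A further listing could reuse the hook like this (hypothetical view, not part of this commit; "FE" is the finished-with-errors state from the tests):

    class ImportErrorListView(ImportListView):
        page_name = _("Imports finished with errors")
        current_url = "error_imports"  # hypothetical URL name

        def _queryset_filter(self, query):
            return query.filter(state="FE")
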