diff options
Diffstat (limited to 'ishtar_common')
| -rw-r--r-- | ishtar_common/migrations/0230_auto_20230920_1417.py (renamed from ishtar_common/migrations/0230_auto_20230918_1655.py) | 3 | ||||
| -rw-r--r-- | ishtar_common/migrations/0231_default_mandatory_keys.py | 2 | ||||
| -rw-r--r-- | ishtar_common/models_imports.py | 167 | ||||
| -rw-r--r-- | ishtar_common/templates/ishtar/import_table.html | 2 | ||||
| -rw-r--r-- | ishtar_common/tests.py | 39 | ||||
| -rw-r--r-- | ishtar_common/views.py | 15 | 
6 files changed, 170 insertions, 58 deletions
diff --git a/ishtar_common/migrations/0230_auto_20230918_1655.py b/ishtar_common/migrations/0230_auto_20230920_1417.py index 76fba744c..e7a22d119 100644 --- a/ishtar_common/migrations/0230_auto_20230918_1655.py +++ b/ishtar_common/migrations/0230_auto_20230920_1417.py @@ -1,4 +1,4 @@ -# Generated by Django 2.2.24 on 2023-09-18 16:55 +# Generated by Django 2.2.24 on 2023-09-20 14:17  import django.core.validators  from django.db import migrations, models @@ -92,6 +92,7 @@ class Migration(migrations.Migration):                  ('name', models.CharField(max_length=500, null=True, verbose_name='Name')),                  ('imported_file', models.FileField(blank=True, help_text='La taille maximale supportée pour le fichier est de 100 Mo.', max_length=220, null=True, upload_to='upload/imports/%Y/%m/', verbose_name='Imported file')),                  ('imported_images', models.FileField(blank=True, help_text='La taille maximale supportée pour le fichier est de 100 Mo.', max_length=220, null=True, upload_to='upload/imports/%Y/%m/', verbose_name='Associated documents (zip file)')), +                ('archive_file', models.FileField(blank=True, help_text='La taille maximale supportée pour le fichier est de 100 Mo.', max_length=255, null=True, upload_to='upload/imports/%Y/%m/', verbose_name='Archive file')),                  ('encoding', models.CharField(choices=[('windows-1252', 'windows-1252'), ('ISO-8859-15', 'ISO-8859-15'), ('utf-8', 'utf-8')], default='utf-8', help_text='Only required for CSV file', max_length=15, verbose_name='Encoding')),                  ('csv_sep', models.CharField(choices=[(',', ','), (';', ';'), ('|', '|')], default=',', help_text='Separator for CSV file. Standard is comma but Microsoft Excel do not follow this standard and use semi-colon.', max_length=1, verbose_name='CSV separator')),                  ('skip_lines', models.IntegerField(default=1, help_text='Number of header lines in your file (can be 0 and should be 0 for geopackage or Shapefile).', verbose_name='Skip lines')), diff --git a/ishtar_common/migrations/0231_default_mandatory_keys.py b/ishtar_common/migrations/0231_default_mandatory_keys.py index 4c5e2ea35..f93891e56 100644 --- a/ishtar_common/migrations/0231_default_mandatory_keys.py +++ b/ishtar_common/migrations/0231_default_mandatory_keys.py @@ -30,7 +30,7 @@ def migrate(apps, __):  class Migration(migrations.Migration):      dependencies = [ -        ('ishtar_common', '0230_auto_20230918_1655'), +        ('ishtar_common', '0230_auto_20230920_1417'),      ]      operations = [ diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index 9e505392d..b03b42e1a 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -1359,6 +1359,14 @@ class BaseImport(models.Model):          max_length=220,          help_text=max_size_help(),      ) +    archive_file = models.FileField( +        _("Archive file"), +        upload_to="upload/imports/%Y/%m/", +        blank=True, +        null=True, +        max_length=255, +        help_text=max_size_help(), +    )      encoding = models.CharField(          _("Encoding"), choices=ENCODINGS, default="utf-8", max_length=15,          help_text=_("Only required for CSV file"), @@ -1384,6 +1392,7 @@ class BaseImport(models.Model):      end_date = models.DateTimeField(          _("End date"), auto_now_add=True, blank=True, null=True, editable=False      ) +    state = None      class Meta:          abstract = True @@ -1396,6 +1405,31 @@ class BaseImport(models.Model):      def pre_import_form_is_valid(self) -> bool:          raise NotImplemented() +    def _archive(self): +        raise NotImplemented() + +    def _unarchive(self): +        raise NotImplemented() + +    def archive(self): +        self.state = "AC" +        self.end_date = datetime.datetime.now() +        self._archive() + +    def unarchive(self, state): +        if not self._unarchive(): +            self.state = state +            self.save()  # only save if no save previously + +    def save(self, *args, **kwargs): +        super().save(*args, **kwargs) +        if ( +                self.state == "AC" +                and not getattr(self, "_archive_pending", False) +                and not self.archive_file +        ): +            self._archive() +  class ImportGroup(BaseImport):      importer_type = models.ForeignKey(ImporterGroup, on_delete=models.CASCADE, @@ -1508,6 +1542,112 @@ class ImportGroup(BaseImport):          self.end_date = datetime.datetime.now()          self.save() +    def _unarchive(self): +        if not self.archive_file: +            return +        with tempfile.TemporaryDirectory() as tmp_dir_name: +            # extract the current archive +            current_zip = zipfile.ZipFile(self.archive_file.path, "r") +            name_list = current_zip.namelist() +            if "content.json" not in name_list: +                return +            for name in name_list: +                current_zip.extract(name, tmp_dir_name) +            current_zip.close() +            content_name = os.path.join(tmp_dir_name, "content.json") +            try: +                with open(content_name, "r") as content: +                    files = json.loads(content.read()) +            except (IOError, json.JSONDecodeError): +                return +            today = datetime.date.today() +            for attr in files: +                filename = files[attr] +                full_filename = os.path.join(tmp_dir_name, filename) +                with open(full_filename, "rb") as raw_file: +                    getattr(self, attr).save( +                        "upload/imports/{}/{:02d}/{}".format( +                            today.year, today.month, filename +                        ), +                        File(raw_file), +                    ) + +        os.remove(self.archive_file.path) +        setattr(self, "archive_file", None) +        self.state = "FE" if self.error_file else "F" +        self.save() +        return True + +    def _archive(self): +        file_attr = ["imported_file"] +        sub_import_file_attr = ["error_file", "result_file", "match_file"] +        files = [ +            (k, getattr(self, k).path, getattr(self, k).name.split(os.sep)[-1]) +            for k in file_attr +            if getattr(self, k) +        ] +        import_list = self.import_list() +        for idx, sub_import in enumerate(import_list): +            files += [ +                (f"sub-{idx}-{k}", getattr(sub_import, k).path, +                 getattr(sub_import, k).name.split(os.sep)[-1]) +                for k in sub_import_file_attr +                if getattr(sub_import, k) +            ] +        with tempfile.TemporaryDirectory("-ishtar") as tmpdir: +            base_name = "{}.zip".format(slugify(self.name)) +            archive_name = os.path.join(tmpdir, base_name) +            with zipfile.ZipFile(archive_name, "w") as current_zip: +                zip_content = {} +                for k, path, name in files: +                    try: +                        current_zip.write(path, arcname=name) +                        zip_content[k] = name +                    except OSError: +                        pass +                content_name = os.path.join(tmpdir, "content.json") +                with open(content_name, "w") as content: +                    content.write(json.dumps(zip_content)) +                current_zip.write(content_name, arcname="content.json") + +            today = datetime.date.today() +            with open( +                    archive_name, +                    "rb", +            ) as raw_file: +                self.archive_file.save( +                    "upload/imports/{}/{:02d}/{}".format( +                        today.year, today.month, base_name +                    ), +                    File(raw_file), +                ) +        IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") +        profile = IshtarSiteProfile.get_current_profile() +        if profile.delete_image_zip_on_archive: +            sub_import_file_attr.append("imported_images") +        for attr in file_attr: +            file_field = getattr(self, attr) +            if file_field: +                try: +                    os.remove(file_field.path) +                except FileNotFoundError: +                    pass +                setattr(self, attr, None) +        sub_import_file_attr.append("imported_file") +        if profile.delete_image_zip_on_archive: +            sub_import_file_attr.append("imported_images") +        for sub_import in import_list: +            for attr in sub_import_file_attr: +                file_field = getattr(sub_import, attr) +                if file_field: +                    try: +                        os.remove(file_field.path) +                    except FileNotFoundError: +                        pass +                    setattr(sub_import, attr, None) +        self.save() +        self._archive_pending = False +      def get_all_imported(self):          imported = []          for imp in self.imports.all(): @@ -1594,14 +1734,6 @@ class Import(BaseImport):          max_length=255,          help_text=max_size_help(),      ) -    archive_file = models.FileField( -        _("Archive file"), -        upload_to="upload/imports/%Y/%m/", -        blank=True, -        null=True, -        max_length=255, -        help_text=max_size_help(), -    )      state = models.CharField(          _("State"), max_length=2, choices=IMPORT_STATE, default="C"      ) @@ -2313,16 +2445,6 @@ class Import(BaseImport):          self.save()          self._archive_pending = False -    def archive(self): -        self.state = "AC" -        self.end_date = datetime.datetime.now() -        self._archive() - -    def unarchive(self, state): -        if not self._unarchive(): -            self.state = state -            self.save()  # only save if no save previously -      def get_all_imported(self):          imported = []          for related, zorg in get_all_related_m2m_objects_with_model(self): @@ -2330,15 +2452,6 @@ class Import(BaseImport):              imported += [(accessor, obj) for obj in getattr(self, accessor).all()]          return imported -    def save(self, *args, **kwargs): -        super(Import, self).save(*args, **kwargs) -        if ( -            self.state == "AC" -            and not getattr(self, "_archive_pending", False) -            and not self.archive_file -        ): -            self._archive() -  def pre_delete_import(sender, **kwargs):      # deleted imported items when an import is delete diff --git a/ishtar_common/templates/ishtar/import_table.html b/ishtar_common/templates/ishtar/import_table.html index ba52aa8b9..0b2aa46d7 100644 --- a/ishtar_common/templates/ishtar/import_table.html +++ b/ishtar_common/templates/ishtar/import_table.html @@ -122,7 +122,7 @@              </td>          </tr>          {% endif %} -        {% if not import.importer_type.type_label %} {# group #} +        {% if not import.importer_type.type_label and not ARCHIVE_PAGE  %} {# group #}          {% for sub in import.import_list %}          <tr id="import-{{sub.import_id}}">              <td></td> diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 952002e89..19d09a205 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -2524,6 +2524,7 @@ class BaseImportTest(TestCase):      def get_group_import(self):          root = os.path.join(settings.LIB_BASE_PATH, "archaeological_finds", "tests") +        self.root = root          importer_filename = os.path.join(root, "importer-group.zip")          restore_serialized(importer_filename)          imp_group = models.ImporterGroup.objects.get(slug="chantier-des-depots") @@ -2721,7 +2722,7 @@ class ImportTest(BaseImportTest):          group_import.unarchive("FE")          group_import = models.ImportGroup.objects.get(pk=imprt.pk) -        for imprt in group_import.imports.all(): +        for imprt in group_import.import_list():              self.assertEqual(imprt.state, "FE")              for k in ("error_file", "result_file", "match_file", "imported_images"):                  field = getattr(imprt, k) @@ -2740,13 +2741,14 @@ class ImportTest(BaseImportTest):          profile.delete_image_zip_on_archive = False          profile.save() -        csv_content = "...." +        imp_file = os.path.join(self.root, "importer-group.csv") +        with open(imp_file, "r") as f: +            csv_content = f.read() +          group_import.archive() -        group_import = models.Import.objects.get(pk=group_import.pk) +        group_import = models.ImportGroup.objects.get(pk=group_import.pk)          self.assertEqual(group_import.state, "AC") -        self.assertFalse(group_import.error_file) -        self.assertFalse(group_import.result_file) -        self.assertFalse(group_import.match_file) +        self.assertFalse(group_import.imported_file)          self.assertTrue(group_import.imported_images)          self.assertTrue(group_import.archive_file)          self.assertTrue(zipfile.is_zipfile(group_import.archive_file)) @@ -2760,25 +2762,22 @@ class ImportTest(BaseImportTest):                  files = json.loads(content.read())              self.assertIn("imported_file", files.keys())              self.assertIn(files["imported_file"], name_list) -            self.assertIn("error_file", files.keys()) -            self.assertIn(files["error_file"], name_list) -            self.assertIn("result_file", files.keys()) -            self.assertIn(files["result_file"], name_list) -            self.assertIn("match_file", files.keys()) -            self.assertIn(files["match_file"], name_list) -            rev_dict = {v: k for k, v in files.items()} +            for idx in range(4): +                self.assertIn(f"sub-{idx}-result_file", files.keys()) +                self.assertIn(files[f"sub-{idx}-result_file"], name_list) +            for idx in range(1, 3): +                self.assertIn(f"sub-{idx}-match_file", files.keys()) +                self.assertIn(files[f"sub-{idx}-match_file"], name_list)              for name in name_list:                  current_zip.extract(name, tmpdir) -                if name.endswith(".txt"): +                if name == files["imported_file"]:                      with open(os.path.join(tmpdir, name), "r") as f: -                        self.assertEqual(f.read(), "test" + rev_dict[name]) -                elif name.endswith(".csv"):  # imported file -                    with open(os.path.join(tmpdir, name), "r") as f: -                        self.assertEqual(f.read(), csv_content) +                        result = f.read() +                        self.assertEqual(result, csv_content) -        group_import.unarchive("FE") +        group_import.unarchive("F")          group_import = models.Import.objects.get(pk=group_import.pk) -        self.assertEqual(group_import.state, "FE") +        self.assertEqual(group_import.state, "F")          for k in ("error_file", "result_file", "match_file", "imported_images"):              field = getattr(group_import, k)              self.assertTrue(field, "{} is missing in unarchive".format(k)) diff --git a/ishtar_common/views.py b/ishtar_common/views.py index 901ebc9b8..4a76207f6 100644 --- a/ishtar_common/views.py +++ b/ishtar_common/views.py @@ -1549,9 +1549,12 @@ class ImportListView(IshtarMixin, LoginRequiredMixin, ListView):      page_name = _("Current imports")      current_url = "current_imports" +    def _queryset_filter(self, query): +        return query.exclude(state="AC") +      def get_queryset(self): -        q1 = self.model.objects.exclude(state="AC") -        q2 = models.ImportGroup.objects.exclude(state="AC") +        q1 = self._queryset_filter(self.model.objects) +        q2 = self._queryset_filter(models.ImportGroup.objects)          if not self.request.user.is_superuser:              user = models.IshtarUser.objects.get(pk=self.request.user.pk)              q1 = q1.filter(user=user) @@ -2044,12 +2047,8 @@ class ImportOldListView(ImportListView):      page_name = _("Old imports")      current_url = "old_imports" -    def get_queryset(self): -        q = self.model.objects.filter(state="AC") -        if self.request.user.is_superuser: -            return q.order_by("-creation_date") -        user = models.IshtarUser.objects.get(pk=self.request.user.pk) -        return q.filter(user=user).order_by("-creation_date") +    def _queryset_filter(self, query): +        return query.filter(state="AC")      def get_context_data(self, **kwargs):          data = super().get_context_data(**kwargs)  | 
