diff options
Diffstat (limited to 'ishtar_common/models_imports.py')
| -rw-r--r-- | ishtar_common/models_imports.py | 188 | 
1 files changed, 179 insertions, 9 deletions
| diff --git a/ishtar_common/models_imports.py b/ishtar_common/models_imports.py index b931cc4b4..b8c842284 100644 --- a/ishtar_common/models_imports.py +++ b/ishtar_common/models_imports.py @@ -19,9 +19,13 @@  import csv  import datetime +import fiona +from fiona import crs as fiona_crs  import json +from osgeo import ogr  import os  import logging +from pyproj import CRS  import shutil  import re  import tempfile @@ -30,6 +34,9 @@ import zipfile  from django.apps import apps  from django.conf import settings  from django.contrib.gis.db import models +from django.contrib.gis.gdal.error import GDALException +from django.contrib.gis.geos import GEOSGeometry +from django.contrib.gis.geos.error import GEOSException  from django.core.exceptions import ValidationError  from django.core.files import File  from django.core.files.base import ContentFile @@ -75,6 +82,7 @@ from ishtar_common.data_importer import (      LowerCaseFormater,  )  from ishtar_common.utils import task +from ishtar_common.ignf_utils import IGNF  logger = logging.getLogger(__name__) @@ -124,6 +132,8 @@ IMPORT_TYPES = (      ("gis", _("GIS")),  ) +IMPORT_TYPES_DICT = dict(IMPORT_TYPES) +  class ImporterType(models.Model):      """ @@ -185,6 +195,12 @@ class ImporterType(models.Model):      def __str__(self):          return self.name +    @property +    def type_label(self): +        if self.type in IMPORT_TYPES_DICT: +            return IMPORT_TYPES_DICT[self.type] +        return "" +      def get_libreoffice_template(self):          if not UnoCalc:              return @@ -320,6 +336,8 @@ class ImporterType(models.Model):              "UNICITY_KEYS": UNICITY_KEYS,              "LINE_EXPORT_FORMAT": LINE_EXPORT_FORMAT,              "MODEL_CREATION_LIMIT": MODEL_CREATION_LIMIT, +            "TYPE": self.type, +            "MAIN_GEO": self.is_main_geometry,          }          name = str(              "".join( @@ -1068,6 +1086,45 @@ def delayed_check(import_pk):      imp.check_modified() +RE_NUMBER = r"[+-]?\d+(?:\.\d*)?" +RE_COORDS = r"(" + RE_NUMBER + r") (" + RE_NUMBER + r")" + + +def _reverse_coordinates(wkt): +    # TODO: à effacer +    return re.sub(RE_COORDS, r"\2 \1", wkt) + + +def convert_geom(feature, srid): +    geo_type = feature["type"] +    if geo_type in ("LineString", "Polygon"): +        feature["type"] = "Multi" + geo_type +        feature["coordinates"] = [feature["coordinates"]] +    feature = GEOSGeometry(json.dumps(feature)) +    has_z = feature.hasz +    feature = f"SRID={srid};{feature.wkt}" + +    IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") +    profile = IshtarSiteProfile.get_current_profile() +    srs = 4326 if not has_z else 4979 +    if profile.srs and profile.srs.srid: +        srs = profile.srs.srid +    if srs != srid: +        feature = GEOSGeometry(feature).transform(srs, clone=True).ewkt +    return feature + + +IMPORT_GEOMETRY = { +    "Point": "point_2d", +    "3D Point": "point_3d", +    "MultiPoint": "multi_points", +    "LineString": "multi_line", +    "MultiLineString": "multi_line", +    "Polygon": "multi_polygon", +    "MultiPolygon": "multi_polygon", +} + +  class Import(models.Model):      user = models.ForeignKey(          "IshtarUser", blank=True, null=True, on_delete=models.SET_NULL @@ -1208,6 +1265,8 @@ class Import(models.Model):          return errors      def get_number_of_lines(self): +        if self.importer_type.type == "gis": +            return          if self.number_of_line:              return self.number_of_line          if not self.imported_file or not self.imported_file.path: @@ -1291,13 +1350,11 @@ class Import(models.Model):          IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile")          profile = IshtarSiteProfile.get_current_profile()          actions = [] -        if self.state == 'C': -            actions.append(('A', _("Analyse"))) -        if self.state in ('C', 'A'): -            actions.append(('ED', _("Edit"))) -        if self.state in ('A', 'PI'): -            actions.append(('A', _("Re-analyse"))) -            actions.append(('I', _("Launch import"))) +        if self.state == "C": +            actions.append(("A", _("Analyse"))) +        if self.state in ("A", "PI"): +            actions.append(("A", _("Re-analyse"))) +            actions.append(("I", _("Launch import")))              if profile.experimental_feature:                  if self.changed_checked:                      actions.append(("IS", _("Step by step import"))) @@ -1317,6 +1374,8 @@ class Import(models.Model):          if self.state == "AC":              state = "FE" if self.error_file else "F"              actions.append((state, _("Unarchive"))) +        if self.state in ("C", "A"): +            actions.append(("ED", _("Edit")))          actions.append(("D", _("Delete")))          return actions @@ -1337,8 +1396,7 @@ class Import(models.Model):              conservative_import=self.conservative_import,          ) -    @property -    def data_table(self): +    def _data_table_tab(self):          imported_file = self.imported_file.path          tmpdir = None          if zipfile.is_zipfile(imported_file): @@ -1373,6 +1431,118 @@ class Import(models.Model):              shutil.rmtree(tmpdir)          return [] +    def get_gis_attr(self): +        return self._data_table_gis(get_gis_attr=True) + +    def _data_table_gis(self, get_gis_attr=False): +        self.gis_attr = None +        imported_file = self.imported_file.path +        tmp_dir = None +        file_type = "gpkg" +        if zipfile.is_zipfile(imported_file): +            z = zipfile.ZipFile(imported_file) +            imported_file = None +            filenames = [] +            for name in z.namelist(): +                # get first CSV file found +                name_l = name.lower() +                if name_l.endswith(".gpkg"): +                    filenames = [name] +                    break +                if name_l.endswith(".shp"): +                    file_type = "shp" +                    filenames.append(name) +                    continue +                for end in [".cpg", ".prj", ".shx", ".dbf"]: +                    if name_l.endswith(end): +                        filenames.append(name) +                        continue +            if not filenames: +                return [] +            tmp_dir = tempfile.mkdtemp(prefix="tmp-ishtar-") +            for filename in filenames: +                if filename.lower().endswith(".shp") or filename.lower().endswith( +                    ".gpkg" +                ): +                    imported_file = z.extract(filename, tmp_dir) +                else: +                    z.extract(filename, tmp_dir) +        elif not imported_file.endswith(".gpkg"): +            raise ImporterError(_("Incorrect GIS file.")) + +        if not imported_file: +            raise ImporterError(_("Incorrect GIS file.")) + +        kwargs = {} +        if self.importer_type.layer_name: +            kwargs["layer"] = self.importer_type.layer_name +        try: +            with fiona.open(imported_file, **kwargs) as collection: +                schema = collection.schema +                geometry = schema["geometry"] +                if geometry not in IMPORT_GEOMETRY: +                    raise ImporterError(_(f'Geometry "{geometry}" not managed.')) +                self.gis_attr = IMPORT_GEOMETRY[geometry] +                if get_gis_attr: +                    if tmp_dir: +                        shutil.rmtree(tmp_dir) +                    return self.gis_attr +                properties = schema["properties"].keys() + +                crs = fiona_crs.to_string(collection.crs) +                if not crs: +                    driver_type = {"shp": "ESRI Shapefile", "gpkg": "GPKG"} +                    driver = ogr.GetDriverByName(driver_type[file_type]) +                    shape = driver.Open(imported_file) +                    layer = shape.GetLayer() +                    crs = layer.GetSpatialRef() +                    auth = crs.GetAttrValue("AUTHORITY", 0) +                    srid = crs.GetAttrValue("AUTHORITY", 1) +                    if auth == "EPSG": +                        srid = int(srid) +                    elif auth == "IGNF" and srid in IGNF: +                        srid = IGNF[srid] +                    else: +                        raise ImporterError(_("CRS not managed.")) + +                # https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6 +                elif crs.startswith("+init=epsg:"): +                    srid = crs[len("+init=epsg:"):] +                else: +                    srid = CRS.from_proj4(crs).to_epsg() +                data = [] +                # Warning: RuntimeWarning: Sequential read of iterator was interrupted. +                # Resetting iterator. +                # not relevant -> bug in fiona 1.8.18 (fixed in 1.8.19) +                for idx, feature in enumerate(collection): +                    try: +                        line = [ +                            convert_geom(feature["geometry"], srid) +                        ] +                        for prop in properties: +                            value = feature["properties"][prop] +                            if value is None: +                                value = "" +                            line.append(str(value)) +                        data.append(line) +                    except (TypeError, GDALException, GEOSException) as e: +                        raise ImporterError( +                            _(f"Error reading feature {idx + 1} - {e}") +                        ) +        except fiona.errors.DriverError: +            raise ImporterError(_("Incorrect GIS file.")) + +        if tmp_dir: +            shutil.rmtree(tmp_dir) +        return data + +    @property +    def data_table(self): +        if self.importer_type.type == "tab": +            return self._data_table_tab(), None +        if self.importer_type.type == "gis": +            return self._data_table_gis() +      def initialize(self, user=None, session_key=None):          self.state = "AP"          self.end_date = datetime.datetime.now() | 
