#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2012 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # See the file COPYING for details. """ Utilitaries """ import urllib2, re import unicodedata import zipfile import StringIO from external_utils import OsmApi from lxml import etree from chimere import get_version from django.conf import settings from django.core.exceptions import ObjectDoesNotExist from django.utils.translation import ugettext_lazy as _ def unicode_normalize(string): return ''.join( (c for c in unicodedata.normalize('NFD', string) if unicodedata.category(c) != 'Mn')) def get_files_inside_zip(zippedfile, suffixes): try: flz = zipfile.ZipFile(zippedfile) except zipfile.BadZipfile: return [], _(u"Bad zip file") namelist = flz.namelist() filenames = [] for suffix in suffixes: current_file_name = None for name in namelist: if name.endswith(suffix): current_file_name = name filenames.append(current_file_name) files = [] for filename in filenames: if filename: files.append(flz.open(filename)) else: files.append(None) return files class ImportManager: u""" Generic class for specific importers """ def __init__(self, importer_instance): self.importer_instance = importer_instance def get(self): pass def put(self): pass class KMLManager(ImportManager): u""" KML importer The filtr argument has to be defined as the exact name of the folder to be imported """ XPATH = '//kml:Folder/kml:name[text()="%s"]/../kml:Placemark' DEFAULT_XPATH = '//kml:Placemark' def __init__(self, importer_instance, ns=''): self.importer_instance = importer_instance self.ns = ns def get(self, source=None): u""" Get data from the source Args: - source (None): input file if not provided get it from the distant source provided in the importer instance. Return a tuple with: - number of new item ; - number of item updated ; - error detail on error """ from models import Marker new_item, updated_item, msg = 0, 0, '' if not source: try: remotehandle = urllib2.urlopen(self.importer_instance.source) source = StringIO.StringIO(remotehandle.read()) remotehandle.close() except ValueError: # assume it is a local file try: source = open(self.importer_instance.source) except IOError, msg: return (new_item, updated_item, msg) except urllib2.URLError as error: return (new_item, updated_item, error.message) if self.importer_instance.zipped: try: files = get_files_inside_zip(source, ['.kml']) except zipfile.BadZipfile: return (new_item, updated_item, _(u"Bad zip file")) if not files or not files[0]: return (new_item, updated_item, _(u"No KML file inside the zip file")) source = files[0] tree = etree.parse(source) # try to get default namespace if not self.ns: self.ns = tree.getroot().nsmap[None] xpath = self.XPATH % self.importer_instance.filtr \ if self.importer_instance.filtr else self.DEFAULT_XPATH for placemark in tree.xpath(xpath, namespaces={'kml':self.ns}): name, point, linestring = None, None, None pl_id = placemark.attrib.get('id') pl_key = 'kml-%d' % self.importer_instance.pk ns = '{%s}' % self.ns for item in placemark: if item.tag == ns + 'name': name = item.text elif item.tag == ns + 'description': description = item.text elif item.tag == ns + 'Point': for coord in item: if coord.tag == ns + 'coordinates': x, y, z = coord.text.split(',') point = 'SRID=4326;POINT(%s %s)' % (x, y) if point: dct = {'point':point, 'description':description, 'name':name,} m = None if pl_id: dct_import = { 'import_key__icontains':'%s:%s;' % (pl_key, pl_id), 'import_source':self.importer_instance.source} try: m = Marker.objects.get(**dct_import) for k in dct: setattr(m, k, dct[k]) m.save() updated_item += 1 except ObjectDoesNotExist: m = None dct.update({ 'import_source':self.importer_instance.source}) if not m: dct['status'] = 'I' m = Marker.objects.create(**dct) new_item += 1 if pl_id: m.set_key(pl_key, pl_id) m.categories.clear() for cat in self.importer_instance.categories.all(): m.categories.add(cat) return (new_item, updated_item, msg) RE_NODE = re.compile('node\[([^\]]*)\]') # manage deleted item from OSM class OSMManager(ImportManager): u""" OSM importer/exporter The source url is a path to an OSM file or a XAPI url The filtr argument is XAPI args or empty if it is an OSM file. """ def get(self, source=None): u""" Get data from the source Args: - source (None): input file if not provided get it from the distant source provided in the importer instance. Return a tuple with: - new items; - updated items; - error detail on error. """ from models import Marker new_item, updated_item = 0 , 0 items = [] msg = '' try: source = urllib2.urlopen(settings.CHIMERE_XAPI_URL+ self.importer_instance.filtr) except ValueError: # assume it is a local file try: source = open(self.importer_instance.source) except IOError, msg: return (new_item, updated_item, msg) except urllib2.URLError as error: return (new_item, updated_item, error.message) tree = etree.parse(source) for node in tree.xpath('//node'): name, point, linestring = None, None, None node_id = node.attrib.get('id') version = node.attrib.get('version') for item in node: k = item.attrib.get('k') if k == 'name': name = item.attrib.get('v') point = 'SRID=4326;POINT(%s %s)' % (node.get('lon'), node.get('lat')) if point: dct = {'point':point, 'name':name, 'import_version':version} m = None if node_id: dct_import = { 'import_key__icontains':'OSM:%s;' % (node_id), 'import_source':self.importer_instance.source} try: m = Marker.objects.get(**dct_import) items.append(m) if version and m.import_version == int(version): # no update since the last import continue for k in dct: setattr(m, k, dct[k]) m.save() updated_item += 1 except ObjectDoesNotExist: m = None dct.update({ 'import_source':self.importer_instance.source}) if not m: dct['status'] = 'I' m = Marker.objects.create(**dct) new_item += 1 items.append(m) if node_id: m.set_key('OSM', node_id) m.categories.clear() for cat in self.importer_instance.categories.all(): m.categories.add(cat) return (new_item, updated_item, msg) def put(self): # first of all: reimport in order to verify that no changes has been # made since the last import from models import Marker new_item, updated_item, msg = self.get() # check if import is possible if msg: return 0, msg if new_item: return 0, _(u"New items imported - validate them before exporting") if Marker.objects.filter(status='I').count(): return 0, _(u"There are items from a former import not yet " u"validated - validate them before exporting") # start import api = OsmApi.OsmApi(api=settings.CHIMERE_OSM_API_URL, username=settings.CHIMERE_OSM_USER, password=settings.CHIMERE_OSM_PASSWORD) api.ChangesetCreate({u"comment": u"Import from Chimère %s" % \ get_version()}) tag = RE_NODE.finddall(self.importer_instance.filtr) if not tag: return 0, _(u"Bad param") tag = tag[0].split('=') default_dct = {'tag':{tag[0]:tag[1]}, 'import_source':self.importer_instance.source} for idx, item in Marker.objects.filter(status='A', categories=self.importer_instance.categories.all()): dct = default_dct.update({ 'name':item.name, 'lon':item.point.lon, 'lat':item.point.lat}) node = None import_key = marker.get_key('OSM') if not import_key: node = OsmApi.NodeCreate(dct) item.set_key('OSM', node['id']) else: dct['id'] = import_key node = OsmApi.NodeUpdate(dct) item.import_version = node['version'] item.save() api.ChangesetClose() return idx+1, None