#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2013-2014 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

"""CSV importer for archaeological files (``FileImporterSraPdL``).

Defines the town lookup formater, the regular expressions used to split a
free-text address into address / postal code / town, the importer itself and
a small ``test`` helper that runs the importer on a CSV file.
"""

import re, copy

import unicodecsv

from django.conf import settings
from django.template.defaultfilters import slugify

from ishtar_common.data_importer import *
from ishtar_common.models import Town, Person, OrganizationType
from ishtar_common.unicode_csv import unicode_csv_reader

from archaeological_files import models

# Captures the leading part of a town name that precedes a "CEDEX" marker.
# Only the first group is used by TownFormater.format.
RE_FILTER_CEDEX = re.compile(
    r"(.*) *(?: *CEDEX|cedex|Cedex|Cédex|cédex *\d*)")


class TownFormater(Formater):
    """Formater resolving a town name to a ``Town`` instance.

    Two lookup tables are maintained:

    * ``_town_full_dct``: keyed by ``(slugified name, first two characters
      of numero_insee)``;
    * ``_town_dct``: keyed by the slugified name only.
    """

    def __init__(self, town_full_dct=None, town_dct=None):
        """Create the formater.

        :param town_full_dct: optional pre-filled ``(slug, code)`` -> town
            table. The given dict is used as is (not copied), so several
            formaters can share one table.
        :param town_dct: optional pre-filled ``slug`` -> town table, also
            used as is.
        """
        # Fresh dicts per instance: a literal ``{}`` default would be shared
        # by every formater built without arguments, and a later explicit
        # ``town_dct_init()`` would then flag every town as ambiguous.
        if town_full_dct is None:
            town_full_dct = {}
        if town_dct is None:
            town_dct = {}
        self._town_full_dct = town_full_dct
        self._town_dct = town_dct
        # A non-empty full table is considered already initialized.
        self._initialized = bool(self._town_full_dct)

    def town_dct_init(self):
        """Fill both lookup tables from every ``Town`` in the database.

        When a key is already present the first town registered is kept, a
        message is printed and the current town is skipped. A town skipped
        because of a clash in the full table is not added to the name-only
        table either.
        """
        for town in Town.objects.all():
            slug = slugify(town.name.strip())
            full_key = (slug, town.numero_insee[:2])
            if full_key in self._town_full_dct:
                print("Danger! %s is ambiguous with another town on the same "
                      "department." % town.name)
                continue
            self._town_full_dct[full_key] = town
            if slug in self._town_dct:
                print("Warning %s is ambiguous with no department provided" %
                      town.name)
                continue
            self._town_dct[slug] = town
        self._initialized = True

    def format(self, value, extra=None):
        """Return the town matching ``value`` or ``None`` when not found.

        :param value: town name, optionally followed by a "CEDEX" marker
            which is stripped before the lookup.
        :param extra: optional second component of the full lookup key.
        :return: the matching town, or ``None`` for an empty value or an
            unknown town.
        """
        if not self._initialized:
            self.town_dct_init()
        match = RE_FILTER_CEDEX.match(value)
        if match:
            value = match.groups()[0]
        if not value:
            return None
        slug = slugify(value)
        if extra:
            # NOTE(review): keys of the full table are built from the first
            # two characters of numero_insee, so ``extra`` has to be in that
            # same form to ever match - confirm what the caller passes.
            full_key = (slug, extra)
            if full_key in self._town_full_dct:
                return self._town_full_dct[full_key]
        # Fall back on the name-only table.
        if slug in self._town_dct:
            return self._town_dct[slug]
        return None


# Splits "<address> <postal code> <town>"; the postal code is two digits,
# optional spaces, then three digits (e.g. "44000" or "44 000").
RE_ADD_CD_POSTAL_TOWN = re.compile(r"(.*)?[, ]*(\d{2} *\d{3})[, ]*(.+)")

# Two runs of digits separated by a single space; given as ``re_filter`` to
# the postal code formaters below.
RE_CD_POSTAL_FILTER = re.compile(r"(\d*) (\d*)")


class FileImporterSraPdL(Importer):
    """Importer creating ``models.File`` objects from CSV lines."""

    LINE_FORMAT = []
    OBJECT_CLS = models.File
    # NOTE: these default values are fetched from the database when the
    # class body is evaluated, i.e. at module import time.
    DEFAULTS = {
        ('responsible_town_planning_service', 'attached_to'): {
            'organization_type': OrganizationType.objects.get(
                txt_idx="planning_service")},
        ('general_contractor', 'attached_to'): {
            'organization_type': OrganizationType.objects.get(
                txt_idx="general_contractor")},
        tuple(): {
            'file_type': models.FileType.objects.get(
                txt_idx='undefined'),
        }
    }

    def _init_line_format(self):
        """Build ``self.line_format``, one ``ImportFormater`` per column.

        A single pair of town lookup tables is filled once and shared by
        both town formaters.
        """
        tf = TownFormater()
        tf.town_dct_init()
        # presumably each inner list of ``regexp_formater_args`` gives the
        # regexp group indexes handed to the matching formater (the town
        # formater getting the postal code group as ``extra``) - confirm
        # against ImportFormater.
        self.line_format = [
            ImportFormater('responsible_town_planning_service__name',
                           UnicodeFormater(300),
                           comment=u"Service instructeur - nom",
                           required=False),
            ImportFormater(['address', 'postal_code', 'towns'],
                           [UnicodeFormater(500, clean=True),
                            UnicodeFormater(5,
                                            re_filter=RE_CD_POSTAL_FILTER),
                            tf],
                           regexp=RE_ADD_CD_POSTAL_TOWN,
                           regexp_formater_args=[[0], [1], [2, 1]],
                           required=False,
                           comment="Dossier - adresse"),
            ImportFormater(
                'general_contractor__name',
                UnicodeFormater(200),
                comment=u"Aménageur - nom",
                duplicate_field='general_contractor__attached_to__name',
                required=False),
            ImportFormater(
                ['general_contractor__attached_to__address',
                 'general_contractor__attached_to__postal_code',
                 'general_contractor__attached_to__town'],
                [UnicodeFormater(500, clean=True),
                 UnicodeFormater(5, re_filter=RE_CD_POSTAL_FILTER),
                 TownFormater(town_full_dct=tf._town_full_dct,
                              town_dct=tf._town_dct)],
                regexp=RE_ADD_CD_POSTAL_TOWN,
                regexp_formater_args=[[0], [1], [2, 1]],
                required=False,
                comment="Aménageur - adresse"),
            ImportFormater("general_contractor__title",
                           StrChoiceFormater(Person.TYPE, cli=True),
                           required=False),
        ]

    def __init__(self, *args, **kwargs):
        """Initialize the importer, its line format and its defaults.

        ``history_modifier`` is added to the defaults stored under the
        empty-tuple key.
        """
        super(FileImporterSraPdL, self).__init__(*args, **kwargs)
        self._init_line_format()
        if tuple() not in self._defaults:
            self._defaults[tuple()] = {}
        self._defaults[tuple()]['history_modifier'] = self.history_modifier


def test(filename):
    """Run the importer on ``filename`` and print the resulting CSV errors.

    The file is decoded successively with ``settings.ENCODING``,
    ``settings.ALT_ENCODING`` and UTF-8. The next encoding is tried when
    decoding fails or when the importer reports a header error; any other
    importer error is printed and stops the run.

    :param filename: path of the CSV file to import.
    """
    importer = FileImporterSraPdL(skip_first_line=True)
    encodings = [settings.ENCODING, settings.ALT_ENCODING, 'utf-8']
    # Binary mode: unicodecsv does the decoding itself.
    with open(filename, 'rb') as csv_file:
        for index, encoding in enumerate(encodings):
            is_last = index == len(encodings) - 1
            # Always restart from the beginning of the file: a previous
            # attempt may have consumed part of it.
            csv_file.seek(0)
            try:
                importer.importation(
                    list(unicodecsv.reader(csv_file, encoding=encoding)))
            except ImporterError as e:
                print(unicode(e))
                if e.type == ImporterError.HEADER and not is_last:
                    continue
                # Not an encoding problem (or nothing left to try).
                break
            except UnicodeDecodeError:
                continue
            print(importer.get_csv_errors())
            break