#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2013-2014 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.

# See the file COPYING for details.

"""CSV importer for archaeological files ("SRA Pays de la Loire" layout).

The module defines a few column formaters, the ``FileImporterSraPdL``
importer that maps each CSV column to model fields, and a small ``test``
helper that runs the importer on a file.
"""

import copy
import datetime
import re
import sys

import unicodecsv

from django.conf import settings
from django.db import IntegrityError
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext_lazy as _

from ishtar_common.data_importer import *
from ishtar_common.models import Town, Person, OrganizationType
from ishtar_common.unicode_csv import unicode_csv_reader

from archaeological_files import models
from archaeological_operations.models import Parcel
from archaeological_operations.utils import parse_parcels

# Group 1 is the town name with any trailing "CEDEX" marker left out.
RE_FILTER_CEDEX = re.compile(
    r"(.*) *(?: *CEDEX|cedex|Cedex|Cédex|cédex *\d*)")
# Group 1 is the permit reference without its leading letters.
RE_PERMIT_REFERENCE = re.compile('[A-Za-z]*(.*)')


class StrToBoolean(Formater):
    """Formater mapping free-text values to ``True``/``False``/``None``.

    Unknown values are either collected in ``missings`` or, in CLI mode,
    resolved interactively by ``check``.
    """

    def __init__(self, choices=None, cli=False, strict=False):
        """
        :param choices: optional mapping "prepared value" -> boolean or
            None; it is copied, the caller's mapping is never modified
        :param cli: when true, ``check`` prompts on stdin for unknown values
        :param strict: when false, values are slugified before lookup
        """
        # ``None`` default instead of a shared mutable ``{}``
        self.dct = {} if choices is None else copy.copy(choices)
        self.cli = cli
        self.strict = strict
        self.missings = set()

    def prepare(self, value):
        """Return ``value`` as stripped unicode (slugified unless strict)."""
        value = unicode(value).strip()
        if not self.strict:
            value = slugify(value)
        return value

    def check(self, values):
        """Make sure every value of ``values`` has a known mapping.

        Without ``cli`` the unknown values are added to ``self.missings``;
        with ``cli`` the user is asked to choose between True, False and
        Empty until a valid answer (1, 2 or 3) is given.
        """
        msgstr = unicode(_(u"Choice for \"%s\" is not available. "
                           u"Which one is relevant?\n"))
        msgstr += u"1. True\n"
        msgstr += u"2. False\n"
        msgstr += u"3. Empty\n"
        for value in values:
            value = self.prepare(value)
            if value in self.dct:
                continue
            if not self.cli:
                self.missings.add(value)
                continue
            res = None
            while res not in (1, 2, 3):
                sys.stdout.write(msgstr % value)
                res = raw_input(">>> ")
                try:
                    res = int(res)
                except ValueError:
                    # not a number: ``res`` stays a string, ask again
                    pass
            if res == 1:
                self.dct[value] = True
            elif res == 2:
                self.dct[value] = False
            else:
                self.dct[value] = None

    def format(self, value):
        """Return the mapped value, or ``None`` when ``value`` is unknown."""
        return self.dct.get(self.prepare(value))


class ImportClosingFormater(ImportFormater):
    """Set the end date of the imported object when the column is true."""

    def post_process(self, obj, context, value, owner=None):
        """Close ``obj`` 30 days after its reception (or creation) date.

        Nothing is done when the formatted value is falsy or when the
        object has neither a reception date nor a creation date.
        """
        value = self.formater.format(value)
        if not value:
            return
        open_date = obj.reception_date or obj.creation_date
        if not open_date:
            return
        obj.end_date = open_date + datetime.timedelta(30)
        obj.save()


class ImportParcelFormater(ImportFormater):
    """Create the parcels described by the column and attach them to obj."""

    NEED = ['town']
    PARCEL_OWNER_KEY = 'associated_file'

    def post_process(self, obj, context, value, owner=None):
        """Parse ``value`` with ``parse_parcels`` and get or create parcels.

        Non-empty entries of ``context['parcels']`` (when present) are
        added to every parcel's attributes.

        :raises ImporterError: when the database rejects a parcel
        """
        value = value.strip()
        base_dct = {self.PARCEL_OWNER_KEY: obj, 'history_modifier': owner}
        if 'parcels' in context:
            for key in context['parcels']:
                if context['parcels'][key]:
                    base_dct[key] = context['parcels'][key]
        for parcel_dct in parse_parcels(value, owner=owner):
            parcel_dct.update(base_dct)
            try:
                Parcel.objects.get_or_create(**parcel_dct)
            except IntegrityError:
                raise ImporterError(
                    "Erreur d'import parcelle, contexte : %s"
                    % unicode(parcel_dct))


class ImportYearFormater(ImportFormater):
    """Copy the year of the formatted (date) value to ``obj.year``."""

    def post_process(self, obj, context, value, owner=None):
        """Set ``obj.year`` from the formatted value; skip falsy values."""
        value = self.formater.format(value)
        if not value:
            return
        obj.year = value.year
        obj.save()


class TownFormater(Formater):
    """Find a ``Town`` from its name and, optionally, a department hint.

    Two lookup tables are used: one keyed by ``(slug, department)`` where
    the department is the first two characters of ``numero_insee``, and one
    keyed by the slug only.
    """

    def __init__(self, town_full_dct=None, town_dct=None):
        """
        :param town_full_dct: optional ``(slug, department) -> Town`` table
        :param town_dct: optional ``slug -> Town`` table

        Tables passed explicitly are used as is (not copied) so several
        formaters can share them.  When omitted, each instance gets its own
        fresh table: a shared mutable default would leak entries between
        unrelated instances and make a later ``town_dct_init`` report every
        town as ambiguous.
        """
        self._town_full_dct = {} if town_full_dct is None else town_full_dct
        self._town_dct = {} if town_dct is None else town_dct
        # a non-empty full table is considered already initialized
        self._initialized = bool(self._town_full_dct)

    def town_dct_init(self):
        """Fill both lookup tables from every ``Town`` in the database.

        A town whose key is already taken is reported on stdout and
        skipped: the first town registered for a key wins.
        """
        for town in Town.objects.all():
            key = (slugify(town.name.strip()), town.numero_insee[:2])
            if key in self._town_full_dct:
                print("Danger! %s is ambiguous with another town on the same "
                      "department." % town.name)
                continue
            self._town_full_dct[key] = town
            key = slugify(town.name.strip())
            if key in self._town_dct:
                print("Warning %s is ambiguous with no department provided"
                      % town.name)
                continue
            self._town_dct[key] = town
        self._initialized = True

    def format(self, value, extra=None):
        """Return the matching ``Town`` or ``None``.

        :param value: town name, possibly followed by a CEDEX marker
        :param extra: optional second part of the ``(slug, extra)`` key,
            tried first; the name-only table is used as a fallback
        """
        if not self._initialized:
            self.town_dct_init()
        m = RE_FILTER_CEDEX.match(value)
        if m:
            value = m.groups()[0]
        if not value:
            return None
        if extra:
            key = (slugify(value), extra)
            if key in self._town_full_dct:
                return self._town_full_dct[key]
        key = slugify(value)
        if key in self._town_dct:
            return self._town_dct[key]
        return None


class TownINSEEFormater(Formater):
    """Find a ``Town`` from its INSEE code, caching successful lookups."""

    def __init__(self):
        self._town_dct = {}

    def format(self, value, extra=None):
        """Return the ``Town`` matching the code, or ``None``."""
        value = value.strip()
        if not value:
            return None
        if value in self._town_dct:
            return self._town_dct[value]
        # NOTE(review): this filters on ``insee_code`` whereas TownFormater
        # reads ``town.numero_insee`` - confirm which field name is right.
        q = Town.objects.filter(insee_code=value)
        if not q.count():
            return None
        self._town_dct[value] = q.all()[0]
        return self._town_dct[value]


class SurfaceFormater(Formater):
    """Convert a surface given in m² or in hectares to an integer (m²)."""

    def test(self):
        assert self.format(u"352 123") == 352123
        assert self.format(u"456 789 m²") == 456789
        assert self.format(u"78ha") == 780000

    def format(self, value, extra=None):
        """Return the surface as an int, or ``None`` for an empty value.

        :raises ImporterError: when the remaining text is not an integer
        """
        value = value.strip()
        if not value:
            return None
        factor = 1
        if value.endswith((u"m2", u"m²")):
            value = value[:-2]
        if value.endswith(u"ha"):
            value = value[:-2]
            factor = 10000  # 1 ha = 10 000 m²
        try:
            return int(value.replace(' ', '')) * factor
        except ValueError:
            raise ImporterError("Erreur import surface : %s"
                                % unicode(value))


#RE_ADD_CD_POSTAL_TOWN = re.compile("(.*)[, ](\d{5}) (.*?) *(?: "\
#                                   "*CEDEX|cedex|Cedex *\d*)*")

# name, line break, address, postal code, town
RE_NAME_ADD_CD_POSTAL_TOWN = re.compile(
    r"(.*)?[, ]*" + NEW_LINE_BREAK
    + r"(.*)?[, ]*(\d{2} *\d{3})[, ]*(.+)")

# address, postal code, town
RE_ADD_CD_POSTAL_TOWN = re.compile(r"(.*)?[, ]*(\d{2} *\d{3})[, ]*(.+)")

RE_CD_POSTAL_FILTER = re.compile(r"(\d*) (\d*)")

# everything before the first comma
RE_ORGA = re.compile("([^,]*)")


class FileImporterSraPdL(Importer):
    """Importer of ``models.File`` objects from the "SRA PdL" CSV export.

    ``line_format`` holds one entry per CSV column (``None`` for ignored
    columns); the trailing comment of each entry gives the spreadsheet
    column letter and its 1-based index.
    """

    LINE_FORMAT = []
    OBJECT_CLS = models.File
    # NOTE(review): these defaults run database queries when the class body
    # is executed, i.e. when the module is imported.
    DEFAULTS = {
        ('responsible_town_planning_service', 'attached_to'): {
            'organization_type': OrganizationType.objects.get(
                txt_idx="planning_service")},
        ('general_contractor', 'attached_to'): {
            'organization_type': OrganizationType.objects.get(
                txt_idx="general_contractor")},
        tuple(): {
            'file_type': models.FileType.objects.get(
                txt_idx='undefined'),
        },
        ('in_charge',): {'attached_to': None},  # initialized in __init__
    }

    def _init_line_format(self):
        """Build ``self.line_format``, sharing one set of town tables."""
        tf = TownFormater()
        tf.town_dct_init()
        self.line_format = [
            None,  # A, 1
            ImportFormater(
                ['address', 'postal_code',
                 ['towns', 'parcels__town']],  # B, 2
                [UnicodeFormater(500, clean=True),
                 UnicodeFormater(5, re_filter=RE_CD_POSTAL_FILTER),
                 tf],
                regexp=RE_ADD_CD_POSTAL_TOWN,
                regexp_formater_args=[[0], [1], [2, 1]],
                required=False,
                comment="Dossier - adresse"),
            # C, 3 TODO - extract name, first name and title
            ImportFormater(
                'general_contractor__raw_name',
                UnicodeFormater(200),
                comment=u"Aménageur - nom brut",
                duplicate_field='general_contractor__attached_to__name',
                required=False),
            ImportFormater(
                ['general_contractor__attached_to__address',  # D, 4
                 'general_contractor__attached_to__postal_code',
                 'general_contractor__attached_to__town'],
                [UnicodeFormater(500, clean=True),
                 UnicodeFormater(5, re_filter=RE_CD_POSTAL_FILTER),
                 TownFormater(town_full_dct=tf._town_full_dct,
                              town_dct=tf._town_dct)],
                regexp=RE_ADD_CD_POSTAL_TOWN,
                regexp_formater_args=[[0], [1], [2, 1]],
                required=False,
                comment="Aménageur - adresse"),
            ImportFormater(
                "general_contractor__title",  # E, 5
                StrChoiceFormater(Person.TYPE, cli=True),
                required=False,
                comment="Aménageur - titre"),
            None,  # F, 6
            None,  # G, 7
            None,  # H, 8
            ImportFormater(
                "parcels__year",  # I, 9
                YearNoFuturFormater(),
                required=False),
            ImportParcelFormater(
                '', required=False, post_processing=True),  # J, 10
            None,  # K, 11
            ImportFormater(
                [['towns', 'parcels__town']],  # L, 12
                tf,
                required=False,
                comment="Commune (si non définie avant)"),
            ImportFormater(
                [['towns', 'parcels__town']],  # M, 13
                tf,
                required=False,
                comment="Commune (si non définie avant)"),
            ImportFormater(
                'saisine_type',  # N, 14
                StrChoiceFormater(models.SaisineType.get_types(),
                                  model=models.SaisineType, cli=True),
                required=False,
                comment="Type de saisine"),
            None,  # O, 15
            ImportFormater(
                'comment',  # P, 16
                UnicodeFormater(2000),
                comment=u"Commentaire",
                concat=True,
                required=False),
            None,  # Q, 17
            # R, 18 planning service ("service instructeur")
            ImportFormater(
                ['responsible_town_planning_service__raw_name',
                 'responsible_town_planning_service__attached_to__address',
                 'responsible_town_planning_service__attached_to__'
                 'postal_code',
                 'responsible_town_planning_service__attached_to__town'],
                [UnicodeFormater(300, clean=True),
                 UnicodeFormater(300, clean=True),
                 UnicodeFormater(5, re_filter=RE_CD_POSTAL_FILTER),
                 TownFormater(town_full_dct=tf._town_full_dct,
                              town_dct=tf._town_dct)],
                regexp=RE_NAME_ADD_CD_POSTAL_TOWN,
                regexp_formater_args=[[0], [1], [2], [3, 2]],
                comment="Aménageur - adresse",
                required=False),
            ImportFormater(
                'comment',  # S, 19
                UnicodeFormater(2000),
                comment=u"Commentaire",
                concat=True,
                required=False),
            ImportYearFormater(
                'reception_date',  # T, 20
                DateFormater(),
                comment=u"Date de création",
                required=False,
                duplicate_field='creation_date'),
            None,  # U, 21
            None,  # V, 22
            None,  # W, 23
            None,  # X, 24
            None,  # Y, 25
            None,  # Z, 26
            None,  # AA, 27
            None,  # AB, 28
            None,  # AC, 29
            None,  # AD, 30
            None,  # AE, 31
            None,  # AF, 32
            None,  # AG, 33
            None,  # AH, 34
            ImportFormater(
                'creation_date',  # AI, 35
                DateFormater(),
                force_value=True,
                comment=u"Date de création",
                required=False),
            None,  # AJ, 36
            ImportFormater(
                'comment',  # AK, 37
                UnicodeFormater(2000),
                comment=u"Commentaire",
                concat=True,
                required=False),
            None,  # AL, 38
            None,  # AM, 39
            None,  # AN, 40
            None,  # AO, 41
            ImportFormater(
                'comment',  # AP, 42
                UnicodeFormater(2000),
                comment=u"Commentaire",
                concat=True,
                required=False),
            None,  # AQ, 43
            None,  # AR, 44
            None,  # AS, 45
            None,  # AT, 46
            ImportFormater(
                'comment',  # AU, 47
                UnicodeFormater(2000),
                comment=u"Commentaire",
                concat=True,
                required=False),
            None,  # AV, 48
            ImportFormater(
                'permit_reference',  # AW, 49
                UnicodeFormater(300, clean=True),
                regexp=RE_PERMIT_REFERENCE,
                comment="Réf. du permis de construire",
                required=False),
            None,  # AX, 50
            None,  # AY, 51
            None,  # AZ, 52
            None,  # BA, 53
            None,  # BB, 54
            None,  # BC, 55
            None,  # BD, 56
            ImportFormater(
                [['towns', 'parcels__town']],  # BE, 57
                TownINSEEFormater(),
                required=False,
                comment="Commune (si non définie avant)"),
            ImportFormater(
                'comment',  # BF, 58
                UnicodeFormater(2000),
                comment=u"Commentaire",
                concat=True,
                required=False),
            None,  # BG, 59
            None,  # BH, 60
            None,  # BI, 61
            None,  # BJ, 62
            None,  # BK, 63
            None,  # BL, 64
            None,  # BM, 65
            None,  # BN, 66
            None,  # BO, 67
            None,  # BP, 68
            None,  # BQ, 69
            None,  # BR, 70
            None,  # BS, 71
            # BT, 72 planning service ("service instructeur")
            ImportFormater(
                'responsible_town_planning_service__attached_to__name',
                UnicodeFormater(300, clean=True),
                regexp=RE_ORGA,
                comment="Service instructeur - nom",
                required=False),
            None,  # BU, 73
            ImportClosingFormater(
                '', StrToBoolean(cli=True),
                post_processing=True,
                required=False),  # BV, 74, end date
            # BW, 75 person in charge
            # NOTE(review): ImportClosingFormater is used here without
            # post_processing; a plain ImportFormater may have been
            # intended - confirm before changing.
            ImportClosingFormater(
                'in_charge__raw_name',
                UnicodeFormater(200),
                comment=u"Responsable - nom brut",
                required=False),
            ImportFormater(
                'total_surface',  # BX, 76 total surface
                SurfaceFormater(),
                comment=u"Surface totale",
                required=False),
            ImportFormater(
                'total_developed_surface',  # BY, 77 total developed surface
                SurfaceFormater(),
                comment=u"Surface totale aménagée",
                required=False),
            None,  # BZ, 78
            None,  # CA, 79
            None,  # CB, 80
            None,  # CC, 81
            None,  # CD, 82
            None,  # CE, 83
            None,  # CF, 84
            ImportFormater(
                'permit_type',
                StrChoiceFormater(models.PermitType.get_types(),
                                  model=models.PermitType, cli=True),
                required=False,
                comment="Type de permis"),  # CG, 85
            None,  # CH, 86
        ]

    def __init__(self, *args, **kwargs):
        """Initialize the importer, its defaults and its line format."""
        super(FileImporterSraPdL, self).__init__(*args, **kwargs)
        # this writes to the class-level DEFAULTS dict
        self.DEFAULTS[('in_charge',)]['attached_to'] = \
            models.Organization.objects.get(name='SRA Pays de la Loire')
        self._init_line_format()
        if tuple() not in self._defaults:
            self._defaults[tuple()] = {}
        self._defaults[tuple()]['history_modifier'] = self.history_modifier


def test(filename):
    """Import ``filename`` and print the resulting CSV errors.

    The encodings ``settings.ENCODING``, ``settings.ALT_ENCODING`` and
    ``'utf-8'`` are tried in that order: the file is rewound and the next
    encoding is tried after a header error or a decoding error.
    """
    importer = FileImporterSraPdL(skip_first_line=True)
    with open(filename) as csv_file:
        encodings = [settings.ENCODING, settings.ALT_ENCODING, 'utf-8']
        for encoding in encodings:
            try:
                # decode with the encoding being tried (it used to be
                # hard-coded to 'utf-8', which made the retries useless)
                importer.importation(
                    list(unicodecsv.reader(csv_file, encoding=encoding)))
                print(importer.get_csv_errors())
                break
            except ImporterError as e:
                print(unicode(e))
                if e.type == ImporterError.HEADER \
                        and encoding != encodings[-1]:
                    csv_file.seek(0)
                    continue
                # NOTE(review): any other importer error falls through to
                # the next encoding without rewinding the file, as before.
            except UnicodeDecodeError:
                if encoding != encodings[-1]:
                    csv_file.seek(0)
                    continue