diff options
Diffstat (limited to 'archaeological_files/data_importer.py')
| -rw-r--r-- | archaeological_files/data_importer.py | 158 | 
1 files changed, 158 insertions, 0 deletions
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2013-2014  Étienne Loks  <etienne.loks_AT_peacefrogsDOTnet>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

"""CSV importer definitions for archaeological files (``models.File``)."""

import re
import copy
import unicodecsv

from django.conf import settings
from django.template.defaultfilters import slugify

from ishtar_common.data_importer import *
from ishtar_common.models import Town, Person, OrganizationType
from ishtar_common.unicode_csv import unicode_csv_reader

from archaeological_files import models

# Captures everything that precedes a "CEDEX" marker (and its optional
# trailing number).  Declared as a unicode pattern so that the accented
# spellings can match unicode input; the alternatives are grouped so that
# the optional " *\d*" suffix applies to each of them.
RE_FILTER_CEDEX = re.compile(
    u"(.*) *(?: *(?:CEDEX|cedex|Cedex|Cédex|cédex) *\\d*)")


class TownFormater(Formater):
    """Formater turning a town name into a ``Town`` instance.

    Two lookup tables are kept:

    * ``_town_full_dct``: ``(slugified name, numero_insee[:2])`` -> Town
    * ``_town_dct``: ``slugified name`` -> Town

    Both tables may be handed over by the caller in order to share an
    already populated cache between several formaters.
    """

    def __init__(self, town_full_dct=None, town_dct=None):
        """Store the lookup tables.

        :param town_full_dct: optional pre-filled (name, department) table
        :param town_dct: optional pre-filled name-only table

        ``None`` (the default) creates fresh per-instance dictionaries:
        a mutable default would be shared by every instance and make a
        later ``town_dct_init`` report each town as ambiguous.
        """
        self._town_full_dct = {} if town_full_dct is None else town_full_dct
        self._town_dct = {} if town_dct is None else town_dct
        # a non-empty table given by the caller is considered ready to use
        self._initialized = bool(self._town_full_dct)

    def town_dct_init(self):
        """Fill both lookup tables from every ``Town`` in the database.

        The first town seen for a key wins; later towns with the same key
        are reported on stdout and skipped.
        """
        for town in Town.objects.all():
            slug = slugify(town.name.strip())
            full_key = (slug, town.numero_insee[:2])
            if full_key in self._town_full_dct:
                print("Danger! %s is ambiguous with another town on the same "
                      "department." % town.name)
                continue
            self._town_full_dct[full_key] = town
            if slug in self._town_dct:
                print("Warning %s is ambiguous with no department provided" %
                      town.name)
                continue
            self._town_dct[slug] = town
        # flagged once the query has been done, even when no town exists,
        # so that format() does not hit the database on every call
        self._initialized = True

    def format(self, value, extra=None):
        """Return the ``Town`` matching ``value`` or ``None``.

        :param value: town name, possibly followed by a "CEDEX" suffix
        :param extra: optional second component of the
            ``(slug, department)`` key
        :return: the matching ``Town`` or ``None`` when nothing matches
        """
        if not self._initialized:
            self.town_dct_init()
        if not value:
            return None
        m = RE_FILTER_CEDEX.match(value)
        if m:
            value = m.groups()[0]
        if not value:
            return None
        slug = slugify(value)
        if extra:
            # NOTE(review): the importer below passes the postal code group
            # as ``extra`` while the table is keyed by numero_insee[:2] --
            # confirm that both really share the same format.
            full_key = (slug, extra)
            if full_key in self._town_full_dct:
                return self._town_full_dct[full_key]
        if slug in self._town_dct:
            return self._town_dct[slug]
        return None


# Splits "<address> <postal code> <town>".  The postal code may be written
# with a space ("44 000") and separators may be commas and/or spaces.
# (Earlier, stricter variants of this pattern were superseded by this one.)
RE_ADD_CD_POSTAL_TOWN = re.compile(r"(.*)?[, ]*(\d{2} *\d{3})[, ]*(.+)")

# Two runs of digits separated by one space (e.g. "44 000").
RE_CD_POSTAL_FILTER = re.compile(r"(\d*) (\d*)")


class FileImporterSraPdL(Importer):
    """Importer creating ``models.File`` objects from CSV rows."""
    LINE_FORMAT = []
    OBJECT_CLS = models.File
    # NOTE: these defaults are evaluated when the class is created, so the
    # referenced types must already exist in the database at import time.
    DEFAULTS = {
        ('responsible_town_planning_service', 'attached_to'): {
            'organization_type': OrganizationType.objects.get(
                txt_idx="planning_service")},
        ('general_contractor', 'attached_to'): {
            'organization_type': OrganizationType.objects.get(
                txt_idx="general_contractor")},
        tuple(): {
            'file_type': models.FileType.objects.get(
                txt_idx='undefined'),
        },
    }

    def _init_line_format(self):
        """Build ``self.line_format``, one ``ImportFormater`` per column."""
        tf = TownFormater()
        tf.town_dct_init()
        self.line_format = [
            ImportFormater('responsible_town_planning_service__name',
                           UnicodeFormater(300),
                           comment=u"Service instructeur - nom",
                           required=False),
            ImportFormater(
                ['address', 'postal_code', 'towns'],
                [UnicodeFormater(500, clean=True),
                 UnicodeFormater(5, re_filter=RE_CD_POSTAL_FILTER),
                 tf],
                regexp=RE_ADD_CD_POSTAL_TOWN,
                regexp_formater_args=[[0], [1], [2, 1]], required=False,
                comment="Dossier - adresse"),
            ImportFormater(
                'general_contractor__name',
                UnicodeFormater(200),
                comment=u"Aménageur - nom",
                duplicate_field='general_contractor__attached_to__name',
                required=False),
            ImportFormater(
                ['general_contractor__attached_to__address',
                 'general_contractor__attached_to__postal_code',
                 'general_contractor__attached_to__town'],
                [UnicodeFormater(500, clean=True),
                 UnicodeFormater(5, re_filter=RE_CD_POSTAL_FILTER),
                 # reuse the tables filled above instead of querying again
                 TownFormater(town_full_dct=tf._town_full_dct,
                              town_dct=tf._town_dct)],
                regexp=RE_ADD_CD_POSTAL_TOWN,
                regexp_formater_args=[[0], [1], [2, 1]], required=False,
                comment="Aménageur - adresse"),
            ImportFormater("general_contractor__title",
                           StrChoiceFormater(Person.TYPE, cli=True),
                           required=False),
        ]

    def __init__(self, *args, **kwargs):
        """Initialise the importer, its line format and root defaults."""
        super(FileImporterSraPdL, self).__init__(*args, **kwargs)
        self._init_line_format()
        if tuple() not in self._defaults:
            self._defaults[tuple()] = {}
        self._defaults[tuple()]['history_modifier'] = self.history_modifier


def test(filename):
    """Import ``filename`` with ``FileImporterSraPdL`` and print the errors.

    The file is decoded with ``settings.ENCODING``, then
    ``settings.ALT_ENCODING``, then UTF-8.  The next encoding is only tried
    after a header error or a decoding error; any other importer error
    stops the attempts.

    :param filename: path of the CSV file to import
    """
    importer = FileImporterSraPdL(skip_first_line=True)
    encodings = [settings.ENCODING, settings.ALT_ENCODING, 'utf-8']
    last_idx = len(encodings) - 1
    # binary mode: the csv reader does the decoding itself
    with open(filename, 'rb') as csv_file:
        for idx, encoding in enumerate(encodings):
            try:
                importer.importation(
                    list(unicodecsv.reader(csv_file, encoding=encoding)))
                print(importer.get_csv_errors())
                break
            except ImporterError as e:
                print(unicode(e))
                if e.type != ImporterError.HEADER:
                    # not an encoding issue: another encoding will not help
                    break
            except UnicodeDecodeError:
                pass
            if idx == last_idx:
                break
            # rewind before retrying with the next encoding
            csv_file.seek(0)
