#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

"""
Utils: import archaeological operations from a CSV file
"""

import codecs
import csv
import datetime
import re

from django.conf import settings
from django.contrib.auth.models import User
from django.db import transaction

from ishtar_common.models import Town, Person, PersonType
from archaeological_operations.models import Operation, OperationType, \
    Period, AdministrativeAct, ActType

# CSV dialect settings used by import_from_csv
DELIMITER = ";"
QUOTECHAR = '"'


def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    """
    Yield the rows of a CSV source as lists of unicode cells.

    unicode_csv_data is an iterable of unicode lines; dialect and the extra
    keyword arguments are passed unchanged to csv.reader.
    """
    # csv.py doesn't do Unicode; encode temporarily as UTF-8:
    csv_reader = csv.reader(utf_8_encoder(unicode_csv_data),
                            dialect=dialect, **kwargs)
    for row in csv_reader:
        # decode UTF-8 back to Unicode, cell by cell:
        yield [unicode(cell, 'utf-8') for cell in row]


def utf_8_encoder(unicode_csv_data):
    """
    Yield each line of unicode_csv_data encoded as UTF-8.
    """
    for line in unicode_csv_data:
        yield line.encode('utf-8')


# Operation types indexed by the keys of settings.ISHTAR_OPE_TYPES; the
# matching OperationType rows are created on import when they are missing.
ope_types = {}
for k in settings.ISHTAR_OPE_TYPES.keys():
    ot, created = OperationType.objects.get_or_create(
        txt_idx=settings.ISHTAR_OPE_TYPES[k][0],
        defaults={'label': settings.ISHTAR_OPE_TYPES[k][1],
                  # the first item of the key tells if the type is preventive
                  'preventive': k[0] == u'préventive'})
    ope_types[k] = ot


def parse_multivalue(value):
    """
    Split a run-together value by inserting spaces at word boundaries.

    A space is inserted before a capitalised word, between a lower-case
    letter or digit and a following capital, and between a digit and a
    following lower-case letter.  Return the resulting string.
    """
    # the substitutions used to read an undefined name instead of the
    # parameter, so every call raised NameError
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1 \2', value)
    s1 = re.sub(r'([a-z0-9])([A-Z])', r'\1 \2', s1)
    return re.sub(r'([0-9])([a-z])', r'\1 \2', s1)
def parse_operationtype(value, preventive, owner):
    """
    Return the operation type registered in ope_types for the
    (preventive, value) pair, both stripped, or None when it is unknown.

    owner is accepted for signature compatibility with the other converters
    called with extra columns; it is not used.
    """
    return ope_types.get((preventive.strip(), value.strip()))


# Periods indexed by the keys of settings.ISHTAR_PERIODS
periods = {}
for k in settings.ISHTAR_PERIODS.keys():
    periods[k] = Period.objects.get(txt_idx=settings.ISHTAR_PERIODS[k])
# longest keys first, so that parse_period matches the most specific prefix
periods_keys = sorted(periods.keys(), key=len, reverse=True)


def parse_period(value):
    """
    Return the list of periods encoded in value.

    A leading 'EUR' and trailing '-' characters are dropped, then the value
    is consumed from the left by repeatedly matching the known period keys.
    An empty value gives the period registered under the empty key.
    """
    if value.startswith('EUR'):
        value = value[3:]
    value = value.rstrip('-')
    # strip the prefix once more: a doubled 'EUR' prefix is also dropped
    if value.startswith('EUR'):
        value = value[3:]
    if not value:
        return [periods[u'']]
    found, previous = [], u''
    # stop as soon as a pass does not consume anything
    while value and previous != value:
        previous = value
        for key in periods_keys:
            if value.startswith(key):
                found.append(periods[key])
                value = value[len(key):]
                break
    return found


def parse_date(value):
    """
    Parse a 'dd/mm/YYYY' string; return a datetime or None when invalid.
    """
    try:
        return datetime.datetime.strptime(value, '%d/%m/%Y')
    except (TypeError, ValueError):
        return None


def parse_surface(value):
    """
    Convert a surface given in hectares (',' or '.' as decimal separator)
    to square metres.  Return None for a null or invalid value.
    """
    try:
        # hectares to square metres
        surface = float(value.replace(',', '.')) * 10000
    except (AttributeError, TypeError, ValueError):
        return None
    return surface or None


def parse_year(value):
    """
    Return value as an integer year between 1900 and 2100, else None.
    """
    try:
        yr = int(value)
    except (TypeError, ValueError):
        return None
    if yr < 1900 or yr > 2100:
        return None
    return yr


def parse_insee(value):
    """
    Return the towns matching the concatenated 5-character INSEE codes
    found in value.  Codes without a matching town are ignored.
    """
    # cut the value in chunks of 5 characters, dropping a shorter remainder
    codes = [value[idx:idx + 5] for idx in range(0, len(value) - 4, 5)]
    towns = []
    for code in codes:
        try:
            towns.append(Town.objects.get(numero_insee=code))
        except Exception:
            # best effort: a code that cannot be resolved is skipped
            continue
    return towns


def parse_patriarche(value):
    """
    Return the numeric value (spaces removed) prefixed with '18', or None
    when the value is empty or not an integer.
    """
    if not value:
        return
    value = value.replace(' ', '')
    try:
        int(value)
    except (TypeError, ValueError):
        return
    return '18' + unicode(value)


def parse_operation_code(value):
    """
    Return the integer found after the last '.' of value, or None.
    """
    code = value.split('.')[-1]
    try:
        return int(code)
    except (TypeError, ValueError):
        return


def parse_title(value):
    """
    Return value in title case, or None when it is empty.
    """
    if not value:
        return
    return value.title()


def parse_person(surname, name, old_ref, owner):
    """
    Return the first Person matching the title-cased surname and name,
    creating it (person type 'head_scientist', modified by owner) when no
    such person exists.  old_ref is not used.
    """
    values = {"surname": parse_title(surname),
              "name": parse_title(name)}
    q = Person.objects.filter(**values)
    if q.count():
        return q.all()[0]
    defaults = {'history_modifier': owner,
                'title': '',
                'person_type': PersonType.objects.get(
                    txt_idx='head_scientist')}
    defaults.update(values)
    return Person.objects.create(**defaults)


# if no start date: January 1st of year

# One entry per CSV column.  An empty entry means the column is ignored;
# otherwise the entry is (attrs, convert) or (attrs, convert, extra_cols)
# where attrs is the (possibly nested) attribute path, convert a callable
# (or the name of a function of this module) and extra_cols the indexes of
# other columns whose values are passed to convert.
DEFAULT_OPE_COLS = [
    [],  # file number?
    (('operation_type',), parse_operationtype),
    (('common_name',), unicode),
    (('in_charge', 'name'), unicode),
    (('in_charge', 'surname'), unicode),
    [],  # state?
    [],  # address?
    [],  # origin?
    (('periods',), parse_period),
    [],  # program?
    [],  # "Rattach PC"?
    [],  # empty
    (('administrative_act', 'ref_sra'), unicode),
    (('administrative_act', 'signature_date'), parse_date),
    (('start_date',), parse_date),
    (('excavation_end_date',), parse_date),
    (('year',), parse_year),
    [],  # identification
    (('code_patriarche',), int),
    [],  # X degrees
    [],  # Y degrees
    [],  # X entered?
    [],  # Y entered?
    [],  # georef
    [],  # geometry
    (('surface',), parse_surface),
]

# a missing or empty setting falls back to the default column layout
OPE_COLS = getattr(settings, 'ISHTAR_OPE_COL_FORMAT', None) \
    or DEFAULT_OPE_COLS


def _convert_row(vals, col_defs, default_person):
    """
    Convert the cells of one row into a dict of (possibly nested) keyword
    arguments, following col_defs.
    """
    args = {}
    for col_idx, val in enumerate(vals):
        if len(col_defs) <= col_idx or not col_defs[col_idx]:
            continue
        col_def = col_defs[col_idx]
        attrs, typ = col_def[0], col_def[1]
        # the third item is optional: DEFAULT_OPE_COLS only has two
        extra_cols = col_def[2] if len(col_def) > 2 else None
        if not attrs:
            continue
        if not callable(typ):
            # converters may be given by name
            typ = globals()[typ]
        try:
            if extra_cols:
                arguments = [vals[col_number] for col_number in extra_cols]
                if not any(arguments):
                    # nothing to convert: leave the attribute unset
                    continue
                v = typ(val, *(arguments + [default_person]))
            else:
                v = typ(val)
        except Exception:
            # an unparsable cell is imported as an empty value
            v = None
        # store the value only now, so that a skipped column does not
        # leave an empty placeholder behind
        target = args
        for attr in attrs[:-1]:
            target = target.setdefault(attr, {})
        target[attrs[-1]] = v
    return args


def _merge_exploded_dates(args):
    """
    Replace each 'xxx__year', 'xxx__month', 'xxx__day' triple of args by a
    single 'xxx' datetime.  Incomplete or invalid triples are dropped.
    """
    for k in list(args.keys()):
        if not k.endswith('__year'):
            continue
        key = k[:-len('__year')]
        year = args.pop(k)
        month = args.pop(key + '__month', None)
        day = args.pop(key + '__day', None)
        try:
            args[key] = datetime.datetime(year, month, day)
        except (TypeError, ValueError):
            pass


def _format_errors(header, items):
    """
    Return header followed by one 'line: ... args: ...' line per item.
    """
    return header + ''.join(
        "line: " + str(line_idx) + " args: " + str(args) + '\n'
        for line_idx, args in items)


@transaction.commit_manually
def import_operations(values, col_defs=OPE_COLS, update=False, person=None,
                      stdout=None):
    """
    Create (or update) operations from rows of values.

    values: iterable of rows (lists of cells); the first row is a header
      and is skipped.
    col_defs: column definitions, see DEFAULT_OPE_COLS.
    update: when True, existing operations (same code_patriarche) get their
      empty attributes filled in instead of being skipped.
    person: user recorded as history modifier; defaults to the user with
      the lowest pk.
    stdout: optional stream receiving a progress indicator.

    Return a (number of created operations, list of error messages) tuple.
    """
    default_person = person or User.objects.order_by('pk').all()[0]
    # key : (class, default, reverse)
    key_classes = {
        'administrative_act': (
            AdministrativeAct,
            {'history_modifier': default_person,
             'act_type': ActType.objects.get(txt_idx='excavation_order')},
            'operation'),
    }
    ope_default = {'history_modifier': default_person}
    new_ops = 0
    error_ope, error_reversed, error_multis = [], [], []
    for line_idx, vals in enumerate(values):
        if stdout:
            stdout.write("\r* line %d" % (line_idx))
        if not line_idx:
            continue  # remove header
        args = _convert_row(vals, col_defs, default_person)
        # manage exploded dates
        _merge_exploded_dates(args)
        reversed_items, multis = [], []
        for k in list(args.keys()):
            if k in key_classes:
                cls, default, reverse = key_classes[k]
                # work on a copy: the defaults are shared by all the rows
                item_values = dict(default)
                item_values.update(args[k])
                if reverse:
                    # created later, once the operation exists
                    reversed_items.append((cls, item_values, reverse))
                    args.pop(k)
                    continue
                try:
                    obj = cls.objects.get(**item_values)
                except Exception:
                    obj = cls.objects.create(**item_values)
                transaction.commit()
                args[k] = obj
            elif isinstance(args[k], list):
                # many to many values are attached after creation
                multis.append((k, args.pop(k)))
        if not update and not args.get('operation_type'):
            # no operation type
            continue
        op = None
        if 'code_patriarche' in args:
            try:
                op = Operation.objects.get(
                    code_patriarche=args['code_patriarche'])
            except Exception:
                op = None
        if op is not None and not update:
            # this code_patriarche already exists
            continue
        # check
        if not args.get('year') and args.get('start_date'):
            args['year'] = args['start_date'].year
        if op is None:
            # creation
            args.update(ope_default)
            try:
                op = Operation.objects.create(**args)
            except Exception:
                # record the failure instead of aborting the whole import
                error_ope.append((line_idx, args))
                transaction.rollback()
                continue
            new_ops += 1
            transaction.commit()
        else:
            # update: only fill in the attributes that are still empty
            try:
                for k in args:
                    if getattr(op, k):
                        continue
                    setattr(op, k, args[k])
                op.save()
            except Exception:
                transaction.rollback()
                continue
            transaction.commit()
        try:
            for cls, item_values, reverse in reversed_items:
                item_values[reverse] = op
                cls(**item_values).save()
        except Exception:
            transaction.rollback()
            error_reversed.append((line_idx, reversed_items))
            continue
        transaction.commit()
        try:
            for k, multi_values in multis:
                for v in multi_values:
                    getattr(op, k).add(v)
            op.save()
        except Exception:
            transaction.rollback()
            error_multis.append((line_idx, multis))
            continue
        transaction.commit()
    # rows skipped after a simple lookup leave nothing to write, but the
    # manually managed transaction still has to be closed explicitly
    transaction.commit()
    errors = []
    if error_ope:
        errors.append(_format_errors(
            "Error while recording theses operations:\n", error_ope))
    if error_multis:
        errors.append(_format_errors(
            "Error while recording theses multiples items attached to "
            "operation:\n", error_multis))
    if error_reversed:
        errors.append(_format_errors(
            "Error while recording theses items that depend to operation:\n",
            error_reversed))
    return new_ops, errors


def import_from_csv(filename, update=False, col_defs=OPE_COLS,
                    person=None, stdout=None):
    """
    Import from a CSV file.
    Return number of operation treated and errors.
    """
    try:
        csv_file = codecs.open(filename, 'rb', "utf-8")
    except IOError:
        return 0, [u"Incorrect CSV file."]
    try:
        values = unicode_csv_reader(csv_file, delimiter=DELIMITER,
                                    quotechar=QUOTECHAR)
        # the rows are read lazily: keep the file open during the import
        return import_operations(values, col_defs=col_defs, update=update,
                                 person=person, stdout=stdout)
    finally:
        csv_file.close()