#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2015 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. """ Models description """ from cStringIO import StringIO import copy import datetime from PIL import Image from importlib import import_module import os import re import tempfile import unicodecsv from django.conf import settings from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.files.uploadedfile import SimpleUploadedFile from django.core.validators import validate_slug from django.core.urlresolvers import reverse, NoReverseMatch from django.db.utils import DatabaseError from django.db.models import Q, Max, Count from django.db.models.signals import post_save from django.utils.translation import ugettext_lazy as _, ugettext from django.utils.safestring import SafeUnicode, mark_safe from django.template.defaultfilters import slugify from django.contrib.auth.models import User, Group from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes import generic from django.contrib.gis.db import models from django.contrib import admin from simple_history.models import HistoricalRecords as BaseHistoricalRecords from ishtar_common.ooo_replace import ooo_replace from ishtar_common.model_merging import merge_model_objects from ishtar_common.utils import get_cache from ishtar_common.data_importer import Importer, ImportFormater, IntegerFormater, \ FloatFormater, UnicodeFormater, DateFormater, TypeFormater def post_save_user(sender, **kwargs): user = kwargs['instance'] ishtaruser = None try: q = IshtarUser.objects.filter(username=user.username) if not q.count(): ishtaruser = IshtarUser.create_from_user(user) else: ishtaruser = q.all()[0] ADMINISTRATOR, created = PersonType.objects.get_or_create( txt_idx='administrator') if ishtaruser.is_superuser \ and not ishtaruser.has_right('administrator'): ishtaruser.person.person_types.add(ADMINISTRATOR) except DatabaseError: # manage when db is not synced pass post_save.connect(post_save_user, sender=User) class ValueGetter(object): _prefix = "" def get_values(self, prefix=''): if not prefix: prefix = self._prefix values = {} for field_name in self._meta.get_all_field_names(): if not hasattr(self, field_name): continue value = getattr(self, field_name) if hasattr(value, 'get_values'): values.update(value.get_values(prefix + field_name + '_')) else: values[prefix + field_name] = value values['KEYS'] = u'\n'.join(values.keys()) value_list = [] for key in values.keys(): if key in ('KEYS', 'VALUES'): continue value_list.append((key, unicode(values[key]))) values['VALUES'] = u'\n'.join( [u"%s: %s" % (k, v) for k, v in sorted(value_list, key=lambda x:x[0])]) for global_var in GlobalVar.objects.all(): values[global_var.slug] = global_var.value or "" return values @classmethod def get_empty_values(cls, prefix=''): if not prefix: prefix = self._prefix values = {} for field_name in cls._meta.get_all_field_names(): values[prefix + field_name] = '' return values class HistoricalRecords(BaseHistoricalRecords): def create_historical_record(self, instance, type): try: history_modifier = getattr(instance, 'history_modifier', None) assert history_modifier except (User.DoesNotExist, AssertionError): # on batch removing of users, user could have disapeared return manager = getattr(instance, self.manager_name) attrs = {} for field in instance._meta.fields: attrs[field.attname] = getattr(instance, field.attname) q_history = instance.history.filter( history_modifier_id=history_modifier.pk ).order_by('-history_date', '-history_id') if not q_history.count(): manager.create(history_type=type, **attrs) return old_instance = q_history.all()[0] # multiple saving by the same user in a very short time are generaly # caused by post_save signals it is not relevant to keep them min_history_date = datetime.datetime.now() \ - datetime.timedelta(seconds=5) q = q_history.filter(history_date__isnull=False, history_date__gt=min_history_date ).order_by('-history_date', '-history_id') if q.count(): return # record a new version only if data have been changed for field in instance._meta.fields: if getattr(old_instance, field.attname) != attrs[field.attname]: manager.create(history_type=type, **attrs) return # valid ID validator for models def valid_id(cls): def func(value): try: cls.objects.get(pk=value) except ObjectDoesNotExist: raise ValidationError(_(u"Not a valid item.")) return func def valid_ids(cls): def func(value): if "," in value: value = value.split(",") for v in value: try: cls.objects.get(pk=v) except ObjectDoesNotExist: raise ValidationError( _(u"An item selected is not a valid item.")) return func # unique validator for models def is_unique(cls, field): def func(value): query = {field:value} try: assert cls.objects.filter(**query).count() == 0 except AssertionError: raise ValidationError(_(u"This item already exist.")) return func class OwnPerms: """ Manage special permissions for object's owner """ @classmethod def get_query_owns(cls, user): """ Query object to get own items """ return None # implement for each object def is_own(self, user): """ Check if the current object is owned by the user """ query = self.get_query_owns(user) if not query: return False query = query & Q(pk=self.pk) return cls.objects.filter(query).count() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ query = cls.get_query_owns(user) if not query: return False return cls.objects.filter(query).count() @classmethod def get_owns(cls, user): """ Get Own items """ if isinstance(user, User): user = IshtarUser.objects.get(user_ptr=user) if user.is_anonymous(): return cls.objects.filter(pk__isnull=True) query = cls.get_query_owns(user) if not query: return cls.objects.filter(pk__isnull=True) return cls.objects.filter(query).order_by(*cls._meta.ordering) class GeneralType(models.Model): """ Abstract class for "types" """ label = models.CharField(_(u"Label"), max_length=100) txt_idx = models.CharField(_(u"Textual ID"), validators=[validate_slug], max_length=30, unique=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) available = models.BooleanField(_(u"Available"), default=True) HELP_TEXT = u"" class Meta: abstract = True unique_together = (('txt_idx', 'available'),) def __unicode__(self): return self.label @classmethod def create_default_for_test(cls): return [cls.objects.create(label='Test %d' % i) for i in range(5)] @property def short_label(self): return self.label @classmethod def get_help(cls, dct={}, exclude=[]): help_text = cls.HELP_TEXT c_rank = -1 help_items = u"\n" for item in cls.get_types(dct=dct, instances=True, exclude=exclude): if hasattr(item, '__iter__'): # TODO: manage multiple levels continue if not item.comment: continue if c_rank > item.rank: help_items += u"\n" elif c_rank < item.rank: help_items += u"
\n" c_rank = item.rank help_items += u"
%s
%s
" % (item.label, u"
".join(item.comment.split('\n'))) c_rank += 1 if c_rank: help_items += c_rank*u"
" if help_text or help_items != u'\n': return mark_safe(help_text + help_items) return u"" @classmethod def get_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None): base_dct = dct.copy() if hasattr(cls, 'parent'): return cls._get_parent_types(base_dct, instances, exclude=exclude, empty_first=empty_first, default=default) return cls._get_types(base_dct, instances, exclude=exclude, empty_first=empty_first, default=default) @classmethod def _get_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None): dct['available'] = True if not instances and empty_first and not default: yield ('', '--') if default: try: default = cls.objects.get(txt_idx=default) yield(default.pk, _(unicode(default))) except cls.DoesNotExist: pass items = cls.objects.filter(**dct) if default: exclude.append(default.txt_idx) if exclude: items = items.exclude(txt_idx__in=exclude) for item in items.order_by(*cls._meta.ordering).all(): if instances: item.rank = 0 yield item else: yield (item.pk, _(unicode(item))) PREFIX = "› " @classmethod def _get_childs(cls, item, dct, prefix=0, instances=False, exclude=[]): prefix += 1 dct['parent'] = item childs = cls.objects.filter(**dct) if exclude: childs = childs.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): childs = childs.order_by('order') for child in childs.all(): if instances: child.rank = prefix yield child else: yield (child.pk, SafeUnicode(prefix*cls.PREFIX + \ unicode(_(unicode(child))) )) for sub_child in cls._get_childs(child, dct, prefix, instances, exclude=exclude): yield sub_child @classmethod def _get_parent_types(cls, dct={}, instances=False, exclude=[], empty_first=True, default=None): dct['available'] = True if not instances and empty_first: yield ('', '--') dct['parent'] = None items = cls.objects.filter(**dct) if exclude: items = items.exclude(txt_idx__in=exclude) if hasattr(cls, 'order'): items = items.order_by('order') for item in items.all(): if instances: item.rank = 0 yield item else: yield (item.pk, unicode(item)) for child in cls._get_childs(item, dct, instances, exclude=exclude): yield child def save(self, *args, **kwargs): if not self.id and not self.label: self.label = u" ".join(u" ".join(self.txt_idx.split('-') ).split('_')).title() if not self.txt_idx: self.txt_idx = slugify(self.label) # clean old keys if self.pk: old = self.__class__.objects.get(pk=self.pk) content_type = ContentType.objects.get_for_model(self.__class__) if slugify(self.label) != slugify(old.label): ItemKey.objects.filter(object_id=self.pk, key=slugify(old.label), content_type=content_type).delete() if self.txt_idx != old.txt_idx: ItemKey.objects.filter(object_id=self.pk, key=old.txt_idx, content_type=content_type).delete() obj = super(GeneralType, self).save(*args, **kwargs) self.generate_key(force=True) return obj def add_key(self, key, force=False): content_type = ContentType.objects.get_for_model(self.__class__) if not force and ItemKey.objects.filter(key=key, content_type=content_type).count(): return if force: ItemKey.objects.filter(key=key, content_type=content_type).exclude( object_id=self.pk).delete() ItemKey.objects.get_or_create(object_id=self.pk, key=key, content_type=content_type) def generate_key(self, force=False): for key in (slugify(self.label), self.txt_idx): self.add_key(key) def get_keys(self): keys = [] content_type = ContentType.objects.get_for_model(self.__class__) for ik in ItemKey.objects.filter(content_type=content_type, object_id=self.pk).all(): keys.append(ik.key) return keys @classmethod def generate_keys(cls): content_type = ContentType.objects.get_for_model(cls) for item in cls.objects.all(): item.generate_key() class ItemKey(models.Model): key = models.CharField(_(u"Key"), max_length=100) content_type = models.ForeignKey(ContentType) object_id = models.PositiveIntegerField() content_object = generic.GenericForeignKey('content_type', 'object_id') importer = models.ForeignKey('Import', null=True, blank=True, help_text=_(u"Key specific to an import")) def __unicode__(self): return self.key class ImageModel(models.Model): image = models.ImageField(upload_to="upload/", blank=True, null=True) thumbnail = models.ImageField(upload_to='upload/thumbs/', blank=True, null=True) IMAGE_MAX_SIZE = settings.IMAGE_MAX_SIZE THUMB_MAX_SIZE = settings.THUMB_MAX_SIZE class Meta: abstract = True def has_changed(self, field): if not self.pk: return True manager = getattr(self.__class__, 'objects') old = getattr(manager.get(pk=self.pk), field) return not getattr(self, field) == old def create_thumb(self, image, size): """Returns the image resized to fit inside a box of the given size""" image.thumbnail(size, Image.ANTIALIAS) temp = StringIO() image.save(temp, 'jpeg') temp.seek(0) return SimpleUploadedFile('temp', temp.read()) def save(self, *args, **kwargs): # manage images if self.has_changed('image') and self.image: # convert to jpg filename = os.path.splitext(os.path.split(self.image.name)[-1])[0] old_path = self.image.path filename = "%s.jpg" % filename image = Image.open(self.image.file) # convert to RGB if image.mode not in ('L', 'RGB'): image = image.convert('RGB') # resize if necessary self.image.save(filename, self.create_thumb(image, self.IMAGE_MAX_SIZE), save=False) if old_path != self.image.path: os.remove(old_path) # save the thumbnail self.thumbnail.save('_%s' % filename, self.create_thumb(image, self.THUMB_MAX_SIZE), save=False) super(ImageModel, self).save(*args, **kwargs) class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) class BaseHistorizedItem(models.Model): history_modifier = models.ForeignKey(User, related_name='+', on_delete=models.SET_NULL, verbose_name=_(u"Last editor"), blank=True, null=True) history_creator = models.ForeignKey(User, related_name='+', on_delete=models.SET_NULL, verbose_name=_(u"Creator"), blank=True, null=True) class Meta: abstract = True def save(self, *args, **kwargs): assert hasattr(self, 'history_modifier') == True if not self.id: self.history_creator = self.history_modifier super(BaseHistorizedItem, self).save(*args, **kwargs) return True def get_previous(self, step=None, date=None, strict=True): """ Get a "step" previous state of the item """ assert step or date historized = self.history.all() item = None if step: assert len(historized) > step item = historized[step] else: for step, item in enumerate(historized): if item.history_date == date: break # ended with no match if item.history_date != date: return item._step = step if len(historized) != (step + 1): item._previous = historized[step + 1].history_date else: item._previous = None if step > 0: item._next = historized[step - 1].history_date else: item._next = None item.history_date = historized[step].history_date model = self.__class__ for k in model._meta.get_all_field_names(): field = model._meta.get_field_by_name(k)[0] if hasattr(field, 'rel') and field.rel: if not hasattr(item, k+'_id'): setattr(item, k, getattr(self, k)) continue val = getattr(item, k+'_id') if not val: setattr(item, k, None) continue try: val = field.rel.to.objects.get(pk=val) setattr(item, k, val) except ObjectDoesNotExist: if strict: raise HistoryError(u"The class %s has no pk %d" % ( unicode(field.rel.to), val)) setattr(item, k, None) item.pk = self.pk return item @property def last_edition_date(self): try: return self.history.order_by('-history_date').all()[0].history_date except IndexError: return def rollback(self, date): """ Rollback to a previous state """ to_del, new_item = [], None for item in self.history.all(): if item.history_date == date: new_item = item break to_del.append(item) if not new_item: raise HistoryError(u"The date to rollback to doesn't exist.") try: for f in self._meta.fields: k = f.name if k != 'id' and hasattr(self, k): if not hasattr(new_item, k): k = k + "_id" setattr(self, k, getattr(new_item, k)) try: self.history_modifier = User.objects.get( pk=new_item.history_modifier_id) except User.ObjectDoesNotExist: pass self.save() except: raise HistoryError(u"The rollback has failed.") # clean the obsolete history for historized_item in to_del: historized_item.delete() def values(self): values = {} for f in self._meta.fields: k = f.name if k != 'id': values[k] = getattr(self, k) return values def get_show_url(self): try: return reverse('show-'+self.__class__.__name__.lower(), args=[self.pk, '']) except NoReverseMatch: return @property def associated_filename(self): if [True for attr in ('get_town_label', 'get_department', 'reference', 'short_class_name') if not hasattr(self, attr)]: return '' items = [slugify(self.get_department()), slugify(self.get_town_label()).upper(), slugify(self.short_class_name), slugify(self.reference), slugify(self.name or '').replace('-', '_').capitalize()] last_edition_date = self.last_edition_date if last_edition_date: items.append(last_edition_date.strftime('%Y%m%d')) else: items.append('00000000') return u"-".join([unicode(item) for item in items]) class ShortMenuItem(object): def get_short_menu_class(self): return '' class LightHistorizedItem(BaseHistorizedItem): history_date = models.DateTimeField(default=datetime.datetime.now) class Meta: abstract = True def save(self, *args, **kwargs): super(LightHistorizedItem, self).save(*args, **kwargs) return True class GlobalVar(models.Model): slug = models.SlugField(_(u"Variable name"), unique=True) description = models.TextField(_(u"Description of the variable"), null=True, blank=True) value = models.TextField(_(u"Value"), null=True, blank=True) class Meta: verbose_name = _(u"Global variable") verbose_name_plural = _(u"Global variables") ordering = ['slug'] def __unicode__(self): return unicode(self.slug) @classmethod def get_cache(cls, slug): cache_key, value = get_cache(cls, slug) if value: return value try: obj = cls.objects.get(slug=slug) cache.set(cache_key, obj.value, settings.CACHE_TIMEOUT) return obj.value except cls.DoesNotExist: return None def cached_globalvar_changed(sender, **kwargs): if not kwargs['instance']: return var = kwargs['instance'] cache_key, value = get_cache(GlobalVar, var.slug) cache.set(cache_key, var.value, settings.CACHE_TIMEOUT) post_save.connect(cached_globalvar_changed, sender=GlobalVar) class UserDashboard: def __init__(self): types = IshtarUser.objects.values('person__person_types', 'person__person_types__label') self.types = types.annotate(number=Count('pk'))\ .order_by('person__person_types') class DashboardFormItem(object): @classmethod def get_periods(cls, slice='month', fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var+'__isnull':False}) if fltr: q = q.filter(**fltr) if slice == 'year': return [res[date_var].year for res in list(q.values(date_var ).annotate(Count("id")).order_by())] elif slice == 'month': return [(res[date_var].year, res[date_var].month) for res in list(q.values(date_var ).annotate(Count("id")).order_by())] return [] @classmethod def get_by_year(cls, year, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var+'__isnull':False}) if fltr: q = q.filter(**fltr) return q.filter(**{date_var+'__year':year}).distinct('pk') @classmethod def get_by_month(cls, year, month, fltr={}, date_source='creation'): date_var = date_source + '_date' q = cls.objects.filter(**{date_var+'__isnull':False}) if fltr: q = q.filter(**fltr) q = q.filter(**{date_var+'__year':year, date_var+'__month':month}) return q.distinct('pk') @classmethod def get_total_number(cls, fltr={}): q = cls.objects if fltr: q = q.filter(**fltr) return q.distinct('pk').count() class Dashboard: def __init__(self, model, slice='year', date_source=None, show_detail=None, fltr={}): # don't provide date_source if it is not relevant self.model = model self.total_number = model.get_total_number(fltr) self.show_detail = show_detail history_model = self.model.history.model # last edited - created self.recents, self.lasts = [], [] for last_lst, modif_type in ((self.lasts, '+'), (self.recents, '~')): last_ids = history_model.objects.values('id')\ .annotate(hd=Max('history_date')) last_ids = last_ids.filter(history_type=modif_type) from archaeological_finds.models import Find if self.model == Find: last_ids = last_ids.filter(downstream_treatment_id__isnull=True) if modif_type == '+': last_ids = last_ids.filter(upstream_treatment_id__isnull=True) last_ids = last_ids.order_by('-hd').distinct().all()[:5] for idx in last_ids: try: obj = self.model.objects.get(pk=idx['id']) except: # deleted object are always referenced in history continue obj.history_date = idx['hd'] last_lst.append(obj) # years base_kwargs = {'fltr':fltr.copy()} if date_source: base_kwargs['date_source'] = date_source periods_kwargs = copy.deepcopy(base_kwargs) periods_kwargs['slice'] = slice self.periods = model.get_periods(**periods_kwargs) self.periods = list(set(self.periods)) self.periods.sort() if not self.total_number or not self.periods: return kwargs_num = copy.deepcopy(base_kwargs) self.serie_labels = [_(u"Total")] # numbers if slice == 'year': self.values = [('year', "", list(reversed(self.periods)))] self.numbers = [model.get_by_year(year, **kwargs_num).count() for year in self.periods] self.values += [('number', _(u"Number"), list(reversed(self.numbers)))] if slice == 'month': periods = list(reversed(self.periods)) self.periods = ["%d-%s-01" % (p[0], ('0'+str(p[1])) if len(str(p[1])) == 1 else p[1]) for p in periods] self.values = [('month', "", self.periods)] if show_detail: for dpt in settings.ISHTAR_DPTS: self.serie_labels.append(unicode(dpt)) idx = 'number_' + unicode(dpt) kwargs_num['fltr']["towns__numero_insee__startswith"] = \ unicode(dpt) numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [(idx, dpt, list(numbers))] # put "Total" at the end self.serie_labels.append(self.serie_labels.pop(0)) kwargs_num = base_kwargs.copy() self.numbers = [model.get_by_month(*p.split('-')[:2], **kwargs_num).count() for p in self.periods] self.values += [('number', _(u"Total"), list(self.numbers))] # calculate self.average = self.get_average() self.variance = self.get_variance() self.standard_deviation = self.get_standard_deviation() self.median = self.get_median() self.mode = self.get_mode() # by operation if not hasattr(model, 'get_by_operation'): return operations = model.get_operations() operation_numbers = [model.get_by_operation(op).count() for op in operations] # calculate self.operation_average = self.get_average(operation_numbers) self.operation_variance = self.get_variance(operation_numbers) self.operation_standard_deviation = self.get_standard_deviation( operation_numbers) self.operation_median = self.get_median(operation_numbers) operation_mode_pk = self.get_mode(dict(zip(operations, operation_numbers))) if operation_mode_pk: from archaeological_operations.models import Operation self.operation_mode = unicode(Operation.objects.get( pk=operation_mode_pk)) def get_average(self, vals=[]): if not vals: vals = self.numbers[:] return sum(vals)/len(vals) def get_variance(self, vals=[]): if not vals: vals = self.numbers[:] avrg = self.get_average(vals) return self.get_average([(x-avrg)**2 for x in vals]) def get_standard_deviation(self, vals=[]): if not vals: vals = self.numbers[:] return round(self.get_variance(vals)**0.5, 3) def get_median(self, vals=[]): if not vals: vals = self.numbers[:] len_vals = len(vals) vals.sort() if (len_vals % 2) == 1: return vals[len_vals/2] else: return (vals[len_vals/2-1] + vals[len_vals/2])/2.0 def get_mode(self, vals={}): if not vals: vals = dict(zip(self.periods, self.numbers[:])) mx = max(vals.values()) for v in vals: if vals[v] == mx: return v class DocumentTemplate(models.Model): CLASSNAMES = (('archaeological_operations.models.AdministrativeAct', _(u"Administrative Act")),) name = models.CharField(_(u"Name"), max_length=100) template = models.FileField(_(u"Template"), upload_to="upload/templates/") associated_object_name = models.CharField(_(u"Associated object"), max_length=100, choices=CLASSNAMES) available = models.BooleanField(_(u"Available"), default=True) class Meta: verbose_name = _(u"Document template") verbose_name_plural = _(u"Document templates") ordering = ['associated_object_name', 'name'] def __unicode__(self): return self.name @classmethod def get_tuples(cls, dct={}, empty_first=True): dct['available'] = True if empty_first: yield ('', '----------') items = cls.objects.filter(**dct) for item in items.distinct().order_by(*cls._meta.ordering).all(): yield (item.pk, _(unicode(item))) def publish(self, c_object): tempdir = tempfile.mkdtemp("-ishtardocs") output_name = tempdir + os.path.sep + \ slugify(self.name.replace(' ', '_').lower()) + u'-' +\ datetime.date.today().strftime('%Y-%m-%d') +\ u"." + self.template.name.split('.')[-1] values = c_object.get_values() missing = ooo_replace(self.template, output_name, values) return output_name class Department(models.Model): label = models.CharField(_(u"Label"), max_length=30) number = models.CharField(_(u"Number"), unique=True, max_length=3) class Meta: verbose_name = _(u"Department") verbose_name_plural = _(u"Departments") ordering = ['number'] def __unicode__(self): return u"%s (%s)" % (self.label, self.number) class Address(BaseHistorizedItem): address = models.TextField(_(u"Address"), null=True, blank=True) address_complement = models.TextField(_(u"Address complement"), null=True, blank=True) postal_code = models.CharField(_(u"Postal code"), max_length=10, null=True, blank=True) town = models.CharField(_(u"Town"), max_length=70, null=True, blank=True) country = models.CharField(_(u"Country"), max_length=30, null=True, blank=True) phone = models.CharField(_(u"Phone"), max_length=18, null=True, blank=True) mobile_phone = models.CharField(_(u"Mobile phone"), max_length=18, null=True, blank=True) email = models.EmailField(_(u"Email"), max_length=75, blank=True, null=True) history = HistoricalRecords() class Meta: abstract = True class Merge(models.Model): merge_key = models.CharField(_("Merge key"), max_length=300, blank=True, null=True) merge_candidate = models.ManyToManyField("self", blank=True, null=True) merge_exclusion = models.ManyToManyField("self", blank=True, null=True) # 1 for one word similarity, 2 for two word similarity, etc. MERGE_CLEMENCY = None EMPTY_MERGE_KEY = '--' class Meta: abstract = True def generate_merge_key(self): self.merge_key = slugify(self.name if self.name else '')[:300] if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY self.merge_key = self.merge_key[:300] def generate_merge_candidate(self): if not self.merge_key: self.generate_merge_key() self.save() if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: return q = self.__class__.objects.exclude(pk=self.pk ).exclude(merge_exclusion=self ).exclude(merge_candidate=self) if not self.MERGE_CLEMENCY: q = q.filter(merge_key=self.merge_key) else: subkeys_front = u"-".join( self.merge_key.split('-')[:self.MERGE_CLEMENCY]) subkeys_back = u"-".join( self.merge_key.split('-')[-self.MERGE_CLEMENCY:]) q = q.filter(Q(merge_key__istartswith=subkeys_front) | Q(merge_key__iendswith=subkeys_back)) for item in q.all(): self.merge_candidate.add(item) def save(self, *args, **kwargs): self.generate_merge_key() item = super(Merge, self).save(*args, **kwargs) self.generate_merge_candidate() def merge(self, item): merge_model_objects(self, item) self.generate_merge_candidate() class OrganizationType(GeneralType): class Meta: verbose_name = _(u"Organization type") verbose_name_plural = _(u"Organization types") ordering = ('label',) MODELS = [ ('archaeological_operations.models.Operation', _(u"Operation")), ('archaeological_operations.models.Parcel', _(u"Parcels")), ('archaeological_operations.models.OperationSource', _(u"Operation source")), ] if 'archaeological_files' in settings.INSTALLED_APPS: MODELS = [('archaeological_files.models.File', _(u"Archaeological files")), ] + MODELS def get_model_fields(model): """ Return a dict of fields from model To be replace in Django 1.8 with get_fields, get_field """ fields = {} options = model._meta for field in sorted(options.concrete_fields + options.many_to_many + options.virtual_fields): fields[field.name] = field return fields class ImporterType(models.Model): """ Description of a table to be mapped with ishtar database """ name = models.CharField(_(u"Name"), blank=True, null=True, max_length=100) description = models.CharField(_(u"Description"), blank=True, null=True, max_length=500) users = models.ManyToManyField('IshtarUser', verbose_name=_(u"Users"), blank=True, null=True) associated_models = models.CharField(_(u"Associated model"), max_length=200, choices=MODELS) is_template = models.BooleanField(_(u"Is template"), default=False) class Meta: verbose_name = _(u"Importer - Type") verbose_name_plural = _(u"Importer - Types") def __unicode__(self): return self.name @property def importer_class(self): name = ''.join(x for x in slugify(self.name).replace('-', ' ').title() if not x.isspace()) OBJECT_CLS = import_module(self.associated_models) DEFAULTS = dict((default.keys, default.values) for default in self.defaults.all()) LINE_FORMAT = [] idx = 0 for column in self.columns.order_by('col_number').all(): idx += 1 while column.order > idx: LINE_FORMAT.append(None) idx += 1 targets = None formater_types = None nb = column.targets.count() if not nb: LINE_FORMAT.append(None) continue for target in column.targets.all(): ft = target.formater_type.get_formater_type(target) if not ft: continue formater_types.append(ft) targets.append(target.target) formater_kwargs = {} if column.regexp_pre_filter: formater_kwargs['regexp'] = re.compile( column.regexp_pre_filter.regexp) formater_kwargs['duplicate_fields'] = [field.field_name for field in column.duplicate_fields.all()] formater = ImportFormater(targets, formater_types, **formater_kwargs) LINE_FORMAT.append(formater) args = {'OBJECT_CLS':OBJECT_CLS, 'DESC':self.description, 'DEFAULTS':DEFAULTS, 'LINE_FORMAT':LINE_FORMAT} newclass = type(name, (Importer,), args) return newclass class ImporterDefault(models.Model): """ Targets of default values in an import """ importer_type = models.ForeignKey(ImporterType, related_name='defaults') target = models.CharField(u"Target", max_length=500) class Meta: verbose_name = _(u"Importer - Default") verbose_name_plural = _(u"Importer - Defaults") @property def keys(self): return default.target.split('__') @property def associated_model(self): field = None OBJECT_CLS = import_module(self.importer_type.associated_models) for idx, item in enumerate(self.keys): if not idx: field = get_model_fields(OBJECT_CLS)[item] else: raise NotImplemented() if hasattr(field, 'rel') and hasattr(field.rel, 'to'): return field.rel.to @property def values(self): values = {} for default_value in self.default_values.all(): values[default_value.target] = default_value.get_value() return values class ImporterDefaultValues(models.Model): """ Default values in an import """ default_target = models.ForeignKey(ImporterDefault, related_name='default_values') target = models.CharField(u"Target", max_length=500) value = models.CharField(u"Value", max_length=500) class Meta: verbose_name = _(u"Importer - Default value") verbose_name_plural = _(u"Importer - Default values") def get_value(self): model = self.default_target.associated_model if not model: return self.value # if value is an id try: return model.objects.get(pk=int(self.value)) except (ValueError, model.DoesNotExist): pass # try with txt_idx try: return model.objects.get(txt_idx=self.value) except (ValueError, model.DoesNotExist): pass return "" class ImporterColumn(models.Model): """ Import file column description """ importer_type = models.ForeignKey(ImporterType, related_name='columns') col_number = models.IntegerField(_(u"Column number"), default=1) regexp_pre_filter = models.ForeignKey("Regexp", blank=True, null=True) required = models.BooleanField(_(u"Required"), default=False) class Meta: verbose_name = _(u"Importer - Column") verbose_name_plural = _(u"Importer - Columns") class ImporterDuplicateField(models.Model): """ Direct copy of result in other fields """ column = models.ForeignKey(ImporterColumn, related_name='duplicate_fields') field_name = models.CharField(_(u"Field name"), blank=True, null=True, max_length=200) class Meta: verbose_name = _(u"Importer - Duplicate field") verbose_name_plural = _(u"Importer - Duplicate fields") class Regexp(models.Model): name = models.CharField(_(u"Name"), max_length=100) description = models.CharField(_(u"Description"), blank=True, null=True, max_length=500) regexp = models.CharField(_(u"Regular expression"), max_length=500) class Meta: verbose_name = _(u"Importer - Regular expression") verbose_name_plural = _(u"Importer - Regular expressions") IMPORTER_TYPES = [] class ImportTarget(models.Model): """ Ishtar database target for a column """ column = models.ForeignKey(ImporterColumn, related_name='targets') target = models.CharField(u"Target", max_length=500) regexp_filter = models.ForeignKey("Regexp", blank=True, null=True) formater_type = models.ForeignKey("FormaterType") class Meta: verbose_name = _(u"Importer - Target") verbose_name_plural = _(u"Importer - Targets") def __unicode__(self): return u" - ".join([unicode(self.column), self.target[:50]]) class TargetKey(models.Model): """ User's link between import source and ishtar database. Also temporary used for GeneralType to point missing link before adding them in ItemKey table """ target = models.ForeignKey(ImporterColumn, related_name='keys') key = models.TextField(_(u"Key"), blank=True, null=True) value = models.TextField(_(u"Value")) is_set = models.BooleanField(_(u"Is set"), default=False) class Meta: unique_together = ('target', 'value') def __unicode__(self): return u" - ".join([unicode(self.target), self.key[:50]]) TARGET_MODELS = [ ('OrganizationType', _(u"Organization type")), ('SourceType', _(u"Source type")), ('AuthorType', _(u"Author type")), ('Format', _(u"Format")), ('archaeological_operations.models.OperationType', _(u"OperationType")), ('archaeological_operations.models.Period', _(u"Period")), ] TARGET_MODELS_KEYS = (tm[0] for tm in TARGET_MODELS) IMPORTER_TYPES = ( ('IntegerFormater', _(u"Integer")), ('FloatFormater', _(u"Float")), ('UnicodeFormater', _(u"String")), ('DateFormater', _(u"Date")), ('TypeFormater', _(u"Type")), ) IMPORTER_TYPES_DCT = { 'IntegerFormater':IntegerFormater, 'FloatFormater':FloatFormater, 'UnicodeFormater':UnicodeFormater, 'DateFormater':DateFormater, 'TypeFormater':TypeFormater, } DATE_FORMATS = ( ('%Y', _(u"4 digit year. e.g.: \"2015\"")), ('%Y/%m/%d', _(u"4 digit year/month/day. e.g.: \"2015/02/04\"")), ('%d/%m/%Y', _(u"Day/month/4 digit year. e.g.: \"04/02/2015\"")), ) IMPORTER_TYPES_CHOICES = {'TypeFormater':TARGET_MODELS, 'DateFormater':DATE_FORMATS} class FormaterType(models.Model): formater_type = models.CharField(u"Formater type", max_length=20, choices=IMPORTER_TYPES) options = models.CharField(_(u"Options"), max_length=500, blank=True, null=True) many_split = models.CharField(_(u"Split character(s)"), max_length=10, blank=True, null=True) class Meta: verbose_name = _(u"Importer - Formater type") verbose_name_plural = _(u"Importer - Formater types") unique_together = ('formater_type', 'options', 'many_split') def __unicode__(self): return u" - ".join([unicode(dict(IMPORTER_TYPES)[self.formater_type]) if self.formater_type in IMPORTER_TYPES_DCT else ''] + [getattr(self, k) for k in ('options', 'many_split') if getattr(self, k)]) def get_choices(self): if self.format_type in IMPORTER_TYPES_CHOICES: return IMPORTER_TYPES_CHOICES[self.format_type] def get_formater_type(self, target): if self.formater_type not in IMPORTER_TYPES_DCT.keys(): return kwargs = {'target':target} if self.many_split: kwargs['many_split'] = self.many_split if self.formater_type == 'TypeFormater': if self.options not in TARGET_MODELS_KEYS: return model = None if self.options in dir(): model = dir()[self.options] else: model = import_module(self.options) return TypeFormater(model, **kwargs) elif self.formater_type == 'IntegerFormater': return IntegerFormater(**kwargs) elif self.formater_type == 'FloatFormater': return FloatFormater(**kwargs) elif self.format_type == 'UnicodeFormater': try: return UnicodeFormater(int(self.options.strip()), **kwargs) except ValueError: return elif self.format_type == 'DateFormater': return DateFormater(self.options, **kwargs) IMPORT_STATE = (("C", _(u"Created")), ("AP", _(u"Analyse in progress")), ("A", _(u"Analysed")), ("P", _(u"Import pending")), ("IP", _(u"Import in progress")), ("F", _(u"Finished"))) class Import(models.Model): user = models.ForeignKey('IshtarUser') importer_type = models.ForeignKey(ImporterType) imported_file = models.FileField(_(u"Imported file"), upload_to="upload/imports/") skip_lines = models.IntegerField(default=1) error_file = models.FileField(_(u"Error file"), upload_to="upload/imports/", blank=True, null=True) result_file = models.FileField(_(u"Result file"), upload_to="upload/imports/", blank=True, null=True) state = models.CharField(_(u"State"), max_length=2, choices=IMPORT_STATE, default='C') creation_date = models.DateTimeField(_(u"Creation date"), auto_now_add=True, blank=True, null=True) end_date = models.DateTimeField(_(u"End date"), blank=True, null=True, editable=False) seconds_remaining = models.IntegerField(_(u"Seconds remaining"), blank=True, null=True, editable=False) class Meta: verbose_name = _(u"Import") verbose_name_plural = _(u"Imports") def __unicode__(self): return u"%s - %s" % (unicode(self.importer_type), unicode(self.user)) @property def importer_instance(self): return self.importer_type.importer_class(skip_lines=self.skip_lines, import_instance=self) @property def data_table(self): encodings = [settings.ENCODING, settings.ALT_ENCODING, 'utf-8'] with open(self.imported_file.filename) as csv_file: for encoding in encodings: try: return [line for line in unicodecsv.reader(csv_file, encoding=encoding)] except UnicodeDecodeError: if encoding != encodings[-1]: csv_file.seek(0) continue return [] def initialize(self): self.importer_instance.initialize(self.data_table, output='db') class Organization(Address, Merge, OwnPerms, ValueGetter): TABLE_COLS = ('name', 'organization_type',) name = models.CharField(_(u"Name"), max_length=300) organization_type = models.ForeignKey(OrganizationType, verbose_name=_(u"Type")) history = HistoricalRecords() class Meta: verbose_name = _(u"Organization") verbose_name_plural = _(u"Organizations") permissions = ( ("view_organization", ugettext(u"Can view all Organization")), ("view_own_organization", ugettext(u"Can view own Organization")), ("add_own_organization", ugettext(u"Can add own Organization")), ("change_own_organization", ugettext(u"Can change own Organization")), ("delete_own_organization", ugettext(u"Can delete own Organization")), ) def __unicode__(self): return self.name def generate_merge_key(self): self.merge_key = slugify(self.name if self.name else '') if not self.merge_key: self.merge_key = self.EMPTY_MERGE_KEY if self.town: self.merge_key += "-" + slugify(self.town or '') self.merge_key = self.merge_key[:300] @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('organization_type', 'name') if getattr(self, attr)] return slugify(u"-".join(values)) class PersonType(GeneralType): #rights = models.ManyToManyField(WizardStep, verbose_name=_(u"Rights")) groups = models.ManyToManyField(Group, verbose_name=_(u"Groups"), blank=True, null=True) class Meta: verbose_name = _(u"Person type") verbose_name_plural = _(u"Person types") ordering = ('label',) class Person(Address, Merge, OwnPerms, ValueGetter) : _prefix = 'person_' TYPE = ( ('Mr', _(u'Mr')), ('Ms', _(u'Miss')), ('Mr and Miss', _(u'Mr and Miss')), ('Md', _(u'Mrs')), ('Dr', _(u'Doctor')), ) TABLE_COLS = ('name', 'surname', 'email', 'person_types_list', 'attached_to') title = models.CharField(_(u"Title"), max_length=2, choices=TYPE, blank=True, null=True) surname = models.CharField(_(u"Surname"), max_length=50, blank=True, null=True) name = models.CharField(_(u"Name"), max_length=200, blank=True, null=True) raw_name = models.CharField(_(u"Raw name"), max_length=300, blank=True, null=True) person_types = models.ManyToManyField(PersonType, verbose_name=_(u"Types")) attached_to = models.ForeignKey('Organization', related_name='members', on_delete=models.SET_NULL, verbose_name=_(u"Is attached to"), blank=True, null=True) class Meta: verbose_name = _(u"Person") verbose_name_plural = _(u"Persons") permissions = ( ("view_person", ugettext(u"Can view all Person")), ("view_own_person", ugettext(u"Can view own Person")), ("add_own_person", ugettext(u"Can add own Person")), ("change_own_person", ugettext(u"Can change own Person")), ("delete_own_person", ugettext(u"Can delete own Person")), ) def __unicode__(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values: values = [self.raw_name or ""] if self.attached_to: values.append(u"- " + unicode(self.attached_to)) return u" ".join(values) def get_values(self, prefix=''): values = super(Person, self).get_values(prefix=prefix) title = '' TYPES = dict(self.TYPE) if self.title in TYPES: title = dict(self.TYPE)[self.title] values[prefix+'title'] = title if not self.attached_to: values.update(Person.get_empty_values(prefix=prefix + 'attached_to_')) return values person_types_list_lbl = _(u"Types") @property def person_types_list(self): return u", ".join([unicode(pt) for pt in self.person_types.all()]) def generate_merge_key(self): if self.name and self.name.strip(): self.merge_key = slugify(self.name.strip()) + ( (u'-' + slugify(self.surname.strip())) if self.surname else u'') elif self.raw_name and self.raw_name.strip(): self.merge_key = slugify(self.raw_name.strip()) elif self.attached_to: self.merge_key = self.attached_to.merge_key else: self.merge_key = self.EMPTY_MERGE_KEY if self.merge_key != self.EMPTY_MERGE_KEY and self.attached_to: self.merge_key += "-" + self.attached_to.merge_key self.merge_key = self.merge_key[:300] def has_right(self, right_name): if '.' in right_name: right_name = right_name.split('.')[-1] if type(right_name) in (list, tuple): return bool(self.person_types.filter( txt_idx__in=right_name).count()) or \ bool(self.person_types.filter( groups__permissions__codename__in=right_name).count()) # or self.person_types.filter(wizard__url_name__in=right_name).count()) return bool(self.person_types.filter(txt_idx=right_name).count()) or \ bool(self.person_types.filter( groups__permissions__codename=right_name).count()) # or self.person_types.filter(wizard__url_name=right_name).count()) def full_label(self): values = [] if self.title: values = [unicode(_(self.title))] values += [unicode(getattr(self, attr)) for attr in ('surname', 'name') if getattr(self, attr)] if not values and self.raw_name: values = [self.raw_name] if self.attached_to: values.append(u"- " + unicode(self.attached_to)) return u" ".join(values) @property def associated_filename(self): values = [unicode(getattr(self, attr)) for attr in ('surname', 'name', 'attached_to') if getattr(self, attr)] return slugify(u"-".join(values)) class IshtarUser(User): person = models.ForeignKey(Person, verbose_name=_(u"Person"), unique=True, related_name='ishtaruser') class Meta: verbose_name = _(u"Ishtar user") verbose_name_plural = _(u"Ishtar users") @classmethod def create_from_user(cls, user): default = user.username surname = user.first_name or default name = user.last_name or default email = user.email person_type = None if user.is_superuser: ADMINISTRATOR, created = PersonType.objects.get_or_create( txt_idx='administrator') person_type = ADMINISTRATOR else: person_type, created = PersonType.objects.get_or_create( txt_idx='public_access') person = Person.objects.create(title='Mr', surname=surname, name=name, email=email, history_modifier=user) person.person_types.add(person_type) return IshtarUser.objects.create(user_ptr=user, username=default, person=person) def has_right(self, right_name): return self.person.has_right(right_name) def full_label(self): return self.person.full_label() class AuthorType(GeneralType): class Meta: verbose_name = _(u"Author type") verbose_name_plural = _(u"Author types") class Author(models.Model): person = models.ForeignKey(Person, verbose_name=_(u"Person"), related_name='author') author_type = models.ForeignKey(AuthorType, verbose_name=_(u"Author type")) class Meta: verbose_name = _(u"Author") verbose_name_plural = _(u"Authors") def __unicode__(self): return unicode(self.person) + settings.JOINT + unicode(self.author_type) def related_sources(self): return list(self.treatmentsource_related.all()) + \ list(self.operationsource_related.all()) + \ list(self.findsource_related.all()) + \ list(self.contextrecordsource_related.all()) class SourceType(GeneralType): class Meta: verbose_name = _(u"Source type") verbose_name_plural = _(u"Source types") class SupportType(GeneralType): class Meta: verbose_name = _(u"Support type") verbose_name_plural = _(u"Support types") class Format(GeneralType): class Meta: verbose_name = _(u"Format") verbose_name_plural = _(u"Formats") class Source(models.Model): title = models.CharField(_(u"Title"), max_length=300) external_id = models.CharField(_(u"External ID"), max_length=12, null=True, blank=True) source_type = models.ForeignKey(SourceType, verbose_name=_(u"Type")) support_type = models.ForeignKey(SupportType, verbose_name=_(u"Support"), blank=True, null=True,) format_type = models.ForeignKey(Format, verbose_name=_(u"Format"), blank=True, null=True,) scale = models.CharField(_(u"Scale"), max_length=30, null=True, blank=True) authors = models.ManyToManyField(Author, verbose_name=_(u"Authors"), related_name="%(class)s_related") associated_url = models.URLField(verify_exists=False, blank=True, null=True, verbose_name=_(u"Numerical ressource (web address)")) receipt_date = models.DateField(blank=True, null=True, verbose_name=_(u"Receipt date")) creation_date = models.DateField(blank=True, null=True, verbose_name=_(u"Creation date")) item_number = models.IntegerField(_(u"Item number"), default=1) reference = models.CharField(_(u"Ref."), max_length=25, null=True, blank=True) internal_reference = models.CharField(_(u"Internal reference"), max_length=25, null=True, blank=True) description = models.TextField(_(u"Description"), blank=True, null=True) comment = models.TextField(_(u"Comment"), blank=True, null=True) additional_information = models.TextField(_(u"Additional information"), blank=True, null=True) TABLE_COLS = ['title', 'source_type', 'authors',] class Meta: abstract = True def __unicode__(self): return self.title if settings.COUNTRY == 'fr': class Arrondissement(models.Model): name = models.CharField(u"Nom", max_length=30) department = models.ForeignKey(Department, verbose_name=u"Département") def __unicode__(self): return settings.JOINT.join((self.name, unicode(self.department))) class Canton(models.Model): name = models.CharField(u"Nom", max_length=30) arrondissement = models.ForeignKey(Arrondissement, verbose_name=u"Arrondissement") def __unicode__(self): return settings.JOINT.join((self.name, unicode(self.arrondissement))) class Town(models.Model): name = models.CharField(_(u"Name"), max_length=100) surface = models.IntegerField(_(u"Surface (m²)"), blank=True, null=True) center = models.PointField(_(u"Localisation"), srid=settings.SRID, blank=True, null=True) if settings.COUNTRY == 'fr': numero_insee = models.CharField(u"Numéro INSEE", max_length=6, unique=True) departement = models.ForeignKey(Department, verbose_name=u"Département", null=True, blank=True) canton = models.ForeignKey(Canton, verbose_name=u"Canton", null=True, blank=True) objects = models.GeoManager() class Meta: verbose_name = _(u"Town") verbose_name_plural = _(u"Towns") if settings.COUNTRY == 'fr': ordering = ['numero_insee'] def __unicode__(self): if settings.COUNTRY == "fr": return u"%s (%s)" % (self.name, self.numero_insee[:2]) return self.name